View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.concurrent.AutoScalingEventExecutorChooserFactory.AutoScalingUtilizationMetric;
19  import io.netty.util.concurrent.EventExecutorChooserFactory.ObservableEventExecutorChooser;
20  
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.LinkedHashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import static io.netty.util.internal.ObjectUtil.checkPositive;
32  
33  /**
34   * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
35   * the same time.
36   */
37  public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
38  
39      private final EventExecutor[] children;
40      private final Set<EventExecutor> readonlyChildren;
41      private final AtomicInteger terminatedChildren = new AtomicInteger();
42      private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
43      private final EventExecutorChooserFactory.EventExecutorChooser chooser;
44  
45      /**
46       * Create a new instance.
47       *
48       * @param nThreads          the number of threads that will be used by this instance.
49       * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
50       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
51       */
52      protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
53          this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
54      }
55  
56      /**
57       * Create a new instance.
58       *
59       * @param nThreads          the number of threads that will be used by this instance.
60       * @param executor          the Executor to use, or {@code null} if the default should be used.
61       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
62       */
63      protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
64          this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
65      }
66  
67      /**
68       * Create a new instance.
69       *
70       * @param nThreads          the number of threads that will be used by this instance.
71       * @param executor          the Executor to use, or {@code null} if the default should be used.
72       * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
73       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
74       */
75      protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
76                                              EventExecutorChooserFactory chooserFactory, Object... args) {
77          checkPositive(nThreads, "nThreads");
78  
79          if (executor == null) {
80              executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
81          }
82  
83          children = new EventExecutor[nThreads];
84  
85          for (int i = 0; i < nThreads; i ++) {
86              boolean success = false;
87              try {
88                  children[i] = newChild(executor, args);
89                  success = true;
90              } catch (Exception e) {
91                  // TODO: Think about if this is a good exception type
92                  throw new IllegalStateException("failed to create a child event loop", e);
93              } finally {
94                  if (!success) {
95                      for (int j = 0; j < i; j ++) {
96                          children[j].shutdownGracefully();
97                      }
98  
99                      for (int j = 0; j < i; j ++) {
100                         EventExecutor e = children[j];
101                         try {
102                             while (!e.isTerminated()) {
103                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
104                             }
105                         } catch (InterruptedException interrupted) {
106                             // Let the caller handle the interruption.
107                             Thread.currentThread().interrupt();
108                             break;
109                         }
110                     }
111                 }
112             }
113         }
114 
115         chooser = chooserFactory.newChooser(children);
116 
117         final FutureListener<Object> terminationListener = future -> {
118             if (terminatedChildren.incrementAndGet() == children.length) {
119                 terminationFuture.setSuccess(null);
120             }
121         };
122 
123         for (EventExecutor e: children) {
124             e.terminationFuture().addListener(terminationListener);
125         }
126 
127         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
128         Collections.addAll(childrenSet, children);
129         readonlyChildren = Collections.unmodifiableSet(childrenSet);
130     }
131 
132     protected ThreadFactory newDefaultThreadFactory() {
133         return new DefaultThreadFactory(getClass());
134     }
135 
136     @Override
137     public EventExecutor next() {
138         return chooser.next();
139     }
140 
141     @Override
142     public Iterator<EventExecutor> iterator() {
143         return readonlyChildren.iterator();
144     }
145 
146     /**
147      * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
148      * 1:1 to the threads it use.
149      */
150     public final int executorCount() {
151         return children.length;
152     }
153 
154     /**
155      * Returns the number of currently active threads if the group is using an
156      * {@link ObservableEventExecutorChooser}. Otherwise, for a non-scaling group,
157      * this method returns the total number of threads, as all are considered active.
158      *
159      * @return the count of active threads.
160      */
161     public int activeExecutorCount() {
162         if (chooser instanceof ObservableEventExecutorChooser) {
163             return ((ObservableEventExecutorChooser) chooser).activeExecutorCount();
164         }
165         return executorCount();
166     }
167 
168     /**
169      * Returns a list of real-time utilization metrics if the group was configured
170      * with a compatible {@link EventExecutorChooserFactory}, otherwise an empty list.
171      *
172      * @return A list of {@link AutoScalingUtilizationMetric} objects.
173      */
174     public List<AutoScalingUtilizationMetric> executorUtilizations() {
175         if (chooser instanceof ObservableEventExecutorChooser) {
176             return ((ObservableEventExecutorChooser) chooser).executorUtilizations();
177         }
178         return Collections.emptyList();
179     }
180 
181     /**
182      * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
183      * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
184      *
185      */
186     protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
187 
188     @Override
189     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
190         for (EventExecutor l: children) {
191             l.shutdownGracefully(quietPeriod, timeout, unit);
192         }
193         return terminationFuture();
194     }
195 
196     @Override
197     public Future<?> terminationFuture() {
198         return terminationFuture;
199     }
200 
201     @Override
202     @Deprecated
203     public void shutdown() {
204         for (EventExecutor l: children) {
205             l.shutdown();
206         }
207     }
208 
209     @Override
210     public boolean isShuttingDown() {
211         for (EventExecutor l: children) {
212             if (!l.isShuttingDown()) {
213                 return false;
214             }
215         }
216         return true;
217     }
218 
219     @Override
220     public boolean isShutdown() {
221         for (EventExecutor l: children) {
222             if (!l.isShutdown()) {
223                 return false;
224             }
225         }
226         return true;
227     }
228 
229     @Override
230     public boolean isTerminated() {
231         for (EventExecutor l: children) {
232             if (!l.isTerminated()) {
233                 return false;
234             }
235         }
236         return true;
237     }
238 
239     @Override
240     public boolean awaitTermination(long timeout, TimeUnit unit)
241             throws InterruptedException {
242         long deadline = System.nanoTime() + unit.toNanos(timeout);
243         loop: for (EventExecutor l: children) {
244             for (;;) {
245                 long timeLeft = deadline - System.nanoTime();
246                 if (timeLeft <= 0) {
247                     break loop;
248                 }
249                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
250                     break;
251                 }
252             }
253         }
254         return isTerminated();
255     }
256 }