View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.concurrent.AutoScalingEventExecutorChooserFactory.AutoScalingUtilizationMetric;
19  import io.netty.util.concurrent.EventExecutorChooserFactory.ObservableEventExecutorChooser;
20  
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.LinkedHashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import static io.netty.util.internal.ObjectUtil.checkPositive;
32  
33  /**
34   * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
35   * the same time.
36   */
37  public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
38  
39      private final EventExecutor[] children;
40      private final Set<EventExecutor> readonlyChildren;
41      private final AtomicInteger terminatedChildren = new AtomicInteger();
42      private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
43      private final EventExecutorChooserFactory.EventExecutorChooser chooser;
44  
45      /**
46       * Create a new instance.
47       *
48       * @param nThreads          the number of threads that will be used by this instance.
49       * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
50       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
51       */
52      protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
53          this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
54      }
55  
56      /**
57       * Create a new instance.
58       *
59       * @param nThreads          the number of threads that will be used by this instance.
60       * @param executor          the Executor to use, or {@code null} if the default should be used.
61       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
62       */
63      protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
64          this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
65      }
66  
67      /**
68       * Create a new instance.
69       *
70       * @param nThreads          the number of threads that will be used by this instance.
71       * @param executor          the Executor to use, or {@code null} if the default should be used.
72       * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
73       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
74       */
75      protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
76                                              EventExecutorChooserFactory chooserFactory, Object... args) {
77          checkPositive(nThreads, "nThreads");
78  
79          if (executor == null) {
80              executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
81          }
82  
83          children = new EventExecutor[nThreads];
84  
85          for (int i = 0; i < nThreads; i ++) {
86              boolean success = false;
87              try {
88                  children[i] = newChild(executor, args);
89                  success = true;
90              } catch (Exception e) {
91                  // TODO: Think about if this is a good exception type
92                  throw new IllegalStateException("failed to create a child event loop", e);
93              } finally {
94                  if (!success) {
95                      for (int j = 0; j < i; j ++) {
96                          children[j].shutdownGracefully();
97                      }
98  
99                      for (int j = 0; j < i; j ++) {
100                         EventExecutor e = children[j];
101                         try {
102                             while (!e.isTerminated()) {
103                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
104                             }
105                         } catch (InterruptedException interrupted) {
106                             // Let the caller handle the interruption.
107                             Thread.currentThread().interrupt();
108                             break;
109                         }
110                     }
111                 }
112             }
113         }
114 
115         chooser = chooserFactory.newChooser(children);
116 
117         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
118             @Override
119             public void operationComplete(Future<Object> future) throws Exception {
120                 if (terminatedChildren.incrementAndGet() == children.length) {
121                     terminationFuture.setSuccess(null);
122                 }
123             }
124         };
125 
126         for (EventExecutor e: children) {
127             e.terminationFuture().addListener(terminationListener);
128         }
129 
130         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
131         Collections.addAll(childrenSet, children);
132         readonlyChildren = Collections.unmodifiableSet(childrenSet);
133     }
134 
135     protected ThreadFactory newDefaultThreadFactory() {
136         return new DefaultThreadFactory(getClass());
137     }
138 
139     @Override
140     public EventExecutor next() {
141         return chooser.next();
142     }
143 
144     @Override
145     public Iterator<EventExecutor> iterator() {
146         return readonlyChildren.iterator();
147     }
148 
149     /**
150      * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
151      * 1:1 to the threads it use.
152      */
153     public final int executorCount() {
154         return children.length;
155     }
156 
157     /**
158      * Returns the number of currently active threads if the group is using an
159      * {@link ObservableEventExecutorChooser}. Otherwise, for a non-scaling group,
160      * this method returns the total number of threads, as all are considered active.
161      *
162      * @return the count of active threads.
163      */
164     public int activeExecutorCount() {
165         if (chooser instanceof ObservableEventExecutorChooser) {
166             return ((ObservableEventExecutorChooser) chooser).activeExecutorCount();
167         }
168         return executorCount();
169     }
170 
171     /**
172      * Returns a list of real-time utilization metrics if the group was configured
173      * with a compatible {@link EventExecutorChooserFactory}, otherwise an empty list.
174      *
175      * @return A list of {@link AutoScalingUtilizationMetric} objects.
176      */
177     public List<AutoScalingUtilizationMetric> executorUtilizations() {
178         if (chooser instanceof ObservableEventExecutorChooser) {
179             return ((ObservableEventExecutorChooser) chooser).executorUtilizations();
180         }
181         return Collections.emptyList();
182     }
183 
184     /**
185      * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
186      * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
187      *
188      */
189     protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
190 
191     @Override
192     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
193         for (EventExecutor l: children) {
194             l.shutdownGracefully(quietPeriod, timeout, unit);
195         }
196         return terminationFuture();
197     }
198 
199     @Override
200     public Future<?> terminationFuture() {
201         return terminationFuture;
202     }
203 
204     @Override
205     @Deprecated
206     public void shutdown() {
207         for (EventExecutor l: children) {
208             l.shutdown();
209         }
210     }
211 
212     @Override
213     public boolean isShuttingDown() {
214         for (EventExecutor l: children) {
215             if (!l.isShuttingDown()) {
216                 return false;
217             }
218         }
219         return true;
220     }
221 
222     @Override
223     public boolean isShutdown() {
224         for (EventExecutor l: children) {
225             if (!l.isShutdown()) {
226                 return false;
227             }
228         }
229         return true;
230     }
231 
232     @Override
233     public boolean isTerminated() {
234         for (EventExecutor l: children) {
235             if (!l.isTerminated()) {
236                 return false;
237             }
238         }
239         return true;
240     }
241 
242     @Override
243     public boolean awaitTermination(long timeout, TimeUnit unit)
244             throws InterruptedException {
245         long deadline = System.nanoTime() + unit.toNanos(timeout);
246         loop: for (EventExecutor l: children) {
247             for (;;) {
248                 long timeLeft = deadline - System.nanoTime();
249                 if (timeLeft <= 0) {
250                     break loop;
251                 }
252                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
253                     break;
254                 }
255             }
256         }
257         return isTerminated();
258     }
259 }