View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import java.util.Collections;
19  import java.util.LinkedHashSet;
20  import java.util.Set;
21  import java.util.concurrent.Executor;
22  import java.util.concurrent.ExecutorService;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  /**
27   * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
28   * the same time.
29   */
30  public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
31  
32      private final EventExecutor[] children;
33      private final Set<EventExecutor> readonlyChildren;
34      private final AtomicInteger childIndex = new AtomicInteger();
35      private final AtomicInteger terminatedChildren = new AtomicInteger();
36      private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
37      private final EventExecutorChooser chooser;
38  
39      /**
40       * @param nEventExecutors           the number of {@link EventExecutor}s that will be used by this instance.
41       *                                  If {@code executor} is {@code null} this number will also be the parallelism
42       *                                  requested from the default executor. It is generally advised for the number
43       *                                  of {@link EventExecutor}s and the number of {@link Thread}s used by the
44       *                                  {@code executor} to lie very close together.
45       * @param executorServiceFactory    the {@link ExecutorServiceFactory} to use, or {@code null} if the default
46       *                                  should be used.
47       * @param args                      arguments which will passed to each {@link #newChild(Executor, Object...)} call.
48       */
49      protected MultithreadEventExecutorGroup(int nEventExecutors,
50                                              ExecutorServiceFactory executorServiceFactory,
51                                              Object... args) {
52          this(nEventExecutors, executorServiceFactory != null
53                                  ? executorServiceFactory.newExecutorService(nEventExecutors)
54                                  : null,
55               true, args);
56      }
57  
58      /**
59       * @param nEventExecutors   the number of {@link EventExecutor}s that will be used by this instance.
60       *                          If {@code executor} is {@code null} this number will also be the parallelism
61       *                          requested from the default executor. It is generally advised for the number
62       *                          of {@link EventExecutor}s and the number of {@link Thread}s used by the
63       *                          {@code executor} to lie very close together.
64       * @param executor          the {@link Executor} to use, or {@code null} if the default should be used.
65       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
66       */
67      protected MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, Object... args) {
68          this(nEventExecutors, executor, false, args);
69      }
70  
71      private MultithreadEventExecutorGroup(int nEventExecutors,
72                                            Executor executor,
73                                            boolean shutdownExecutor,
74                                            Object... args) {
75          if (nEventExecutors <= 0) {
76              throw new IllegalArgumentException(
77                      String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors));
78          }
79  
80          if (executor == null) {
81              executor = newDefaultExecutorService(nEventExecutors);
82              shutdownExecutor = true;
83          }
84  
85          children = new EventExecutor[nEventExecutors];
86          if (isPowerOfTwo(children.length)) {
87              chooser = new PowerOfTwoEventExecutorChooser();
88          } else {
89              chooser = new GenericEventExecutorChooser();
90          }
91  
92          for (int i = 0; i < nEventExecutors; i ++) {
93              boolean success = false;
94              try {
95                  children[i] = newChild(executor, args);
96                  success = true;
97              } catch (Exception e) {
98                  // TODO: Think about if this is a good exception type
99                  throw new IllegalStateException("failed to create a child event loop", e);
100             } finally {
101                 if (!success) {
102                     for (int j = 0; j < i; j ++) {
103                         children[j].shutdownGracefully();
104                     }
105 
106                     for (int j = 0; j < i; j ++) {
107                         EventExecutor e = children[j];
108                         try {
109                             while (!e.isTerminated()) {
110                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
111                             }
112                         } catch (InterruptedException interrupted) {
113                             // Let the caller handle the interruption.
114                             Thread.currentThread().interrupt();
115                             break;
116                         }
117                     }
118                 }
119             }
120         }
121 
122         final boolean shutdownExecutor0 = shutdownExecutor;
123         final Executor executor0 = executor;
124         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
125             @Override
126             public void operationComplete(Future<Object> future) throws Exception {
127                 if (terminatedChildren.incrementAndGet() == children.length) {
128                     terminationFuture.setSuccess(null);
129                     if (shutdownExecutor0) {
130                         // This cast is correct because shutdownExecutor0 is only try if
131                         // executor0 is of type ExecutorService.
132                         ((ExecutorService) executor0).shutdown();
133                     }
134                 }
135             }
136         };
137 
138         for (EventExecutor e: children) {
139             e.terminationFuture().addListener(terminationListener);
140         }
141 
142         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
143         Collections.addAll(childrenSet, children);
144         readonlyChildren = Collections.unmodifiableSet(childrenSet);
145     }
146 
147     protected ExecutorService newDefaultExecutorService(int nEventExecutors) {
148         return new DefaultExecutorServiceFactory(getClass()).newExecutorService(nEventExecutors);
149     }
150 
151     @Override
152     public EventExecutor next() {
153         return chooser.next();
154     }
155 
156     /**
157      * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
158      * 1:1 to the threads it use.
159      */
160     public final int executorCount() {
161         return children.length;
162     }
163 
164     @Override
165     @SuppressWarnings("unchecked")
166     public final <E extends EventExecutor> Set<E> children() {
167         return (Set<E>) readonlyChildren;
168     }
169 
170     /**
171      * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
172      * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
173      *
174      */
175     protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
176 
177     @Override
178     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
179         for (EventExecutor l: children) {
180             l.shutdownGracefully(quietPeriod, timeout, unit);
181         }
182         return terminationFuture();
183     }
184 
185     @Override
186     public Future<?> terminationFuture() {
187         return terminationFuture;
188     }
189 
190     @Override
191     @Deprecated
192     public void shutdown() {
193         for (EventExecutor l: children) {
194             l.shutdown();
195         }
196     }
197 
198     @Override
199     public boolean isShuttingDown() {
200         for (EventExecutor l: children) {
201             if (!l.isShuttingDown()) {
202                 return false;
203             }
204         }
205         return true;
206     }
207 
208     @Override
209     public boolean isShutdown() {
210         for (EventExecutor l: children) {
211             if (!l.isShutdown()) {
212                 return false;
213             }
214         }
215         return true;
216     }
217 
218     @Override
219     public boolean isTerminated() {
220         for (EventExecutor l: children) {
221             if (!l.isTerminated()) {
222                 return false;
223             }
224         }
225         return true;
226     }
227 
228     @Override
229     public boolean awaitTermination(long timeout, TimeUnit unit)
230             throws InterruptedException {
231         long deadline = System.nanoTime() + unit.toNanos(timeout);
232         loop: for (EventExecutor l: children) {
233             for (;;) {
234                 long timeLeft = deadline - System.nanoTime();
235                 if (timeLeft <= 0) {
236                     break loop;
237                 }
238                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
239                     break;
240                 }
241             }
242         }
243         return isTerminated();
244     }
245 
246     private static boolean isPowerOfTwo(int val) {
247         return (val & -val) == val;
248     }
249 
250     private interface EventExecutorChooser {
251         EventExecutor next();
252     }
253 
254     private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
255         @Override
256         public EventExecutor next() {
257             return children[childIndex.getAndIncrement() & children.length - 1];
258         }
259     }
260 
261     private final class GenericEventExecutorChooser implements EventExecutorChooser {
262         @Override
263         public EventExecutor next() {
264             return children[Math.abs(childIndex.getAndIncrement() % children.length)];
265         }
266     }
267 }