View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  
20  import java.util.Collections;
21  import java.util.Iterator;
22  import java.util.LinkedHashSet;
23  import java.util.Set;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.ThreadFactory;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  /**
30   * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
31   * the same time.
32   */
33  public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
34  
35      private final EventExecutor[] children;
36      private final Set<EventExecutor> readonlyChildren;
37      private final AtomicInteger terminatedChildren = new AtomicInteger();
38      private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
39      private final EventExecutorChooserFactory.EventExecutorChooser chooser;
40  
41      /**
42       * Create a new instance.
43       *
44       * @param nThreads          the number of threads that will be used by this instance.
45       * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
46       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
47       */
48      protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
49          this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
50      }
51  
52      /**
53       * Create a new instance.
54       *
55       * @param nThreads          the number of threads that will be used by this instance.
56       * @param executor          the Executor to use, or {@code null} if the default should be used.
57       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
58       */
59      protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
60          this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
61      }
62  
63      /**
64       * Create a new instance.
65       *
66       * @param nThreads          the number of threads that will be used by this instance.
67       * @param executor          the Executor to use, or {@code null} if the default should be used.
68       * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
69       * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
70       */
71      protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
72                                              EventExecutorChooserFactory chooserFactory, Object... args) {
73          checkPositive(nThreads, "nThreads");
74  
75          if (executor == null) {
76              executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
77          }
78  
79          children = new EventExecutor[nThreads];
80  
81          for (int i = 0; i < nThreads; i ++) {
82              boolean success = false;
83              try {
84                  children[i] = newChild(executor, args);
85                  success = true;
86              } catch (Exception e) {
87                  // TODO: Think about if this is a good exception type
88                  throw new IllegalStateException("failed to create a child event loop", e);
89              } finally {
90                  if (!success) {
91                      for (int j = 0; j < i; j ++) {
92                          children[j].shutdownGracefully();
93                      }
94  
95                      for (int j = 0; j < i; j ++) {
96                          EventExecutor e = children[j];
97                          try {
98                              while (!e.isTerminated()) {
99                                  e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
100                             }
101                         } catch (InterruptedException interrupted) {
102                             // Let the caller handle the interruption.
103                             Thread.currentThread().interrupt();
104                             break;
105                         }
106                     }
107                 }
108             }
109         }
110 
111         chooser = chooserFactory.newChooser(children);
112 
113         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
114             @Override
115             public void operationComplete(Future<Object> future) throws Exception {
116                 if (terminatedChildren.incrementAndGet() == children.length) {
117                     terminationFuture.setSuccess(null);
118                 }
119             }
120         };
121 
122         for (EventExecutor e: children) {
123             e.terminationFuture().addListener(terminationListener);
124         }
125 
126         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
127         Collections.addAll(childrenSet, children);
128         readonlyChildren = Collections.unmodifiableSet(childrenSet);
129     }
130 
131     protected ThreadFactory newDefaultThreadFactory() {
132         return new DefaultThreadFactory(getClass());
133     }
134 
135     @Override
136     public EventExecutor next() {
137         return chooser.next();
138     }
139 
140     @Override
141     public Iterator<EventExecutor> iterator() {
142         return readonlyChildren.iterator();
143     }
144 
145     /**
146      * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
147      * 1:1 to the threads it use.
148      */
149     public final int executorCount() {
150         return children.length;
151     }
152 
153     /**
154      * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
155      * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
156      *
157      */
158     protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
159 
160     @Override
161     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
162         for (EventExecutor l: children) {
163             l.shutdownGracefully(quietPeriod, timeout, unit);
164         }
165         return terminationFuture();
166     }
167 
168     @Override
169     public Future<?> terminationFuture() {
170         return terminationFuture;
171     }
172 
173     @Override
174     @Deprecated
175     public void shutdown() {
176         for (EventExecutor l: children) {
177             l.shutdown();
178         }
179     }
180 
181     @Override
182     public boolean isShuttingDown() {
183         for (EventExecutor l: children) {
184             if (!l.isShuttingDown()) {
185                 return false;
186             }
187         }
188         return true;
189     }
190 
191     @Override
192     public boolean isShutdown() {
193         for (EventExecutor l: children) {
194             if (!l.isShutdown()) {
195                 return false;
196             }
197         }
198         return true;
199     }
200 
201     @Override
202     public boolean isTerminated() {
203         for (EventExecutor l: children) {
204             if (!l.isTerminated()) {
205                 return false;
206             }
207         }
208         return true;
209     }
210 
211     @Override
212     public boolean awaitTermination(long timeout, TimeUnit unit)
213             throws InterruptedException {
214         long deadline = System.nanoTime() + unit.toNanos(timeout);
215         loop: for (EventExecutor l: children) {
216             for (;;) {
217                 long timeLeft = deadline - System.nanoTime();
218                 if (timeLeft <= 0) {
219                     break loop;
220                 }
221                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
222                     break;
223                 }
224             }
225         }
226         return isTerminated();
227     }
228 }