View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  
19  import io.netty.util.concurrent.AbstractEventExecutorGroup;
20  import io.netty.util.concurrent.DefaultPromise;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.Future;
23  import io.netty.util.concurrent.FutureListener;
24  import io.netty.util.concurrent.GlobalEventExecutor;
25  import io.netty.util.concurrent.Promise;
26  import io.netty.util.concurrent.ThreadPerTaskExecutor;
27  import io.netty.util.internal.EmptyArrays;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.ReadOnlyIterator;
30  import io.netty.util.internal.ThrowableUtil;
31  
32  import java.util.Collections;
33  import java.util.Iterator;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  
43  /**
44   * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
45   */
46  public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
47  
48      private final Object[] childArgs;
49      private final int maxChannels;
50      final Executor executor;
51      final Set<EventLoop> activeChildren =
52              Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
53      final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
54      private final ChannelException tooManyChannels;
55  
56      private volatile boolean shuttingDown;
57      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
58      private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
59          @Override
60          public void operationComplete(Future<Object> future) throws Exception {
61              // Inefficient, but works.
62              if (isTerminated()) {
63                  terminationFuture.trySuccess(null);
64              }
65          }
66      };
67  
68      /**
69       * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
70       */
71      protected ThreadPerChannelEventLoopGroup() {
72          this(0);
73      }
74  
75      /**
76       * Create a new {@link ThreadPerChannelEventLoopGroup}.
77       *
78       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
79       *                          a new {@link Channel} and the maximum is exceed it will throw an
80       *                          {@link ChannelException}. on the {@link #register(Channel)} and
81       *                          {@link #register(ChannelPromise)} method.
82       *                          Use {@code 0} to use no limit
83       */
84      protected ThreadPerChannelEventLoopGroup(int maxChannels) {
85          this(maxChannels, Executors.defaultThreadFactory());
86      }
87  
88      /**
89       * Create a new {@link ThreadPerChannelEventLoopGroup}.
90       *
91       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
92       *                          a new {@link Channel} and the maximum is exceed it will throw an
93       *                          {@link ChannelException} on the {@link #register(Channel)} and
94       *                          {@link #register(ChannelPromise)} method.
95       *                          Use {@code 0} to use no limit
96       * @param threadFactory     the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
97       *                          registered {@link Channel}s
98       * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
99       */
100     protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
101         this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
102     }
103 
104     /**
105      * Create a new {@link ThreadPerChannelEventLoopGroup}.
106      *
107      * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
108      *                          a new {@link Channel} and the maximum is exceed it will throw an
109      *                          {@link ChannelException} on the {@link #register(Channel)} and
110      *                          {@link #register(ChannelPromise)} method.
111      *                          Use {@code 0} to use no limit
112      * @param executor          the {@link Executor} used to create new {@link Thread} instances that handle the
113      *                          registered {@link Channel}s
114      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
115      */
116     protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
117         if (maxChannels < 0) {
118             throw new IllegalArgumentException(String.format(
119                     "maxChannels: %d (expected: >= 0)", maxChannels));
120         }
121         if (executor == null) {
122             throw new NullPointerException("executor");
123         }
124 
125         if (args == null) {
126             childArgs = EmptyArrays.EMPTY_OBJECTS;
127         } else {
128             childArgs = args.clone();
129         }
130 
131         this.maxChannels = maxChannels;
132         this.executor = executor;
133 
134         tooManyChannels = ThrowableUtil.unknownStackTrace(
135                 new ChannelException("too many channels (max: " + maxChannels + ')'),
136                 ThreadPerChannelEventLoopGroup.class, "nextChild()");
137     }
138 
139     /**
140      * Creates a new {@link EventLoop}.  The default implementation creates a new {@link ThreadPerChannelEventLoop}.
141      */
142     protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
143         return new ThreadPerChannelEventLoop(this);
144     }
145 
146     @Override
147     public Iterator<EventExecutor> iterator() {
148         return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
149     }
150 
151     @Override
152     public EventLoop next() {
153         throw new UnsupportedOperationException();
154     }
155 
156     @Override
157     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
158         shuttingDown = true;
159 
160         for (EventLoop l: activeChildren) {
161             l.shutdownGracefully(quietPeriod, timeout, unit);
162         }
163         for (EventLoop l: idleChildren) {
164             l.shutdownGracefully(quietPeriod, timeout, unit);
165         }
166 
167         // Notify the future if there was no children.
168         if (isTerminated()) {
169             terminationFuture.trySuccess(null);
170         }
171 
172         return terminationFuture();
173     }
174 
175     @Override
176     public Future<?> terminationFuture() {
177         return terminationFuture;
178     }
179 
180     @Override
181     @Deprecated
182     public void shutdown() {
183         shuttingDown = true;
184 
185         for (EventLoop l: activeChildren) {
186             l.shutdown();
187         }
188         for (EventLoop l: idleChildren) {
189             l.shutdown();
190         }
191 
192         // Notify the future if there was no children.
193         if (isTerminated()) {
194             terminationFuture.trySuccess(null);
195         }
196     }
197 
198     @Override
199     public boolean isShuttingDown() {
200         for (EventLoop l: activeChildren) {
201             if (!l.isShuttingDown()) {
202                 return false;
203             }
204         }
205         for (EventLoop l: idleChildren) {
206             if (!l.isShuttingDown()) {
207                 return false;
208             }
209         }
210         return true;
211     }
212 
213     @Override
214     public boolean isShutdown() {
215         for (EventLoop l: activeChildren) {
216             if (!l.isShutdown()) {
217                 return false;
218             }
219         }
220         for (EventLoop l: idleChildren) {
221             if (!l.isShutdown()) {
222                 return false;
223             }
224         }
225         return true;
226     }
227 
228     @Override
229     public boolean isTerminated() {
230         for (EventLoop l: activeChildren) {
231             if (!l.isTerminated()) {
232                 return false;
233             }
234         }
235         for (EventLoop l: idleChildren) {
236             if (!l.isTerminated()) {
237                 return false;
238             }
239         }
240         return true;
241     }
242 
243     @Override
244     public boolean awaitTermination(long timeout, TimeUnit unit)
245             throws InterruptedException {
246         long deadline = System.nanoTime() + unit.toNanos(timeout);
247         for (EventLoop l: activeChildren) {
248             for (;;) {
249                 long timeLeft = deadline - System.nanoTime();
250                 if (timeLeft <= 0) {
251                     return isTerminated();
252                 }
253                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
254                     break;
255                 }
256             }
257         }
258         for (EventLoop l: idleChildren) {
259             for (;;) {
260                 long timeLeft = deadline - System.nanoTime();
261                 if (timeLeft <= 0) {
262                     return isTerminated();
263                 }
264                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
265                     break;
266                 }
267             }
268         }
269         return isTerminated();
270     }
271 
272     @Override
273     public ChannelFuture register(Channel channel) {
274         if (channel == null) {
275             throw new NullPointerException("channel");
276         }
277         try {
278             EventLoop l = nextChild();
279             return l.register(new DefaultChannelPromise(channel, l));
280         } catch (Throwable t) {
281             return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
282         }
283     }
284 
285     @Override
286     public ChannelFuture register(ChannelPromise promise) {
287         try {
288             return nextChild().register(promise);
289         } catch (Throwable t) {
290             promise.setFailure(t);
291             return promise;
292         }
293     }
294 
295     @Deprecated
296     @Override
297     public ChannelFuture register(Channel channel, ChannelPromise promise) {
298         if (channel == null) {
299             throw new NullPointerException("channel");
300         }
301         try {
302             return nextChild().register(channel, promise);
303         } catch (Throwable t) {
304             promise.setFailure(t);
305             return promise;
306         }
307     }
308 
309     private EventLoop nextChild() throws Exception {
310         if (shuttingDown) {
311             throw new RejectedExecutionException("shutting down");
312         }
313 
314         EventLoop loop = idleChildren.poll();
315         if (loop == null) {
316             if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
317                 throw tooManyChannels;
318             }
319             loop = newChild(childArgs);
320             loop.terminationFuture().addListener(childTerminationListener);
321         }
322         activeChildren.add(loop);
323         return loop;
324     }
325 }