View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  
19  import io.netty.util.concurrent.AbstractEventExecutorGroup;
20  import io.netty.util.concurrent.DefaultPromise;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.Future;
23  import io.netty.util.concurrent.FutureListener;
24  import io.netty.util.concurrent.GlobalEventExecutor;
25  import io.netty.util.concurrent.Promise;
26  import io.netty.util.concurrent.ThreadPerTaskExecutor;
27  import io.netty.util.internal.EmptyArrays;
28  import io.netty.util.internal.PlatformDependent;
29  
30  import java.util.Collections;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadFactory;
38  import java.util.concurrent.TimeUnit;
39  
40  /**
41   * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
42   */
43  public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
44  
45      private final Object[] childArgs;
46      private final int maxChannels;
47      final Executor executor;
48      final Set<EventLoop> activeChildren =
49              Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
50      private final Set<EventLoop> readOnlyActiveChildren = Collections.unmodifiableSet(activeChildren);
51      final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
52      private final ChannelException tooManyChannels;
53  
54      private volatile boolean shuttingDown;
55      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
56      private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
57          @Override
58          public void operationComplete(Future<Object> future) throws Exception {
59              // Inefficient, but works.
60              if (isTerminated()) {
61                  terminationFuture.trySuccess(null);
62              }
63          }
64      };
65  
66      /**
67       * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
68       */
69      protected ThreadPerChannelEventLoopGroup() {
70          this(0);
71      }
72  
73      /**
74       * Create a new {@link ThreadPerChannelEventLoopGroup}.
75       *
76       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
77       *                          a new {@link Channel} and the maximum is exceed it will throw an
78       *                          {@link ChannelException}. on the {@link #register(Channel)} and
79       *                          {@link #register(Channel, ChannelPromise)} method.
80       *                          Use {@code 0} to use no limit
81       */
82      protected ThreadPerChannelEventLoopGroup(int maxChannels) {
83          this(maxChannels, Executors.defaultThreadFactory());
84      }
85  
86      /**
87       * Create a new {@link ThreadPerChannelEventLoopGroup}.
88       *
89       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
90       *                          a new {@link Channel} and the maximum is exceed it will throw an
91       *                          {@link ChannelException} on the {@link #register(Channel)} and
92       *                          {@link #register(Channel, ChannelPromise)} method.
93       *                          Use {@code 0} to use no limit
94       * @param threadFactory     the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
95       *                          registered {@link Channel}s
96       * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
97       */
98      protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
99          this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
100     }
101 
102     /**
103      * Create a new {@link ThreadPerChannelEventLoopGroup}.
104      *
105      * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
106      *                          a new {@link Channel} and the maximum is exceed it will throw an
107      *                          {@link ChannelException} on the {@link #register(Channel)} and
108      *                          {@link #register(Channel, ChannelPromise)} method.
109      *                          Use {@code 0} to use no limit
110      * @param executor          the {@link Executor} used to create new {@link Thread} instances that handle the
111      *                          registered {@link Channel}s
112      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
113      */
114     protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
115         if (maxChannels < 0) {
116             throw new IllegalArgumentException(String.format(
117                     "maxChannels: %d (expected: >= 0)", maxChannels));
118         }
119         if (executor == null) {
120             throw new NullPointerException("executor");
121         }
122 
123         if (args == null) {
124             childArgs = EmptyArrays.EMPTY_OBJECTS;
125         } else {
126             childArgs = args.clone();
127         }
128 
129         this.maxChannels = maxChannels;
130         this.executor = executor;
131 
132         tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
133         tooManyChannels.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
134     }
135 
136     /**
137      * Creates a new {@link EventLoop}.  The default implementation creates a new {@link ThreadPerChannelEventLoop}.
138      */
139     protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
140         return new ThreadPerChannelEventLoop(this);
141     }
142 
143     @Override
144     @SuppressWarnings("unchecked")
145     public <E extends EventExecutor> Set<E> children() {
146         return (Set<E>) readOnlyActiveChildren;
147     }
148 
149     @Override
150     public EventLoop next() {
151         throw new UnsupportedOperationException();
152     }
153 
154     @Override
155     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
156         shuttingDown = true;
157 
158         for (EventLoop l: activeChildren) {
159             l.shutdownGracefully(quietPeriod, timeout, unit);
160         }
161         for (EventLoop l: idleChildren) {
162             l.shutdownGracefully(quietPeriod, timeout, unit);
163         }
164 
165         // Notify the future if there was no children.
166         if (isTerminated()) {
167             terminationFuture.trySuccess(null);
168         }
169 
170         return terminationFuture();
171     }
172 
173     @Override
174     public Future<?> terminationFuture() {
175         return terminationFuture;
176     }
177 
178     @Override
179     @Deprecated
180     public void shutdown() {
181         shuttingDown = true;
182 
183         for (EventLoop l: activeChildren) {
184             l.shutdown();
185         }
186         for (EventLoop l: idleChildren) {
187             l.shutdown();
188         }
189 
190         // Notify the future if there was no children.
191         if (isTerminated()) {
192             terminationFuture.trySuccess(null);
193         }
194     }
195 
196     @Override
197     public boolean isShuttingDown() {
198         for (EventLoop l: activeChildren) {
199             if (!l.isShuttingDown()) {
200                 return false;
201             }
202         }
203         for (EventLoop l: idleChildren) {
204             if (!l.isShuttingDown()) {
205                 return false;
206             }
207         }
208         return true;
209     }
210 
211     @Override
212     public boolean isShutdown() {
213         for (EventLoop l: activeChildren) {
214             if (!l.isShutdown()) {
215                 return false;
216             }
217         }
218         for (EventLoop l: idleChildren) {
219             if (!l.isShutdown()) {
220                 return false;
221             }
222         }
223         return true;
224     }
225 
226     @Override
227     public boolean isTerminated() {
228         for (EventLoop l: activeChildren) {
229             if (!l.isTerminated()) {
230                 return false;
231             }
232         }
233         for (EventLoop l: idleChildren) {
234             if (!l.isTerminated()) {
235                 return false;
236             }
237         }
238         return true;
239     }
240 
241     @Override
242     public boolean awaitTermination(long timeout, TimeUnit unit)
243             throws InterruptedException {
244         long deadline = System.nanoTime() + unit.toNanos(timeout);
245         for (EventLoop l: activeChildren) {
246             for (;;) {
247                 long timeLeft = deadline - System.nanoTime();
248                 if (timeLeft <= 0) {
249                     return isTerminated();
250                 }
251                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
252                     break;
253                 }
254             }
255         }
256         for (EventLoop l: idleChildren) {
257             for (;;) {
258                 long timeLeft = deadline - System.nanoTime();
259                 if (timeLeft <= 0) {
260                     return isTerminated();
261                 }
262                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
263                     break;
264                 }
265             }
266         }
267         return isTerminated();
268     }
269 
270     @Override
271     public ChannelFuture register(Channel channel) {
272         if (channel == null) {
273             throw new NullPointerException("channel");
274         }
275         try {
276             EventLoop l = nextChild();
277             return l.register(channel, new DefaultChannelPromise(channel, l));
278         } catch (Throwable t) {
279             return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
280         }
281     }
282 
283     @Override
284     public ChannelFuture register(Channel channel, ChannelPromise promise) {
285         if (channel == null) {
286             throw new NullPointerException("channel");
287         }
288         try {
289             return nextChild().register(channel, promise);
290         } catch (Throwable t) {
291             promise.setFailure(t);
292             return promise;
293         }
294     }
295 
296     private EventLoop nextChild() throws Exception {
297         if (shuttingDown) {
298             throw new RejectedExecutionException("shutting down");
299         }
300 
301         EventLoop loop = idleChildren.poll();
302         if (loop == null) {
303             if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
304                 throw tooManyChannels;
305             }
306             loop = newChild(childArgs);
307             loop.terminationFuture().addListener(childTerminationListener);
308         }
309         activeChildren.add(loop);
310         return loop;
311     }
312 }