/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel;
17  
18  
19  import io.netty.util.concurrent.AbstractEventExecutorGroup;
20  import io.netty.util.concurrent.DefaultPromise;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.Future;
23  import io.netty.util.concurrent.FutureListener;
24  import io.netty.util.concurrent.GlobalEventExecutor;
25  import io.netty.util.concurrent.Promise;
26  import io.netty.util.internal.EmptyArrays;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReadOnlyIterator;
29  import io.netty.util.internal.ThrowableUtil;
30  
31  import java.util.Collections;
32  import java.util.Iterator;
33  import java.util.Queue;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentLinkedQueue;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.TimeUnit;
40  
41  /**
42   * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
43   */
44  public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
45  
46      private final Object[] childArgs;
47      private final int maxChannels;
48      final ThreadFactory threadFactory;
49      final Set<ThreadPerChannelEventLoop> activeChildren =
50              Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap());
51      final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>();
52      private final ChannelException tooManyChannels;
53  
54      private volatile boolean shuttingDown;
55      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
56      private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
57          @Override
58          public void operationComplete(Future<Object> future) throws Exception {
59              // Inefficient, but works.
60              if (isTerminated()) {
61                  terminationFuture.trySuccess(null);
62              }
63          }
64      };
65  
66      /**
67       * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
68       */
69      protected ThreadPerChannelEventLoopGroup() {
70          this(0);
71      }
72  
73      /**
74       * Create a new {@link ThreadPerChannelEventLoopGroup}.
75       *
76       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
77       *                          a new {@link Channel} and the maximum is exceed it will throw an
78       *                          {@link ChannelException} on the {@link #register(Channel)} and
79       *                          {@link #register(Channel, ChannelPromise)} method.
80       *                          Use {@code 0} to use no limit
81       */
82      protected ThreadPerChannelEventLoopGroup(int maxChannels) {
83          this(maxChannels, Executors.defaultThreadFactory());
84      }
85  
86      /**
87       * Create a new {@link ThreadPerChannelEventLoopGroup}.
88       *
89       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
90       *                          a new {@link Channel} and the maximum is exceed it will throw an
91       *                          {@link ChannelException} on the {@link #register(Channel)} and
92       *                          {@link #register(Channel, ChannelPromise)} method.
93       *                          Use {@code 0} to use no limit
94       * @param threadFactory     the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
95       *                          registered {@link Channel}s
96       * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
97       */
98      protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
99          if (maxChannels < 0) {
100             throw new IllegalArgumentException(String.format(
101                     "maxChannels: %d (expected: >= 0)", maxChannels));
102         }
103         if (threadFactory == null) {
104             throw new NullPointerException("threadFactory");
105         }
106 
107         if (args == null) {
108             childArgs = EmptyArrays.EMPTY_OBJECTS;
109         } else {
110             childArgs = args.clone();
111         }
112 
113         this.maxChannels = maxChannels;
114         this.threadFactory = threadFactory;
115 
116         tooManyChannels = ThrowableUtil.unknownStackTrace(
117                 new ChannelException("too many channels (max: " + maxChannels + ')'),
118                 ThreadPerChannelEventLoopGroup.class, "nextChild()");
119     }
120 
121     /**
122      * Creates a new {@link EventLoop}.  The default implementation creates a new {@link ThreadPerChannelEventLoop}.
123      */
124     protected ThreadPerChannelEventLoop newChild(
125             @SuppressWarnings("UnusedParameters") Object... args) throws Exception {
126         return new ThreadPerChannelEventLoop(this);
127     }
128 
129     @Override
130     public Iterator<EventExecutor> iterator() {
131         return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
132     }
133 
134     @Override
135     public EventLoop next() {
136         throw new UnsupportedOperationException();
137     }
138 
139     @Override
140     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
141         shuttingDown = true;
142 
143         for (EventLoop l: activeChildren) {
144             l.shutdownGracefully(quietPeriod, timeout, unit);
145         }
146         for (EventLoop l: idleChildren) {
147             l.shutdownGracefully(quietPeriod, timeout, unit);
148         }
149 
150         // Notify the future if there was no children.
151         if (isTerminated()) {
152             terminationFuture.trySuccess(null);
153         }
154 
155         return terminationFuture();
156     }
157 
158     @Override
159     public Future<?> terminationFuture() {
160         return terminationFuture;
161     }
162 
163     @Override
164     @Deprecated
165     public void shutdown() {
166         shuttingDown = true;
167 
168         for (EventLoop l: activeChildren) {
169             l.shutdown();
170         }
171         for (EventLoop l: idleChildren) {
172             l.shutdown();
173         }
174 
175         // Notify the future if there was no children.
176         if (isTerminated()) {
177             terminationFuture.trySuccess(null);
178         }
179     }
180 
181     @Override
182     public boolean isShuttingDown() {
183         for (EventLoop l: activeChildren) {
184             if (!l.isShuttingDown()) {
185                 return false;
186             }
187         }
188         for (EventLoop l: idleChildren) {
189             if (!l.isShuttingDown()) {
190                 return false;
191             }
192         }
193         return true;
194     }
195 
196     @Override
197     public boolean isShutdown() {
198         for (EventLoop l: activeChildren) {
199             if (!l.isShutdown()) {
200                 return false;
201             }
202         }
203         for (EventLoop l: idleChildren) {
204             if (!l.isShutdown()) {
205                 return false;
206             }
207         }
208         return true;
209     }
210 
211     @Override
212     public boolean isTerminated() {
213         for (EventLoop l: activeChildren) {
214             if (!l.isTerminated()) {
215                 return false;
216             }
217         }
218         for (EventLoop l: idleChildren) {
219             if (!l.isTerminated()) {
220                 return false;
221             }
222         }
223         return true;
224     }
225 
226     @Override
227     public boolean awaitTermination(long timeout, TimeUnit unit)
228             throws InterruptedException {
229         long deadline = System.nanoTime() + unit.toNanos(timeout);
230         for (EventLoop l: activeChildren) {
231             for (;;) {
232                 long timeLeft = deadline - System.nanoTime();
233                 if (timeLeft <= 0) {
234                     return isTerminated();
235                 }
236                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
237                     break;
238                 }
239             }
240         }
241         for (EventLoop l: idleChildren) {
242             for (;;) {
243                 long timeLeft = deadline - System.nanoTime();
244                 if (timeLeft <= 0) {
245                     return isTerminated();
246                 }
247                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
248                     break;
249                 }
250             }
251         }
252         return isTerminated();
253     }
254 
255     @Override
256     public ChannelFuture register(Channel channel) {
257         if (channel == null) {
258             throw new NullPointerException("channel");
259         }
260         try {
261             EventLoop l = nextChild();
262             return l.register(channel, new DefaultChannelPromise(channel, l));
263         } catch (Throwable t) {
264             return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
265         }
266     }
267 
268     @Override
269     public ChannelFuture register(Channel channel, ChannelPromise promise) {
270         if (channel == null) {
271             throw new NullPointerException("channel");
272         }
273         try {
274             return nextChild().register(channel, promise);
275         } catch (Throwable t) {
276             promise.setFailure(t);
277             return promise;
278         }
279     }
280 
281     private EventLoop nextChild() throws Exception {
282         if (shuttingDown) {
283             throw new RejectedExecutionException("shutting down");
284         }
285 
286         ThreadPerChannelEventLoop loop = idleChildren.poll();
287         if (loop == null) {
288             if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
289                 throw tooManyChannels;
290             }
291             loop = newChild(childArgs);
292             loop.terminationFuture().addListener(childTerminationListener);
293         }
294         activeChildren.add(loop);
295         return loop;
296     }
297 }