View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  
19  import io.netty.util.concurrent.AbstractEventExecutorGroup;
20  import io.netty.util.concurrent.DefaultPromise;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.Future;
23  import io.netty.util.concurrent.FutureListener;
24  import io.netty.util.concurrent.GlobalEventExecutor;
25  import io.netty.util.concurrent.Promise;
26  import io.netty.util.concurrent.ThreadPerTaskExecutor;
27  import io.netty.util.internal.EmptyArrays;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.ReadOnlyIterator;
30  import io.netty.util.internal.ThrowableUtil;
31  
32  import java.util.Collections;
33  import java.util.Iterator;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  
43  /**
44   * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
45   *
46   * @deprecated this will be remove in the next-major release.
47   */
48  @Deprecated
49  public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
50  
51      private final Object[] childArgs;
52      private final int maxChannels;
53      final Executor executor;
54      final Set<EventLoop> activeChildren =
55              Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
56      final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
57      private final ChannelException tooManyChannels;
58  
59      private volatile boolean shuttingDown;
60      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
61      private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
62          @Override
63          public void operationComplete(Future<Object> future) throws Exception {
64              // Inefficient, but works.
65              if (isTerminated()) {
66                  terminationFuture.trySuccess(null);
67              }
68          }
69      };
70  
71      /**
72       * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
73       */
74      protected ThreadPerChannelEventLoopGroup() {
75          this(0);
76      }
77  
78      /**
79       * Create a new {@link ThreadPerChannelEventLoopGroup}.
80       *
81       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
82       *                          a new {@link Channel} and the maximum is exceed it will throw an
83       *                          {@link ChannelException}. on the {@link #register(Channel)} and
84       *                          {@link #register(ChannelPromise)} method.
85       *                          Use {@code 0} to use no limit
86       */
87      protected ThreadPerChannelEventLoopGroup(int maxChannels) {
88          this(maxChannels, Executors.defaultThreadFactory());
89      }
90  
91      /**
92       * Create a new {@link ThreadPerChannelEventLoopGroup}.
93       *
94       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
95       *                          a new {@link Channel} and the maximum is exceed it will throw an
96       *                          {@link ChannelException} on the {@link #register(Channel)} and
97       *                          {@link #register(ChannelPromise)} method.
98       *                          Use {@code 0} to use no limit
99       * @param threadFactory     the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
100      *                          registered {@link Channel}s
101      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
102      */
103     protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
104         this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
105     }
106 
107     /**
108      * Create a new {@link ThreadPerChannelEventLoopGroup}.
109      *
110      * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
111      *                          a new {@link Channel} and the maximum is exceed it will throw an
112      *                          {@link ChannelException} on the {@link #register(Channel)} and
113      *                          {@link #register(ChannelPromise)} method.
114      *                          Use {@code 0} to use no limit
115      * @param executor          the {@link Executor} used to create new {@link Thread} instances that handle the
116      *                          registered {@link Channel}s
117      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
118      */
119     protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
120         if (maxChannels < 0) {
121             throw new IllegalArgumentException(String.format(
122                     "maxChannels: %d (expected: >= 0)", maxChannels));
123         }
124         if (executor == null) {
125             throw new NullPointerException("executor");
126         }
127 
128         if (args == null) {
129             childArgs = EmptyArrays.EMPTY_OBJECTS;
130         } else {
131             childArgs = args.clone();
132         }
133 
134         this.maxChannels = maxChannels;
135         this.executor = executor;
136 
137         tooManyChannels = ThrowableUtil.unknownStackTrace(
138                 new ChannelException("too many channels (max: " + maxChannels + ')'),
139                 ThreadPerChannelEventLoopGroup.class, "nextChild()");
140     }
141 
142     /**
143      * Creates a new {@link EventLoop}.  The default implementation creates a new {@link ThreadPerChannelEventLoop}.
144      */
145     protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
146         return new ThreadPerChannelEventLoop(this);
147     }
148 
149     @Override
150     public Iterator<EventExecutor> iterator() {
151         return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
152     }
153 
154     @Override
155     public EventLoop next() {
156         throw new UnsupportedOperationException();
157     }
158 
159     @Override
160     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
161         shuttingDown = true;
162 
163         for (EventLoop l: activeChildren) {
164             l.shutdownGracefully(quietPeriod, timeout, unit);
165         }
166         for (EventLoop l: idleChildren) {
167             l.shutdownGracefully(quietPeriod, timeout, unit);
168         }
169 
170         // Notify the future if there was no children.
171         if (isTerminated()) {
172             terminationFuture.trySuccess(null);
173         }
174 
175         return terminationFuture();
176     }
177 
178     @Override
179     public Future<?> terminationFuture() {
180         return terminationFuture;
181     }
182 
183     @Override
184     @Deprecated
185     public void shutdown() {
186         shuttingDown = true;
187 
188         for (EventLoop l: activeChildren) {
189             l.shutdown();
190         }
191         for (EventLoop l: idleChildren) {
192             l.shutdown();
193         }
194 
195         // Notify the future if there was no children.
196         if (isTerminated()) {
197             terminationFuture.trySuccess(null);
198         }
199     }
200 
201     @Override
202     public boolean isShuttingDown() {
203         for (EventLoop l: activeChildren) {
204             if (!l.isShuttingDown()) {
205                 return false;
206             }
207         }
208         for (EventLoop l: idleChildren) {
209             if (!l.isShuttingDown()) {
210                 return false;
211             }
212         }
213         return true;
214     }
215 
216     @Override
217     public boolean isShutdown() {
218         for (EventLoop l: activeChildren) {
219             if (!l.isShutdown()) {
220                 return false;
221             }
222         }
223         for (EventLoop l: idleChildren) {
224             if (!l.isShutdown()) {
225                 return false;
226             }
227         }
228         return true;
229     }
230 
231     @Override
232     public boolean isTerminated() {
233         for (EventLoop l: activeChildren) {
234             if (!l.isTerminated()) {
235                 return false;
236             }
237         }
238         for (EventLoop l: idleChildren) {
239             if (!l.isTerminated()) {
240                 return false;
241             }
242         }
243         return true;
244     }
245 
246     @Override
247     public boolean awaitTermination(long timeout, TimeUnit unit)
248             throws InterruptedException {
249         long deadline = System.nanoTime() + unit.toNanos(timeout);
250         for (EventLoop l: activeChildren) {
251             for (;;) {
252                 long timeLeft = deadline - System.nanoTime();
253                 if (timeLeft <= 0) {
254                     return isTerminated();
255                 }
256                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
257                     break;
258                 }
259             }
260         }
261         for (EventLoop l: idleChildren) {
262             for (;;) {
263                 long timeLeft = deadline - System.nanoTime();
264                 if (timeLeft <= 0) {
265                     return isTerminated();
266                 }
267                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
268                     break;
269                 }
270             }
271         }
272         return isTerminated();
273     }
274 
275     @Override
276     public ChannelFuture register(Channel channel) {
277         if (channel == null) {
278             throw new NullPointerException("channel");
279         }
280         try {
281             EventLoop l = nextChild();
282             return l.register(new DefaultChannelPromise(channel, l));
283         } catch (Throwable t) {
284             return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
285         }
286     }
287 
288     @Override
289     public ChannelFuture register(ChannelPromise promise) {
290         try {
291             return nextChild().register(promise);
292         } catch (Throwable t) {
293             promise.setFailure(t);
294             return promise;
295         }
296     }
297 
298     @Deprecated
299     @Override
300     public ChannelFuture register(Channel channel, ChannelPromise promise) {
301         if (channel == null) {
302             throw new NullPointerException("channel");
303         }
304         try {
305             return nextChild().register(channel, promise);
306         } catch (Throwable t) {
307             promise.setFailure(t);
308             return promise;
309         }
310     }
311 
312     private EventLoop nextChild() throws Exception {
313         if (shuttingDown) {
314             throw new RejectedExecutionException("shutting down");
315         }
316 
317         EventLoop loop = idleChildren.poll();
318         if (loop == null) {
319             if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
320                 throw tooManyChannels;
321             }
322             loop = newChild(childArgs);
323             loop.terminationFuture().addListener(childTerminationListener);
324         }
325         activeChildren.add(loop);
326         return loop;
327     }
328 }