View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  
19  import io.netty.util.concurrent.AbstractEventExecutorGroup;
20  import io.netty.util.concurrent.DefaultPromise;
21  import io.netty.util.concurrent.DefaultThreadFactory;
22  import io.netty.util.concurrent.EventExecutor;
23  import io.netty.util.concurrent.Future;
24  import io.netty.util.concurrent.FutureListener;
25  import io.netty.util.concurrent.GlobalEventExecutor;
26  import io.netty.util.concurrent.Promise;
27  import io.netty.util.concurrent.ThreadPerTaskExecutor;
28  import io.netty.util.internal.EmptyArrays;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.PlatformDependent;
31  import io.netty.util.internal.ReadOnlyIterator;
32  
33  import java.util.Collections;
34  import java.util.Iterator;
35  import java.util.Queue;
36  import java.util.Set;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  
43  /**
44   * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
45   *
46   * @deprecated this will be remove in the next-major release.
47   */
48  @Deprecated
49  public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
50  
51      private final Object[] childArgs;
52      private final int maxChannels;
53      final Executor executor;
54      final Set<EventLoop> activeChildren =
55              Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
56      final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
57      private final ChannelException tooManyChannels;
58  
59      private volatile boolean shuttingDown;
60      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
61      private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
62          @Override
63          public void operationComplete(Future<Object> future) throws Exception {
64              // Inefficient, but works.
65              if (isTerminated()) {
66                  terminationFuture.trySuccess(null);
67              }
68          }
69      };
70  
71      /**
72       * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
73       */
74      protected ThreadPerChannelEventLoopGroup() {
75          this(0);
76      }
77  
78      /**
79       * Create a new {@link ThreadPerChannelEventLoopGroup}.
80       *
81       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
82       *                          a new {@link Channel} and the maximum is exceed it will throw an
83       *                          {@link ChannelException}. on the {@link #register(Channel)} and
84       *                          {@link #register(ChannelPromise)} method.
85       *                          Use {@code 0} to use no limit
86       */
87      protected ThreadPerChannelEventLoopGroup(int maxChannels) {
88          this(maxChannels, (ThreadFactory) null);
89      }
90  
91      /**
92       * Create a new {@link ThreadPerChannelEventLoopGroup}.
93       *
94       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
95       *                          a new {@link Channel} and the maximum is exceed it will throw an
96       *                          {@link ChannelException} on the {@link #register(Channel)} and
97       *                          {@link #register(ChannelPromise)} method.
98       *                          Use {@code 0} to use no limit
99       * @param threadFactory     the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
100      *                          registered {@link Channel}s
101      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
102      */
103     protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
104         this(maxChannels, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
105     }
106 
107     /**
108      * Create a new {@link ThreadPerChannelEventLoopGroup}.
109      *
110      * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
111      *                          a new {@link Channel} and the maximum is exceed it will throw an
112      *                          {@link ChannelException} on the {@link #register(Channel)} and
113      *                          {@link #register(ChannelPromise)} method.
114      *                          Use {@code 0} to use no limit
115      * @param executor          the {@link Executor} used to create new {@link Thread} instances that handle the
116      *                          registered {@link Channel}s
117      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
118      */
119     protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
120         ObjectUtil.checkPositiveOrZero(maxChannels, "maxChannels");
121         if (executor == null) {
122             executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
123         }
124 
125         if (args == null) {
126             childArgs = EmptyArrays.EMPTY_OBJECTS;
127         } else {
128             childArgs = args.clone();
129         }
130 
131         this.maxChannels = maxChannels;
132         this.executor = executor;
133 
134         tooManyChannels =
135                 ChannelException.newStatic("too many channels (max: " + maxChannels + ')',
136                 ThreadPerChannelEventLoopGroup.class, "nextChild()");
137     }
138 
139     /**
140      * Creates a new {@link EventLoop}.  The default implementation creates a new {@link ThreadPerChannelEventLoop}.
141      */
142     protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
143         return new ThreadPerChannelEventLoop(this);
144     }
145 
146     @Override
147     public Iterator<EventExecutor> iterator() {
148         return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
149     }
150 
151     @Override
152     public EventLoop next() {
153         throw new UnsupportedOperationException();
154     }
155 
156     @Override
157     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
158         shuttingDown = true;
159 
160         for (EventLoop l: activeChildren) {
161             l.shutdownGracefully(quietPeriod, timeout, unit);
162         }
163         for (EventLoop l: idleChildren) {
164             l.shutdownGracefully(quietPeriod, timeout, unit);
165         }
166 
167         // Notify the future if there was no children.
168         if (isTerminated()) {
169             terminationFuture.trySuccess(null);
170         }
171 
172         return terminationFuture();
173     }
174 
175     @Override
176     public Future<?> terminationFuture() {
177         return terminationFuture;
178     }
179 
180     @Override
181     @Deprecated
182     public void shutdown() {
183         shuttingDown = true;
184 
185         for (EventLoop l: activeChildren) {
186             l.shutdown();
187         }
188         for (EventLoop l: idleChildren) {
189             l.shutdown();
190         }
191 
192         // Notify the future if there was no children.
193         if (isTerminated()) {
194             terminationFuture.trySuccess(null);
195         }
196     }
197 
198     @Override
199     public boolean isShuttingDown() {
200         for (EventLoop l: activeChildren) {
201             if (!l.isShuttingDown()) {
202                 return false;
203             }
204         }
205         for (EventLoop l: idleChildren) {
206             if (!l.isShuttingDown()) {
207                 return false;
208             }
209         }
210         return true;
211     }
212 
213     @Override
214     public boolean isShutdown() {
215         for (EventLoop l: activeChildren) {
216             if (!l.isShutdown()) {
217                 return false;
218             }
219         }
220         for (EventLoop l: idleChildren) {
221             if (!l.isShutdown()) {
222                 return false;
223             }
224         }
225         return true;
226     }
227 
228     @Override
229     public boolean isTerminated() {
230         for (EventLoop l: activeChildren) {
231             if (!l.isTerminated()) {
232                 return false;
233             }
234         }
235         for (EventLoop l: idleChildren) {
236             if (!l.isTerminated()) {
237                 return false;
238             }
239         }
240         return true;
241     }
242 
243     @Override
244     public boolean awaitTermination(long timeout, TimeUnit unit)
245             throws InterruptedException {
246         long deadline = System.nanoTime() + unit.toNanos(timeout);
247         for (EventLoop l: activeChildren) {
248             for (;;) {
249                 long timeLeft = deadline - System.nanoTime();
250                 if (timeLeft <= 0) {
251                     return isTerminated();
252                 }
253                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
254                     break;
255                 }
256             }
257         }
258         for (EventLoop l: idleChildren) {
259             for (;;) {
260                 long timeLeft = deadline - System.nanoTime();
261                 if (timeLeft <= 0) {
262                     return isTerminated();
263                 }
264                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
265                     break;
266                 }
267             }
268         }
269         return isTerminated();
270     }
271 
272     @Override
273     public ChannelFuture register(Channel channel) {
274         ObjectUtil.checkNotNull(channel, "channel");
275         try {
276             EventLoop l = nextChild();
277             return l.register(new DefaultChannelPromise(channel, l));
278         } catch (Throwable t) {
279             return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
280         }
281     }
282 
283     @Override
284     public ChannelFuture register(ChannelPromise promise) {
285         try {
286             return nextChild().register(promise);
287         } catch (Throwable t) {
288             promise.setFailure(t);
289             return promise;
290         }
291     }
292 
293     @Deprecated
294     @Override
295     public ChannelFuture register(Channel channel, ChannelPromise promise) {
296         ObjectUtil.checkNotNull(channel, "channel");
297         try {
298             return nextChild().register(channel, promise);
299         } catch (Throwable t) {
300             promise.setFailure(t);
301             return promise;
302         }
303     }
304 
305     private EventLoop nextChild() throws Exception {
306         if (shuttingDown) {
307             throw new RejectedExecutionException("shutting down");
308         }
309 
310         EventLoop loop = idleChildren.poll();
311         if (loop == null) {
312             if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
313                 throw tooManyChannels;
314             }
315             loop = newChild(childArgs);
316             loop.terminationFuture().addListener(childTerminationListener);
317         }
318         activeChildren.add(loop);
319         return loop;
320     }
321 }