View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  
19  import io.netty.util.concurrent.AbstractEventExecutorGroup;
20  import io.netty.util.concurrent.DefaultPromise;
21  import io.netty.util.concurrent.DefaultThreadFactory;
22  import io.netty.util.concurrent.EventExecutor;
23  import io.netty.util.concurrent.Future;
24  import io.netty.util.concurrent.FutureListener;
25  import io.netty.util.concurrent.GlobalEventExecutor;
26  import io.netty.util.concurrent.Promise;
27  import io.netty.util.concurrent.ThreadPerTaskExecutor;
28  import io.netty.util.internal.EmptyArrays;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.ReadOnlyIterator;
31  
32  import java.util.Collections;
33  import java.util.Iterator;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  
43  /**
44   * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}.
45   *
46   * @deprecated this will be remove in the next-major release.
47   */
48  @Deprecated
49  public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
50  
51      private final Object[] childArgs;
52      private final int maxChannels;
53      final Executor executor;
54      final Set<EventLoop> activeChildren = ConcurrentHashMap.newKeySet();
55      final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<>();
56      private final ChannelException tooManyChannels;
57  
58      private volatile boolean shuttingDown;
59      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
60      private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
61          @Override
62          public void operationComplete(Future<Object> future) throws Exception {
63              // Inefficient, but works.
64              if (isTerminated()) {
65                  terminationFuture.trySuccess(null);
66              }
67          }
68      };
69  
70      /**
71       * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
72       */
73      protected ThreadPerChannelEventLoopGroup() {
74          this(0);
75      }
76  
77      /**
78       * Create a new {@link ThreadPerChannelEventLoopGroup}.
79       *
80       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
81       *                          a new {@link Channel} and the maximum is exceed it will throw an
82       *                          {@link ChannelException}. on the {@link #register(Channel)} and
83       *                          {@link #register(ChannelPromise)} method.
84       *                          Use {@code 0} to use no limit
85       */
86      protected ThreadPerChannelEventLoopGroup(int maxChannels) {
87          this(maxChannels, (ThreadFactory) null);
88      }
89  
90      /**
91       * Create a new {@link ThreadPerChannelEventLoopGroup}.
92       *
93       * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
94       *                          a new {@link Channel} and the maximum is exceed it will throw an
95       *                          {@link ChannelException} on the {@link #register(Channel)} and
96       *                          {@link #register(ChannelPromise)} method.
97       *                          Use {@code 0} to use no limit
98       * @param threadFactory     the {@link ThreadFactory} used to create new {@link Thread} instances that handle the
99       *                          registered {@link Channel}s
100      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
101      */
102     protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
103         this(maxChannels, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
104     }
105 
106     /**
107      * Create a new {@link ThreadPerChannelEventLoopGroup}.
108      *
109      * @param maxChannels       the maximum number of channels to handle with this instance. Once you try to register
110      *                          a new {@link Channel} and the maximum is exceed it will throw an
111      *                          {@link ChannelException} on the {@link #register(Channel)} and
112      *                          {@link #register(ChannelPromise)} method.
113      *                          Use {@code 0} to use no limit
114      * @param executor          the {@link Executor} used to create new {@link Thread} instances that handle the
115      *                          registered {@link Channel}s
116      * @param args              arguments which will passed to each {@link #newChild(Object...)} call.
117      */
118     protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
119         ObjectUtil.checkPositiveOrZero(maxChannels, "maxChannels");
120         if (executor == null) {
121             executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
122         }
123 
124         if (args == null) {
125             childArgs = EmptyArrays.EMPTY_OBJECTS;
126         } else {
127             childArgs = args.clone();
128         }
129 
130         this.maxChannels = maxChannels;
131         this.executor = executor;
132 
133         tooManyChannels =
134                 ChannelException.newStatic("too many channels (max: " + maxChannels + ')',
135                 ThreadPerChannelEventLoopGroup.class, "nextChild()");
136     }
137 
138     /**
139      * Creates a new {@link EventLoop}.  The default implementation creates a new {@link ThreadPerChannelEventLoop}.
140      */
141     protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
142         return new ThreadPerChannelEventLoop(this);
143     }
144 
145     @Override
146     public Iterator<EventExecutor> iterator() {
147         return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
148     }
149 
150     @Override
151     public EventLoop next() {
152         throw new UnsupportedOperationException();
153     }
154 
155     @Override
156     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
157         shuttingDown = true;
158 
159         for (EventLoop l: activeChildren) {
160             l.shutdownGracefully(quietPeriod, timeout, unit);
161         }
162         for (EventLoop l: idleChildren) {
163             l.shutdownGracefully(quietPeriod, timeout, unit);
164         }
165 
166         // Notify the future if there was no children.
167         if (isTerminated()) {
168             terminationFuture.trySuccess(null);
169         }
170 
171         return terminationFuture();
172     }
173 
174     @Override
175     public Future<?> terminationFuture() {
176         return terminationFuture;
177     }
178 
179     @Override
180     @Deprecated
181     public void shutdown() {
182         shuttingDown = true;
183 
184         for (EventLoop l: activeChildren) {
185             l.shutdown();
186         }
187         for (EventLoop l: idleChildren) {
188             l.shutdown();
189         }
190 
191         // Notify the future if there was no children.
192         if (isTerminated()) {
193             terminationFuture.trySuccess(null);
194         }
195     }
196 
197     @Override
198     public boolean isShuttingDown() {
199         for (EventLoop l: activeChildren) {
200             if (!l.isShuttingDown()) {
201                 return false;
202             }
203         }
204         for (EventLoop l: idleChildren) {
205             if (!l.isShuttingDown()) {
206                 return false;
207             }
208         }
209         return true;
210     }
211 
212     @Override
213     public boolean isShutdown() {
214         for (EventLoop l: activeChildren) {
215             if (!l.isShutdown()) {
216                 return false;
217             }
218         }
219         for (EventLoop l: idleChildren) {
220             if (!l.isShutdown()) {
221                 return false;
222             }
223         }
224         return true;
225     }
226 
227     @Override
228     public boolean isTerminated() {
229         for (EventLoop l: activeChildren) {
230             if (!l.isTerminated()) {
231                 return false;
232             }
233         }
234         for (EventLoop l: idleChildren) {
235             if (!l.isTerminated()) {
236                 return false;
237             }
238         }
239         return true;
240     }
241 
242     @Override
243     public boolean awaitTermination(long timeout, TimeUnit unit)
244             throws InterruptedException {
245         long deadline = System.nanoTime() + unit.toNanos(timeout);
246         for (EventLoop l: activeChildren) {
247             for (;;) {
248                 long timeLeft = deadline - System.nanoTime();
249                 if (timeLeft <= 0) {
250                     return isTerminated();
251                 }
252                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
253                     break;
254                 }
255             }
256         }
257         for (EventLoop l: idleChildren) {
258             for (;;) {
259                 long timeLeft = deadline - System.nanoTime();
260                 if (timeLeft <= 0) {
261                     return isTerminated();
262                 }
263                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
264                     break;
265                 }
266             }
267         }
268         return isTerminated();
269     }
270 
271     @Override
272     public ChannelFuture register(Channel channel) {
273         ObjectUtil.checkNotNull(channel, "channel");
274         try {
275             EventLoop l = nextChild();
276             return l.register(new DefaultChannelPromise(channel, l));
277         } catch (Throwable t) {
278             return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
279         }
280     }
281 
282     @Override
283     public ChannelFuture register(ChannelPromise promise) {
284         try {
285             return nextChild().register(promise);
286         } catch (Throwable t) {
287             promise.setFailure(t);
288             return promise;
289         }
290     }
291 
292     @Deprecated
293     @Override
294     public ChannelFuture register(Channel channel, ChannelPromise promise) {
295         ObjectUtil.checkNotNull(channel, "channel");
296         try {
297             return nextChild().register(channel, promise);
298         } catch (Throwable t) {
299             promise.setFailure(t);
300             return promise;
301         }
302     }
303 
304     private EventLoop nextChild() throws Exception {
305         if (shuttingDown) {
306             throw new RejectedExecutionException("shutting down");
307         }
308 
309         EventLoop loop = idleChildren.poll();
310         if (loop == null) {
311             if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
312                 throw tooManyChannels;
313             }
314             loop = newChild(childArgs);
315             loop.terminationFuture().addListener(childTerminationListener);
316         }
317         activeChildren.add(loop);
318         return loop;
319     }
320 }