View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import java.nio.channels.Selector;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.RejectedExecutionException;
22  
23  import org.jboss.netty.channel.ChannelPipeline;
24  import org.jboss.netty.channel.group.ChannelGroup;
25  import org.jboss.netty.channel.socket.DatagramChannel;
26  import org.jboss.netty.channel.socket.DatagramChannelFactory;
27  import org.jboss.netty.channel.socket.InternetProtocolFamily;
28  import org.jboss.netty.channel.socket.Worker;
29  import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
30  import org.jboss.netty.util.ExternalResourceReleasable;
31  
32  /**
33   * A {@link DatagramChannelFactory} that creates a NIO-based connectionless
34   * {@link DatagramChannel}. It utilizes the non-blocking I/O mode which
35   * was introduced with NIO to serve many number of concurrent connections
36   * efficiently.
37   *
38   * <h3>How threads work</h3>
39   * <p>
40   * There is only one thread type in a {@link NioDatagramChannelFactory};
41   * worker threads.
42   *
43   * <h4>Worker threads</h4>
44   * <p>
45   * One {@link NioDatagramChannelFactory} can have one or more worker
46   * threads.  A worker thread performs non-blocking read and write for one or
47   * more {@link DatagramChannel}s in a non-blocking mode.
48   *
49   * <h3>Life cycle of threads and graceful shutdown</h3>
50   * <p>
51   * All worker threads are acquired from the {@link Executor} which was specified
52   * when a {@link NioDatagramChannelFactory} was created.  Therefore, you should
53   * make sure the specified {@link Executor} is able to lend the sufficient
54   * number of threads.  It is the best bet to specify
55   * {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
56   * <p>
57   * All worker threads are acquired lazily, and then released when there's
58   * nothing left to process.  All the related resources such as {@link Selector}
59   * are also released when the worker threads are released.  Therefore, to shut
60   * down a service gracefully, you should do the following:
61   *
62   * <ol>
63   * <li>close all channels created by the factory usually using
64   *     {@link ChannelGroup#close()}, and</li>
65   * <li>call {@link #releaseExternalResources()}.</li>
66   * </ol>
67   *
68   * Please make sure not to shut down the executor until all channels are
69   * closed.  Otherwise, you will end up with a {@link RejectedExecutionException}
70   * and the related resources might not be released properly.
71   *
72   * <h3>Limitation</h3>
73   * <p>
74   * Multicast is not supported.  Please use {@link OioDatagramChannelFactory}
75   * instead.
76   *
77   * @apiviz.landmark
78   */
79  public class NioDatagramChannelFactory implements DatagramChannelFactory {
80  
81      private final NioDatagramPipelineSink sink;
82      private final WorkerPool<NioDatagramWorker> workerPool;
83      private final InternetProtocolFamily family;
84      private boolean releasePool;
85  
86      /**
87       * Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}
88       * and without preferred {@link InternetProtocolFamily}.  Please note that the {@link InternetProtocolFamily}
89       * of the channel will be platform (and possibly configuration) dependent and therefore
90       * unspecified.  Use {@link #NioDatagramChannelFactory(InternetProtocolFamily)} if unsure.
91       *
92       * See {@link #NioDatagramChannelFactory(Executor)}
93       */
94      public NioDatagramChannelFactory() {
95          this((InternetProtocolFamily) null);
96      }
97  
98      /**
99       * Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}.
100      *
101      * See {@link #NioDatagramChannelFactory(Executor)}
102      */
103     public NioDatagramChannelFactory(InternetProtocolFamily family) {
104         workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), SelectorUtil.DEFAULT_IO_THREADS);
105         this.family = family;
106         sink = new NioDatagramPipelineSink(workerPool);
107         releasePool = true;
108     }
109 
110     /**
111      * Creates a new instance.  Calling this constructor is same with calling
112      * {@link #NioDatagramChannelFactory(Executor, int)} with 2 * the number of
113      * available processors in the machine.  The number of available processors
114      * is obtained by {@link Runtime#availableProcessors()}.
115      * <p>
116      * Please note that the {@link InternetProtocolFamily} of the channel will be platform (and possibly
117      * configuration) dependent and therefore unspecified.
118      * Use {@link #NioDatagramChannelFactory(Executor, InternetProtocolFamily)} if unsure.
119      *
120      * @param workerExecutor
121      *        the {@link Executor} which will execute the I/O worker threads
122      */
123     public NioDatagramChannelFactory(final Executor workerExecutor) {
124         this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
125     }
126 
127     /**
128      * Creates a new instance.
129      * <p>
130      * Please note that the {@link InternetProtocolFamily} of the channel will be platform (and possibly
131      * configuration) dependent and therefore unspecified.
132      * Use {@link #NioDatagramChannelFactory(Executor, int, InternetProtocolFamily)} if unsure.
133      *
134      * @param workerExecutor
135      *            the {@link Executor} which will execute the I/O worker threads
136      * @param workerCount
137      *            the maximum number of I/O worker threads
138      */
139     public NioDatagramChannelFactory(final Executor workerExecutor, final int workerCount) {
140         this(new NioDatagramWorkerPool(workerExecutor, workerCount));
141     }
142 
143     /**
144     * Creates a new instance.
145     * <p>
146     * Please note that the {@link InternetProtocolFamily} of the channel will be platform (and possibly
147     * configuration) dependent and therefore unspecified.
148     * Use {@link #NioDatagramChannelFactory(WorkerPool, InternetProtocolFamily)} if unsure.
149     *
150     * @param workerPool
151     * the {@link WorkerPool} which will be used to obtain the {@link NioDatagramWorker} that execute
152     * the I/O worker threads
153     */
154     public NioDatagramChannelFactory(WorkerPool<NioDatagramWorker> workerPool) {
155         this(workerPool, null);
156     }
157 
158     /**
159      * Creates a new instance.  Calling this constructor is same with calling
160      * {@link #NioDatagramChannelFactory(Executor, int)} with 2 * the number of
161      * available processors in the machine.  The number of available processors
162      * is obtained by {@link Runtime#availableProcessors()}.
163      *
164      * @param workerExecutor
165      *        the {@link Executor} which will execute the I/O worker threads
166      * @param family
167      *        the {@link InternetProtocolFamily} to use. This should be used for UDP multicast.
168      *        <strong>Be aware that this option is only considered when running on java7+</strong>
169      */
170     public NioDatagramChannelFactory(final Executor workerExecutor, InternetProtocolFamily family) {
171         this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS, family);
172     }
173 
174     /**
175      * Creates a new instance.
176      *
177      * @param workerExecutor
178      *        the {@link Executor} which will execute the I/O worker threads
179      * @param workerCount
180      *        the maximum number of I/O worker threads
181      * @param family
182      *        the {@link InternetProtocolFamily} to use. This should be used for UDP multicast.
183      *        <strong>Be aware that this option is only considered when running on java7+</strong>
184      */
185     public NioDatagramChannelFactory(final Executor workerExecutor,
186             final int workerCount, InternetProtocolFamily family) {
187         this(new NioDatagramWorkerPool(workerExecutor, workerCount), family);
188     }
189 
190     /**
191      * Creates a new instance.
192      *
193      * @param workerPool
194      *        the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute
195      *        the I/O worker threads
196      * @param family
197      *        the {@link InternetProtocolFamily} to use. This should be used for UDP multicast.
198      *        <strong>Be aware that this option is only considered when running on java7+</strong>
199      */
200     public NioDatagramChannelFactory(WorkerPool<NioDatagramWorker> workerPool, InternetProtocolFamily family) {
201         this.workerPool = workerPool;
202         this.family = family;
203         sink = new NioDatagramPipelineSink(workerPool);
204     }
205 
206     public DatagramChannel newChannel(final ChannelPipeline pipeline) {
207         return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker(), family);
208     }
209 
210     public void shutdown() {
211         workerPool.shutdown();
212         if (releasePool) {
213             releasePool();
214         }
215     }
216 
217     public void releaseExternalResources() {
218         workerPool.shutdown();
219         releasePool();
220     }
221 
222     private void releasePool() {
223         if (workerPool instanceof ExternalResourceReleasable) {
224             ((ExternalResourceReleasable) workerPool).releaseExternalResources();
225         }
226     }
227 }