View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.InetSocketAddress;
21  import java.util.concurrent.Executor;
22  
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelPipeline;
27  import org.jboss.netty.channel.ChannelState;
28  import org.jboss.netty.channel.ChannelStateEvent;
29  import org.jboss.netty.channel.MessageEvent;
30  
31  /**
32   * Receives downstream events from a {@link ChannelPipeline}.  It contains
33   * an array of I/O workers.
34   */
35  class NioDatagramPipelineSink extends AbstractNioChannelSink {
36  
37      private final WorkerPool<NioDatagramWorker> workerPool;
38  
39      /**
40       * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s
41       * specified in workerCount.  The {@link NioDatagramWorker}s take care of reading and writing
42       * for the {@link NioDatagramChannel}.
43       *
44       * @param workerExecutor
45       *        the {@link Executor} that will run the {@link NioDatagramWorker}s
46       *        for this sink
47       * @param workerCount
48       *        the number of {@link NioDatagramWorker}s for this sink
49       */
50      NioDatagramPipelineSink(final WorkerPool<NioDatagramWorker> workerPool) {
51          this.workerPool = workerPool;
52      }
53  
54      /**
55       * Handle downstream event.
56       *
57       * @param pipeline the {@link ChannelPipeline} that passes down the
58       *                 downstream event.
59       * @param e The downstream event.
60       */
61      public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
62              throws Exception {
63          final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel();
64          final ChannelFuture future = e.getFuture();
65          if (e instanceof ChannelStateEvent) {
66              final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
67              final ChannelState state = stateEvent.getState();
68              final Object value = stateEvent.getValue();
69              switch (state) {
70              case OPEN:
71                  if (Boolean.FALSE.equals(value)) {
72                      channel.worker.close(channel, future);
73                  }
74                  break;
75              case BOUND:
76                  if (value != null) {
77                      bind(channel, future, (InetSocketAddress) value);
78                  } else {
79                      channel.worker.close(channel, future);
80                  }
81                  break;
82              case CONNECTED:
83                  if (value != null) {
84                      connect(channel, future, (InetSocketAddress) value);
85                  } else {
86                      NioDatagramWorker.disconnect(channel, future);
87                  }
88                  break;
89              case INTEREST_OPS:
90                  channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
91                  break;
92              }
93          } else if (e instanceof MessageEvent) {
94              final MessageEvent event = (MessageEvent) e;
95              final boolean offered = channel.writeBufferQueue.offer(event);
96              assert offered;
97              channel.worker.writeFromUserCode(channel);
98          }
99      }
100 
101     private static void close(NioDatagramChannel channel, ChannelFuture future) {
102         try {
103             channel.getDatagramChannel().socket().close();
104             if (channel.setClosed()) {
105                 future.setSuccess();
106                 if (channel.isBound()) {
107                     fireChannelUnbound(channel);
108                 }
109                 fireChannelClosed(channel);
110             } else {
111                 future.setSuccess();
112             }
113         } catch (final Throwable t) {
114             future.setFailure(t);
115             fireExceptionCaught(channel, t);
116         }
117     }
118 
119     /**
120      * Will bind the DatagramSocket to the passed-in address.
121      * Every call bind will spawn a new thread using the that basically in turn
122      */
123     private static void bind(final NioDatagramChannel channel,
124             final ChannelFuture future, final InetSocketAddress address) {
125         boolean bound = false;
126         boolean started = false;
127         try {
128             // First bind the DatagramSocket the specified port.
129             channel.getDatagramChannel().socket().bind(address);
130             bound = true;
131 
132             future.setSuccess();
133             fireChannelBound(channel, address);
134 
135             channel.worker.register(channel, null);
136             started = true;
137         } catch (final Throwable t) {
138             future.setFailure(t);
139             fireExceptionCaught(channel, t);
140         } finally {
141             if (!started && bound) {
142                 close(channel, future);
143             }
144         }
145     }
146 
147     private static void connect(
148             NioDatagramChannel channel, ChannelFuture future,
149             InetSocketAddress remoteAddress) {
150 
151         boolean bound = channel.isBound();
152         boolean connected = false;
153         boolean workerStarted = false;
154 
155         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
156 
157         // Clear the cached address so that the next getRemoteAddress() call
158         // updates the cache.
159         channel.remoteAddress = null;
160 
161         try {
162             channel.getDatagramChannel().connect(remoteAddress);
163             connected = true;
164 
165             // Fire events.
166             future.setSuccess();
167             if (!bound) {
168                 fireChannelBound(channel, channel.getLocalAddress());
169             }
170             fireChannelConnected(channel, channel.getRemoteAddress());
171 
172             if (!bound) {
173                 channel.worker.register(channel, future);
174             }
175 
176             workerStarted = true;
177         } catch (Throwable t) {
178             future.setFailure(t);
179             fireExceptionCaught(channel, t);
180         } finally {
181             if (connected && !workerStarted) {
182                 channel.worker.close(channel, future);
183             }
184         }
185     }
186 
187     NioDatagramWorker nextWorker() {
188         return workerPool.nextWorker();
189     }
190 
191 }