View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.SocketAddress;
21  import java.util.concurrent.Executor;
22  
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelPipeline;
27  import org.jboss.netty.channel.ChannelState;
28  import org.jboss.netty.channel.ChannelStateEvent;
29  import org.jboss.netty.channel.MessageEvent;
30  import org.jboss.netty.util.ThreadRenamingRunnable;
31  import org.jboss.netty.util.internal.DeadLockProofWorker;
32  
33  class OioDatagramPipelineSink extends AbstractOioChannelSink {
34  
35      private final Executor workerExecutor;
36  
37      OioDatagramPipelineSink(Executor workerExecutor) {
38          this.workerExecutor = workerExecutor;
39      }
40  
41      public void eventSunk(
42              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
43          OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
44          ChannelFuture future = e.getFuture();
45          if (e instanceof ChannelStateEvent) {
46              ChannelStateEvent stateEvent = (ChannelStateEvent) e;
47              ChannelState state = stateEvent.getState();
48              Object value = stateEvent.getValue();
49              switch (state) {
50              case OPEN:
51                  if (Boolean.FALSE.equals(value)) {
52                      AbstractOioWorker.close(channel, future);
53                  }
54                  break;
55              case BOUND:
56                  if (value != null) {
57                      bind(channel, future, (SocketAddress) value);
58                  } else {
59                      AbstractOioWorker.close(channel, future);
60                  }
61                  break;
62              case CONNECTED:
63                  if (value != null) {
64                      connect(channel, future, (SocketAddress) value);
65                  } else {
66                      OioDatagramWorker.disconnect(channel, future);
67                  }
68                  break;
69              case INTEREST_OPS:
70                  AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
71                  break;
72              }
73          } else if (e instanceof MessageEvent) {
74              MessageEvent evt = (MessageEvent) e;
75              OioDatagramWorker.write(
76                      channel, future, evt.getMessage(), evt.getRemoteAddress());
77          }
78      }
79  
80      private void bind(
81              OioDatagramChannel channel, ChannelFuture future,
82              SocketAddress localAddress) {
83          boolean bound = false;
84          boolean workerStarted = false;
85          try {
86              channel.socket.bind(localAddress);
87              bound = true;
88  
89              // Fire events
90              future.setSuccess();
91              fireChannelBound(channel, channel.getLocalAddress());
92  
93              // Start the business.
94              DeadLockProofWorker.start(
95                      workerExecutor,
96                      new ThreadRenamingRunnable(
97                              new OioDatagramWorker(channel),
98                              "Old I/O datagram worker (" + channel + ')'));
99              workerStarted = true;
100         } catch (Throwable t) {
101             future.setFailure(t);
102             fireExceptionCaught(channel, t);
103         } finally {
104             if (bound && !workerStarted) {
105                 AbstractOioWorker.close(channel, future);
106             }
107         }
108     }
109 
110     private void connect(
111             OioDatagramChannel channel, ChannelFuture future,
112             SocketAddress remoteAddress) {
113 
114         boolean bound = channel.isBound();
115         boolean connected = false;
116         boolean workerStarted = false;
117 
118         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
119 
120         // Clear the cached address so that the next getRemoteAddress() call
121         // updates the cache.
122         channel.remoteAddress = null;
123 
124         try {
125             channel.socket.connect(remoteAddress);
126             connected = true;
127 
128             // Fire events.
129             future.setSuccess();
130             if (!bound) {
131                 fireChannelBound(channel, channel.getLocalAddress());
132             }
133             fireChannelConnected(channel, channel.getRemoteAddress());
134 
135             String threadName = "Old I/O datagram worker (" + channel + ')';
136             if (!bound) {
137                 // Start the business.
138                 DeadLockProofWorker.start(
139                         workerExecutor,
140                         new ThreadRenamingRunnable(
141                                 new OioDatagramWorker(channel), threadName));
142             } else {
143                 // Worker started by bind() - just rename.
144                 Thread workerThread = channel.workerThread;
145                 if (workerThread != null) {
146                     try {
147                         workerThread.setName(threadName);
148                     } catch (SecurityException e) {
149                         // Ignore.
150                     }
151                 }
152             }
153 
154             workerStarted = true;
155         } catch (Throwable t) {
156             future.setFailure(t);
157             fireExceptionCaught(channel, t);
158         } finally {
159             if (connected && !workerStarted) {
160                 AbstractOioWorker.close(channel, future);
161             }
162         }
163     }
164 }