View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.SocketAddress;
21  import java.util.concurrent.Executor;
22  
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelPipeline;
27  import org.jboss.netty.channel.ChannelState;
28  import org.jboss.netty.channel.ChannelStateEvent;
29  import org.jboss.netty.channel.MessageEvent;
30  import org.jboss.netty.util.ThreadNameDeterminer;
31  import org.jboss.netty.util.ThreadRenamingRunnable;
32  import org.jboss.netty.util.internal.DeadLockProofWorker;
33  
34  class OioDatagramPipelineSink extends AbstractOioChannelSink {
35  
36      private final Executor workerExecutor;
37      private final ThreadNameDeterminer determiner;
38  
39      OioDatagramPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
40          this.workerExecutor = workerExecutor;
41          this.determiner = determiner;
42      }
43  
44      public void eventSunk(
45              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
46          OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
47          ChannelFuture future = e.getFuture();
48          if (e instanceof ChannelStateEvent) {
49              ChannelStateEvent stateEvent = (ChannelStateEvent) e;
50              ChannelState state = stateEvent.getState();
51              Object value = stateEvent.getValue();
52              switch (state) {
53              case OPEN:
54                  if (Boolean.FALSE.equals(value)) {
55                      AbstractOioWorker.close(channel, future);
56                  }
57                  break;
58              case BOUND:
59                  if (value != null) {
60                      bind(channel, future, (SocketAddress) value);
61                  } else {
62                      AbstractOioWorker.close(channel, future);
63                  }
64                  break;
65              case CONNECTED:
66                  if (value != null) {
67                      connect(channel, future, (SocketAddress) value);
68                  } else {
69                      OioDatagramWorker.disconnect(channel, future);
70                  }
71                  break;
72              case INTEREST_OPS:
73                  AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
74                  break;
75              }
76          } else if (e instanceof MessageEvent) {
77              MessageEvent evt = (MessageEvent) e;
78              OioDatagramWorker.write(
79                      channel, future, evt.getMessage(), evt.getRemoteAddress());
80          }
81      }
82  
83      private void bind(
84              OioDatagramChannel channel, ChannelFuture future,
85              SocketAddress localAddress) {
86          boolean bound = false;
87          boolean workerStarted = false;
88          try {
89              channel.socket.bind(localAddress);
90              bound = true;
91  
92              // Fire events
93              future.setSuccess();
94              fireChannelBound(channel, channel.getLocalAddress());
95  
96              // Start the business.
97              DeadLockProofWorker.start(
98                      workerExecutor,
99                      new ThreadRenamingRunnable(
100                             new OioDatagramWorker(channel),
101                             "Old I/O datagram worker (" + channel + ')',
102                             determiner));
103             workerStarted = true;
104         } catch (Throwable t) {
105             future.setFailure(t);
106             fireExceptionCaught(channel, t);
107         } finally {
108             if (bound && !workerStarted) {
109                 AbstractOioWorker.close(channel, future);
110             }
111         }
112     }
113 
114     private void connect(
115             OioDatagramChannel channel, ChannelFuture future,
116             SocketAddress remoteAddress) {
117 
118         boolean bound = channel.isBound();
119         boolean connected = false;
120         boolean workerStarted = false;
121 
122         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
123 
124         // Clear the cached address so that the next getRemoteAddress() call
125         // updates the cache.
126         channel.remoteAddress = null;
127 
128         try {
129             channel.socket.connect(remoteAddress);
130             connected = true;
131 
132             // Fire events.
133             future.setSuccess();
134             if (!bound) {
135                 fireChannelBound(channel, channel.getLocalAddress());
136             }
137             fireChannelConnected(channel, channel.getRemoteAddress());
138 
139             String threadName = "Old I/O datagram worker (" + channel + ')';
140             if (!bound) {
141                 // Start the business.
142                 DeadLockProofWorker.start(
143                         workerExecutor,
144                         new ThreadRenamingRunnable(
145                                 new OioDatagramWorker(channel), threadName, determiner));
146             } else {
147                 // Worker started by bind() - just rename.
148                 Thread workerThread = channel.workerThread;
149                 if (workerThread != null) {
150                     try {
151                         workerThread.setName(threadName);
152                     } catch (SecurityException e) {
153                         // Ignore.
154                     }
155                 }
156             }
157 
158             workerStarted = true;
159         } catch (Throwable t) {
160             future.setFailure(t);
161             fireExceptionCaught(channel, t);
162         } finally {
163             if (connected && !workerStarted) {
164                 AbstractOioWorker.close(channel, future);
165             }
166         }
167     }
168 }