View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.PushbackInputStream;
21  import java.net.SocketAddress;
22  import java.util.concurrent.Executor;
23  
24  import org.jboss.netty.channel.ChannelEvent;
25  import org.jboss.netty.channel.ChannelFuture;
26  import org.jboss.netty.channel.ChannelFutureListener;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelState;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  import org.jboss.netty.util.ThreadRenamingRunnable;
32  import org.jboss.netty.util.internal.DeadLockProofWorker;
33  
34  class OioClientSocketPipelineSink extends AbstractOioChannelSink {
35  
36      private final Executor workerExecutor;
37  
38      OioClientSocketPipelineSink(Executor workerExecutor) {
39          this.workerExecutor = workerExecutor;
40      }
41  
42      public void eventSunk(
43              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
44          OioClientSocketChannel channel = (OioClientSocketChannel) e.getChannel();
45          ChannelFuture future = e.getFuture();
46          if (e instanceof ChannelStateEvent) {
47              ChannelStateEvent stateEvent = (ChannelStateEvent) e;
48              ChannelState state = stateEvent.getState();
49              Object value = stateEvent.getValue();
50              switch (state) {
51              case OPEN:
52                  if (Boolean.FALSE.equals(value)) {
53                      AbstractOioWorker.close(channel, future);
54                  }
55                  break;
56              case BOUND:
57                  if (value != null) {
58                      bind(channel, future, (SocketAddress) value);
59                  } else {
60                      AbstractOioWorker.close(channel, future);
61                  }
62                  break;
63              case CONNECTED:
64                  if (value != null) {
65                      connect(channel, future, (SocketAddress) value);
66                  } else {
67                      AbstractOioWorker.close(channel, future);
68                  }
69                  break;
70              case INTEREST_OPS:
71                  AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
72                  break;
73              }
74          } else if (e instanceof MessageEvent) {
75              OioWorker.write(
76                      channel, future,
77                      ((MessageEvent) e).getMessage());
78          }
79      }
80  
81      private static void bind(
82              OioClientSocketChannel channel, ChannelFuture future,
83              SocketAddress localAddress) {
84          try {
85              channel.socket.bind(localAddress);
86              future.setSuccess();
87              fireChannelBound(channel, channel.getLocalAddress());
88          } catch (Throwable t) {
89              future.setFailure(t);
90              fireExceptionCaught(channel, t);
91          }
92      }
93  
94      private void connect(
95              OioClientSocketChannel channel, ChannelFuture future,
96              SocketAddress remoteAddress) {
97  
98          boolean bound = channel.isBound();
99          boolean connected = false;
100         boolean workerStarted = false;
101 
102         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
103 
104         try {
105             channel.socket.connect(
106                     remoteAddress, channel.getConfig().getConnectTimeoutMillis());
107             connected = true;
108 
109             // Obtain I/O stream.
110             channel.in = new PushbackInputStream(channel.socket.getInputStream(), 1);
111             channel.out = channel.socket.getOutputStream();
112 
113             // Fire events.
114             future.setSuccess();
115             if (!bound) {
116                 fireChannelBound(channel, channel.getLocalAddress());
117             }
118             fireChannelConnected(channel, channel.getRemoteAddress());
119 
120             // Start the business.
121             DeadLockProofWorker.start(
122                     workerExecutor,
123                     new ThreadRenamingRunnable(
124                             new OioWorker(channel),
125                             "Old I/O client worker (" + channel + ')'));
126             workerStarted = true;
127         } catch (Throwable t) {
128             future.setFailure(t);
129             fireExceptionCaught(channel, t);
130         } finally {
131             if (connected && !workerStarted) {
132                 AbstractOioWorker.close(channel, future);
133             }
134         }
135     }
136 }