View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import org.jboss.netty.channel.AbstractChannelSink;
19  import org.jboss.netty.channel.Channel;
20  import org.jboss.netty.channel.ChannelEvent;
21  import org.jboss.netty.channel.ChannelException;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.ChannelPipeline;
24  import org.jboss.netty.channel.ChannelState;
25  import org.jboss.netty.channel.ChannelStateEvent;
26  import org.jboss.netty.channel.MessageEvent;
27  import org.jboss.netty.logging.InternalLogger;
28  import org.jboss.netty.logging.InternalLoggerFactory;
29  
30  import java.io.IOException;
31  import java.net.ConnectException;
32  
33  import static org.jboss.netty.channel.Channels.*;
34  
35  /**
36   */
37  final class LocalClientChannelSink extends AbstractChannelSink {
38  
39      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
40  
41      public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
42          if (e instanceof ChannelStateEvent) {
43              ChannelStateEvent event = (ChannelStateEvent) e;
44  
45              DefaultLocalChannel channel =
46                    (DefaultLocalChannel) event.getChannel();
47              ChannelFuture future = event.getFuture();
48              ChannelState state = event.getState();
49              Object value = event.getValue();
50              switch (state) {
51              case OPEN:
52                  if (Boolean.FALSE.equals(value)) {
53                      channel.closeNow(future);
54                  }
55                  break;
56              case BOUND:
57                  if (value != null) {
58                      bind(channel, future, (LocalAddress) value);
59                  } else {
60                      channel.closeNow(future);
61                  }
62                  break;
63              case CONNECTED:
64                  if (value != null) {
65                      connect(channel, future, (LocalAddress) value);
66                  } else {
67                      channel.closeNow(future);
68                  }
69                  break;
70              case INTEREST_OPS:
71                  // Unsupported - discard silently.
72                  future.setSuccess();
73                  break;
74              }
75          } else if (e instanceof MessageEvent) {
76              MessageEvent event = (MessageEvent) e;
77              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
78              boolean offered = channel.writeBuffer.offer(event);
79              assert offered;
80              channel.flushWriteBuffer();
81          }
82      }
83  
84      private static void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
85          try {
86              if (!LocalChannelRegistry.register(localAddress, channel)) {
87                  throw new ChannelException("address already in use: " + localAddress);
88              }
89  
90              channel.setBound();
91              channel.localAddress = localAddress;
92              future.setSuccess();
93              fireChannelBound(channel, localAddress);
94          } catch (Throwable t) {
95              LocalChannelRegistry.unregister(localAddress);
96              future.setFailure(t);
97              fireExceptionCaught(channel, t);
98          }
99      }
100 
101     private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
102         Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
103         if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
104             future.setFailure(new ConnectException(
105                     "connection refused: " + remoteAddress));
106             return;
107         }
108 
109         DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
110         ChannelPipeline pipeline;
111         try {
112             pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
113         } catch (Exception e) {
114             future.setFailure(e);
115             fireExceptionCaught(channel, e);
116             if (logger.isWarnEnabled()) {
117                 logger.warn(
118                         "Failed to initialize an accepted socket.", e);
119             }
120             return;
121         }
122 
123         future.setSuccess();
124         DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
125                 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
126         channel.pairedChannel = acceptedChannel;
127 
128         // check if the channel was bound before. See #276
129         if (!channel.isBound()) {
130             bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
131         }
132         channel.remoteAddress = serverChannel.getLocalAddress();
133         channel.setConnected();
134         fireChannelConnected(channel, serverChannel.getLocalAddress());
135 
136         acceptedChannel.localAddress = serverChannel.getLocalAddress();
137         try {
138             acceptedChannel.setBound();
139         } catch (IOException e) {
140             throw new Error(e);
141         }
142         fireChannelBound(acceptedChannel, channel.getRemoteAddress());
143         acceptedChannel.remoteAddress = channel.getLocalAddress();
144         acceptedChannel.setConnected();
145         fireChannelConnected(acceptedChannel, channel.getLocalAddress());
146 
147         // Flush something that was written in channelBound / channelConnected
148         channel.flushWriteBuffer();
149         acceptedChannel.flushWriteBuffer();
150     }
151 }