1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import org.jboss.netty.channel.AbstractChannelSink;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelEvent;
21 import org.jboss.netty.channel.ChannelException;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelPipeline;
24 import org.jboss.netty.channel.ChannelState;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.MessageEvent;
27 import org.jboss.netty.logging.InternalLogger;
28 import org.jboss.netty.logging.InternalLoggerFactory;
29
30 import java.io.IOException;
31 import java.net.ConnectException;
32
33 import static org.jboss.netty.channel.Channels.*;
34
35
36
37 final class LocalClientChannelSink extends AbstractChannelSink {
38
39 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
40
41 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
42 if (e instanceof ChannelStateEvent) {
43 ChannelStateEvent event = (ChannelStateEvent) e;
44
45 DefaultLocalChannel channel =
46 (DefaultLocalChannel) event.getChannel();
47 ChannelFuture future = event.getFuture();
48 ChannelState state = event.getState();
49 Object value = event.getValue();
50 switch (state) {
51 case OPEN:
52 if (Boolean.FALSE.equals(value)) {
53 channel.closeNow(future);
54 }
55 break;
56 case BOUND:
57 if (value != null) {
58 bind(channel, future, (LocalAddress) value);
59 } else {
60 channel.closeNow(future);
61 }
62 break;
63 case CONNECTED:
64 if (value != null) {
65 connect(channel, future, (LocalAddress) value);
66 } else {
67 channel.closeNow(future);
68 }
69 break;
70 case INTEREST_OPS:
71
72 future.setSuccess();
73 break;
74 }
75 } else if (e instanceof MessageEvent) {
76 MessageEvent event = (MessageEvent) e;
77 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
78 boolean offered = channel.writeBuffer.offer(event);
79 assert offered;
80 channel.flushWriteBuffer();
81 }
82 }
83
84 private static void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
85 try {
86 if (!LocalChannelRegistry.register(localAddress, channel)) {
87 throw new ChannelException("address already in use: " + localAddress);
88 }
89
90 channel.setBound();
91 channel.localAddress = localAddress;
92 future.setSuccess();
93 fireChannelBound(channel, localAddress);
94 } catch (Throwable t) {
95 LocalChannelRegistry.unregister(localAddress);
96 future.setFailure(t);
97 fireExceptionCaught(channel, t);
98 }
99 }
100
101 private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
102 Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
103 if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
104 future.setFailure(new ConnectException(
105 "connection refused: " + remoteAddress));
106 return;
107 }
108
109 DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
110 ChannelPipeline pipeline;
111 try {
112 pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
113 } catch (Exception e) {
114 future.setFailure(e);
115 fireExceptionCaught(channel, e);
116 if (logger.isWarnEnabled()) {
117 logger.warn(
118 "Failed to initialize an accepted socket.", e);
119 }
120 return;
121 }
122
123 future.setSuccess();
124 DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
125 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
126 channel.pairedChannel = acceptedChannel;
127
128
129 if (!channel.isBound()) {
130 bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
131 }
132 channel.remoteAddress = serverChannel.getLocalAddress();
133 channel.setConnected();
134 fireChannelConnected(channel, serverChannel.getLocalAddress());
135
136 acceptedChannel.localAddress = serverChannel.getLocalAddress();
137 try {
138 acceptedChannel.setBound();
139 } catch (IOException e) {
140 throw new Error(e);
141 }
142 fireChannelBound(acceptedChannel, channel.getRemoteAddress());
143 acceptedChannel.remoteAddress = channel.getLocalAddress();
144 acceptedChannel.setConnected();
145 fireChannelConnected(acceptedChannel, channel.getLocalAddress());
146
147
148 channel.flushWriteBuffer();
149 acceptedChannel.flushWriteBuffer();
150 }
151 }