1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.ConnectException;
22
23 import org.jboss.netty.channel.AbstractChannelSink;
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelException;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelPipeline;
29 import org.jboss.netty.channel.ChannelState;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34
35
36
37 final class LocalClientChannelSink extends AbstractChannelSink {
38
39 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
40
41 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
42 if (e instanceof ChannelStateEvent) {
43 ChannelStateEvent event = (ChannelStateEvent) e;
44
45 DefaultLocalChannel channel =
46 (DefaultLocalChannel) event.getChannel();
47 ChannelFuture future = event.getFuture();
48 ChannelState state = event.getState();
49 Object value = event.getValue();
50 switch (state) {
51 case OPEN:
52 if (Boolean.FALSE.equals(value)) {
53 channel.closeNow(future);
54 }
55 break;
56 case BOUND:
57 if (value != null) {
58 bind(channel, future, (LocalAddress) value);
59 } else {
60 channel.closeNow(future);
61 }
62 break;
63 case CONNECTED:
64 if (value != null) {
65 connect(channel, future, (LocalAddress) value);
66 } else {
67 channel.closeNow(future);
68 }
69 break;
70 case INTEREST_OPS:
71
72 future.setSuccess();
73 break;
74 }
75 } else if (e instanceof MessageEvent) {
76 MessageEvent event = (MessageEvent) e;
77 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
78 boolean offered = channel.writeBuffer.offer(event);
79 assert offered;
80 channel.flushWriteBuffer();
81 }
82 }
83
84 private static void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
85 try {
86 if (!LocalChannelRegistry.register(localAddress, channel)) {
87 throw new ChannelException("address already in use: " + localAddress);
88 }
89
90 channel.setBound();
91 channel.localAddress = localAddress;
92 future.setSuccess();
93 fireChannelBound(channel, localAddress);
94 } catch (Throwable t) {
95 LocalChannelRegistry.unregister(localAddress);
96 future.setFailure(t);
97 fireExceptionCaught(channel, t);
98 }
99 }
100
101 private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
102 Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
103 if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
104 future.setFailure(new ConnectException("connection refused"));
105 return;
106 }
107
108 DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
109 ChannelPipeline pipeline;
110 try {
111 pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
112 } catch (Exception e) {
113 future.setFailure(e);
114 fireExceptionCaught(channel, e);
115 if (logger.isWarnEnabled()) {
116 logger.warn(
117 "Failed to initialize an accepted socket.", e);
118 }
119 return;
120 }
121
122 future.setSuccess();
123 DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
124 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
125 channel.pairedChannel = acceptedChannel;
126
127
128 if (!channel.isBound()) {
129 bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
130 }
131 channel.remoteAddress = serverChannel.getLocalAddress();
132 channel.setConnected();
133 fireChannelConnected(channel, serverChannel.getLocalAddress());
134
135 acceptedChannel.localAddress = serverChannel.getLocalAddress();
136 try {
137 acceptedChannel.setBound();
138 } catch (IOException e) {
139 throw new Error(e);
140 }
141 fireChannelBound(acceptedChannel, channel.getRemoteAddress());
142 acceptedChannel.remoteAddress = channel.getLocalAddress();
143 acceptedChannel.setConnected();
144 fireChannelConnected(acceptedChannel, channel.getLocalAddress());
145
146
147 channel.flushWriteBuffer();
148 acceptedChannel.flushWriteBuffer();
149 }
150 }