1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import org.jboss.netty.channel.AbstractChannelSink;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelException;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelPipeline;
26 import org.jboss.netty.channel.ChannelState;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.MessageEvent;
29
30 final class LocalServerChannelSink extends AbstractChannelSink {
31
32 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
33 Channel channel = e.getChannel();
34 if (channel instanceof DefaultLocalServerChannel) {
35 handleServerChannel(e);
36 } else if (channel instanceof DefaultLocalChannel) {
37 handleAcceptedChannel(e);
38 }
39 }
40
41 private static void handleServerChannel(ChannelEvent e) {
42 if (!(e instanceof ChannelStateEvent)) {
43 return;
44 }
45
46 ChannelStateEvent event = (ChannelStateEvent) e;
47 DefaultLocalServerChannel channel =
48 (DefaultLocalServerChannel) event.getChannel();
49 ChannelFuture future = event.getFuture();
50 ChannelState state = event.getState();
51 Object value = event.getValue();
52 switch (state) {
53 case OPEN:
54 if (Boolean.FALSE.equals(value)) {
55 close(channel, future);
56 }
57 break;
58 case BOUND:
59 if (value != null) {
60 bind(channel, future, (LocalAddress) value);
61 } else {
62 close(channel, future);
63 }
64 break;
65 }
66 }
67
68 private static void handleAcceptedChannel(ChannelEvent e) {
69 if (e instanceof ChannelStateEvent) {
70 ChannelStateEvent event = (ChannelStateEvent) e;
71 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
72 ChannelFuture future = event.getFuture();
73 ChannelState state = event.getState();
74 Object value = event.getValue();
75
76 switch (state) {
77 case OPEN:
78 if (Boolean.FALSE.equals(value)) {
79 channel.closeNow(future);
80 }
81 break;
82 case BOUND:
83 case CONNECTED:
84 if (value == null) {
85 channel.closeNow(future);
86 }
87 break;
88 case INTEREST_OPS:
89
90 future.setSuccess();
91 break;
92 }
93 } else if (e instanceof MessageEvent) {
94 MessageEvent event = (MessageEvent) e;
95 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
96 boolean offered = channel.writeBuffer.offer(event);
97 assert offered;
98 channel.flushWriteBuffer();
99 }
100 }
101
102 private static void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
103 try {
104 if (!LocalChannelRegistry.register(localAddress, channel)) {
105 throw new ChannelException("address already in use: " + localAddress);
106 }
107 if (!channel.bound.compareAndSet(false, true)) {
108 throw new ChannelException("already bound");
109 }
110
111 channel.localAddress = localAddress;
112 future.setSuccess();
113 fireChannelBound(channel, localAddress);
114 } catch (Throwable t) {
115 LocalChannelRegistry.unregister(localAddress);
116 future.setFailure(t);
117 fireExceptionCaught(channel, t);
118 }
119 }
120
121 private static void close(DefaultLocalServerChannel channel, ChannelFuture future) {
122 try {
123 if (channel.setClosed()) {
124 future.setSuccess();
125 LocalAddress localAddress = channel.localAddress;
126 if (channel.bound.compareAndSet(true, false)) {
127 channel.localAddress = null;
128 LocalChannelRegistry.unregister(localAddress);
129 fireChannelUnbound(channel);
130 }
131 fireChannelClosed(channel);
132 } else {
133 future.setSuccess();
134 }
135 } catch (Throwable t) {
136 future.setFailure(t);
137 fireExceptionCaught(channel, t);
138 }
139 }
140 }