1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.AbstractServerChannel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.DefaultChannelConfig;
23 import io.netty.channel.EventLoop;
24 import io.netty.channel.IoEvent;
25 import io.netty.channel.IoEventLoop;
26 import io.netty.channel.IoEventLoopGroup;
27 import io.netty.channel.IoRegistration;
28 import io.netty.channel.PreferHeapByteBufAllocator;
29 import io.netty.channel.RecvByteBufAllocator;
30 import io.netty.channel.ServerChannel;
31 import io.netty.channel.ServerChannelRecvByteBufAllocator;
32 import io.netty.channel.SingleThreadEventLoop;
33 import io.netty.util.concurrent.SingleThreadEventExecutor;
34
35 import java.net.SocketAddress;
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38
39
40
41
42 public class LocalServerChannel extends AbstractServerChannel {
43
44 private final ChannelConfig config =
45 new DefaultChannelConfig(this, new ServerChannelRecvByteBufAllocator()) { };
46 private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
47 private final Runnable shutdownHook = new Runnable() {
48 @Override
49 public void run() {
50 unsafe().close(unsafe().voidPromise());
51 }
52 };
53
54 private IoRegistration registration;
55
56 private volatile int state;
57 private volatile LocalAddress localAddress;
58 private volatile boolean acceptInProgress;
59
60 public LocalServerChannel() {
61 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
62 }
63
64 @Override
65 public ChannelConfig config() {
66 return config;
67 }
68
69 @Override
70 public LocalAddress localAddress() {
71 return (LocalAddress) super.localAddress();
72 }
73
74 @Override
75 public LocalAddress remoteAddress() {
76 return (LocalAddress) super.remoteAddress();
77 }
78
79 @Override
80 public boolean isOpen() {
81 return state < 2;
82 }
83
84 @Override
85 public boolean isActive() {
86 return state == 1;
87 }
88
89 @Override
90 protected boolean isCompatible(EventLoop loop) {
91 return loop instanceof SingleThreadEventLoop ||
92 (loop instanceof IoEventLoopGroup && ((IoEventLoopGroup) loop).isCompatible(LocalServerUnsafe.class));
93 }
94
95 @Override
96 protected SocketAddress localAddress0() {
97 return localAddress;
98 }
99
100 @Override
101 protected void doRegister(ChannelPromise promise) {
102 EventLoop loop = eventLoop();
103 if (loop instanceof IoEventLoop) {
104 assert registration == null;
105 ((IoEventLoop) loop).register((LocalServerUnsafe) unsafe()).addListener(f -> {
106 if (f.isSuccess()) {
107 registration = (IoRegistration) f.getNow();
108 promise.setSuccess();
109 } else {
110 promise.setFailure(f.cause());
111 }
112 });
113 } else {
114 try {
115 ((LocalServerUnsafe) unsafe()).registerNow();
116 } catch (Throwable cause) {
117 promise.setFailure(cause);
118 return;
119 }
120 promise.setSuccess();
121 }
122 }
123
124 @Override
125 protected void doBind(SocketAddress localAddress) throws Exception {
126 this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
127 state = 1;
128 }
129
130 @Override
131 protected void doClose() throws Exception {
132 if (state <= 1) {
133
134 if (localAddress != null) {
135 LocalChannelRegistry.unregister(localAddress);
136 localAddress = null;
137 }
138 state = 2;
139 }
140 }
141
142 @Override
143 protected void doDeregister() throws Exception {
144 EventLoop loop = eventLoop();
145 if (loop instanceof IoEventLoop) {
146 IoRegistration registration = this.registration;
147 if (registration != null) {
148 this.registration = null;
149 registration.cancel();
150 }
151 } else {
152 ((LocalServerUnsafe) unsafe()).deregisterNow();
153 }
154 }
155
156 @Override
157 protected void doBeginRead() throws Exception {
158 if (acceptInProgress) {
159 return;
160 }
161
162 Queue<Object> inboundBuffer = this.inboundBuffer;
163 if (inboundBuffer.isEmpty()) {
164 acceptInProgress = true;
165 return;
166 }
167
168 readInbound();
169 }
170
171 LocalChannel serve(final LocalChannel peer) {
172 final LocalChannel child = newLocalChannel(peer);
173 if (eventLoop().inEventLoop()) {
174 serve0(child);
175 } else {
176 eventLoop().execute(new Runnable() {
177 @Override
178 public void run() {
179 serve0(child);
180 }
181 });
182 }
183 return child;
184 }
185
186 private void readInbound() {
187 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
188 handle.reset(config());
189 ChannelPipeline pipeline = pipeline();
190 do {
191 Object m = inboundBuffer.poll();
192 if (m == null) {
193 break;
194 }
195 pipeline.fireChannelRead(m);
196 } while (handle.continueReading());
197 handle.readComplete();
198 pipeline.fireChannelReadComplete();
199 }
200
201
202
203
204
205 protected LocalChannel newLocalChannel(LocalChannel peer) {
206 return new LocalChannel(this, peer);
207 }
208
209 private void serve0(final LocalChannel child) {
210 inboundBuffer.add(child);
211 if (acceptInProgress) {
212 acceptInProgress = false;
213
214 readInbound();
215 }
216 }
217
218 @Override
219 protected AbstractUnsafe newUnsafe() {
220 return new LocalServerUnsafe();
221 }
222
223 private class LocalServerUnsafe extends AbstractUnsafe implements LocalIoHandle {
224 @Override
225 public void close() {
226 close(voidPromise());
227 }
228
229 @Override
230 public void handle(IoRegistration registration, IoEvent event) {
231
232 }
233
234 @Override
235 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
236 safeSetFailure(promise, new UnsupportedOperationException());
237 }
238
239 @Override
240 public void registerNow() {
241 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
242 }
243
244 @Override
245 public void deregisterNow() {
246 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
247 }
248
249 @Override
250 public void closeNow() {
251 close(voidPromise());
252 }
253 }
254 }