1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.AbstractServerChannel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.DefaultChannelConfig;
23 import io.netty.channel.EventLoop;
24 import io.netty.channel.IoEvent;
25 import io.netty.channel.IoEventLoop;
26 import io.netty.channel.IoEventLoopGroup;
27 import io.netty.channel.IoRegistration;
28 import io.netty.channel.PreferHeapByteBufAllocator;
29 import io.netty.channel.RecvByteBufAllocator;
30 import io.netty.channel.ServerChannel;
31 import io.netty.channel.ServerChannelRecvByteBufAllocator;
32 import io.netty.channel.SingleThreadEventLoop;
33 import io.netty.util.concurrent.SingleThreadEventExecutor;
34
35 import java.net.SocketAddress;
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38
39
40
41
42 public class LocalServerChannel extends AbstractServerChannel {
43
44 private final ChannelConfig config =
45 new DefaultChannelConfig(this, new ServerChannelRecvByteBufAllocator()) { };
46 private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
47
48 private IoRegistration registration;
49
50 private volatile int state;
51 private volatile LocalAddress localAddress;
52 private volatile boolean acceptInProgress;
53
54 public LocalServerChannel() {
55 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
56 }
57
58 @Override
59 public ChannelConfig config() {
60 return config;
61 }
62
63 @Override
64 public LocalAddress localAddress() {
65 return (LocalAddress) super.localAddress();
66 }
67
68 @Override
69 public LocalAddress remoteAddress() {
70 return (LocalAddress) super.remoteAddress();
71 }
72
73 @Override
74 public boolean isOpen() {
75 return state < 2;
76 }
77
78 @Override
79 public boolean isActive() {
80 return state == 1;
81 }
82
83 @Override
84 protected boolean isCompatible(EventLoop loop) {
85 return loop instanceof SingleThreadEventLoop ||
86 (loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(LocalServerUnsafe.class));
87 }
88
89 @Override
90 protected SocketAddress localAddress0() {
91 return localAddress;
92 }
93
94 @Override
95 protected void doRegister(ChannelPromise promise) {
96 EventLoop loop = eventLoop();
97 if (loop instanceof IoEventLoop) {
98 assert registration == null;
99 ((IoEventLoop) loop).register((LocalServerUnsafe) unsafe()).addListener(f -> {
100 if (f.isSuccess()) {
101 registration = (IoRegistration) f.getNow();
102 promise.setSuccess();
103 } else {
104 promise.setFailure(f.cause());
105 }
106 });
107 } else {
108 try {
109 ((LocalServerUnsafe) unsafe()).registered();
110 } catch (Throwable cause) {
111 promise.setFailure(cause);
112 return;
113 }
114 promise.setSuccess();
115 }
116 }
117
118 @Override
119 protected void doBind(SocketAddress localAddress) throws Exception {
120 this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
121 state = 1;
122 }
123
124 @Override
125 protected void doClose() throws Exception {
126 if (state <= 1) {
127
128 if (localAddress != null) {
129 LocalChannelRegistry.unregister(localAddress);
130 localAddress = null;
131 }
132 state = 2;
133 }
134 }
135
136 @Override
137 protected void doDeregister() throws Exception {
138 EventLoop loop = eventLoop();
139 if (loop instanceof IoEventLoop) {
140 IoRegistration registration = this.registration;
141 if (registration != null) {
142 this.registration = null;
143 registration.cancel();
144 }
145 } else {
146 ((LocalServerUnsafe) unsafe()).unregistered();
147 }
148 }
149
150 @Override
151 protected void doBeginRead() throws Exception {
152 if (acceptInProgress) {
153 return;
154 }
155
156 Queue<Object> inboundBuffer = this.inboundBuffer;
157 if (inboundBuffer.isEmpty()) {
158 acceptInProgress = true;
159 return;
160 }
161
162 readInbound();
163 }
164
165 LocalChannel serve(final LocalChannel peer) {
166 final LocalChannel child = newLocalChannel(peer);
167 if (eventLoop().inEventLoop()) {
168 serve0(child);
169 } else {
170 eventLoop().execute(new Runnable() {
171 @Override
172 public void run() {
173 serve0(child);
174 }
175 });
176 }
177 return child;
178 }
179
180 private void readInbound() {
181 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
182 handle.reset(config());
183 ChannelPipeline pipeline = pipeline();
184 do {
185 Object m = inboundBuffer.poll();
186 if (m == null) {
187 break;
188 }
189 pipeline.fireChannelRead(m);
190 } while (handle.continueReading());
191 handle.readComplete();
192 pipeline.fireChannelReadComplete();
193 }
194
195
196
197
198
199 protected LocalChannel newLocalChannel(LocalChannel peer) {
200 return new LocalChannel(this, peer);
201 }
202
203 private void serve0(final LocalChannel child) {
204 inboundBuffer.add(child);
205 if (acceptInProgress) {
206 acceptInProgress = false;
207
208 readInbound();
209 }
210 }
211
212 @Override
213 protected AbstractUnsafe newUnsafe() {
214 return new LocalServerUnsafe();
215 }
216
217 private class LocalServerUnsafe extends AbstractUnsafe implements LocalIoHandle {
218 private final Runnable shutdownHook = this::closeNow;
219
220 @Override
221 public void close() {
222 close(voidPromise());
223 }
224
225 @Override
226 public void handle(IoRegistration registration, IoEvent event) {
227
228 }
229
230 @Override
231 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
232 safeSetFailure(promise, new UnsupportedOperationException());
233 }
234
235 @Override
236 public void registered() {
237 EventLoop loop = eventLoop();
238 if (!(loop instanceof IoEventLoop) && loop instanceof SingleThreadEventExecutor) {
239 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
240 }
241 }
242
243 @Override
244 public void unregistered() {
245 EventLoop loop = eventLoop();
246 if (!(loop instanceof IoEventLoop) && loop instanceof SingleThreadEventExecutor) {
247 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
248 }
249 }
250
251 @Override
252 public void closeNow() {
253 close(voidPromise());
254 }
255 }
256 }