1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultChannelConfig;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.SingleThreadEventLoop;
28 import io.netty.util.ReferenceCountUtil;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.SingleThreadEventExecutor;
31 import io.netty.util.internal.InternalThreadLocalMap;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.ThrowableUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.net.ConnectException;
38 import java.net.SocketAddress;
39 import java.nio.channels.AlreadyConnectedException;
40 import java.nio.channels.ClosedChannelException;
41 import java.nio.channels.ConnectionPendingException;
42 import java.nio.channels.NotYetConnectedException;
43 import java.util.Queue;
44 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45
46
47
48
49 public class LocalChannel extends AbstractChannel {
50 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
51 @SuppressWarnings({ "rawtypes" })
52 private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
53 AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
54 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
55 private static final int MAX_READER_STACK_DEPTH = 8;
56 private static final ClosedChannelException DO_WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
57 new ClosedChannelException(), LocalChannel.class, "doWrite(...)");
58 private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
59 new ClosedChannelException(), LocalChannel.class, "doClose()");
60
61 private enum State { OPEN, BOUND, CONNECTED, CLOSED }
62
63 private final ChannelConfig config = new DefaultChannelConfig(this);
64
65 final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
66 private final Runnable readTask = new Runnable() {
67 @Override
68 public void run() {
69 ChannelPipeline pipeline = pipeline();
70 for (;;) {
71 Object m = inboundBuffer.poll();
72 if (m == null) {
73 break;
74 }
75 pipeline.fireChannelRead(m);
76 }
77 pipeline.fireChannelReadComplete();
78 }
79 };
80 private final Runnable shutdownHook = new Runnable() {
81 @Override
82 public void run() {
83 unsafe().close(unsafe().voidPromise());
84 }
85 };
86
87 private volatile int state;
88 private volatile LocalChannel peer;
89 private volatile LocalAddress localAddress;
90 private volatile LocalAddress remoteAddress;
91 private volatile ChannelPromise connectPromise;
92 private volatile boolean readInProgress;
93 private volatile boolean registerInProgress;
94 private volatile boolean writeInProgress;
95 private volatile Future<?> finishReadFuture;
96
97 public LocalChannel() {
98 super(null);
99 }
100
101 protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
102 super(parent);
103 this.peer = peer;
104 localAddress = parent.localAddress();
105 remoteAddress = peer.localAddress();
106 }
107
108 @Override
109 public ChannelMetadata metadata() {
110 return METADATA;
111 }
112
113 @Override
114 public ChannelConfig config() {
115 return config;
116 }
117
118 @Override
119 public LocalServerChannel parent() {
120 return (LocalServerChannel) super.parent();
121 }
122
123 @Override
124 public LocalAddress localAddress() {
125 return (LocalAddress) super.localAddress();
126 }
127
128 @Override
129 public LocalAddress remoteAddress() {
130 return (LocalAddress) super.remoteAddress();
131 }
132
133 @Override
134 public boolean isOpen() {
135 return state < 3;
136 }
137
138 @Override
139 public boolean isActive() {
140 return state == 2;
141 }
142
143 @Override
144 protected AbstractUnsafe newUnsafe() {
145 return new LocalUnsafe();
146 }
147
148 @Override
149 protected boolean isCompatible(EventLoop loop) {
150 return loop instanceof SingleThreadEventLoop;
151 }
152
153 @Override
154 protected SocketAddress localAddress0() {
155 return localAddress;
156 }
157
158 @Override
159 protected SocketAddress remoteAddress0() {
160 return remoteAddress;
161 }
162
163 @Override
164 protected void doRegister() throws Exception {
165
166
167
168
169
170 if (peer != null && parent() != null) {
171
172
173
174
175
176
177 final LocalChannel peer = this.peer;
178 registerInProgress = true;
179 state = 2;
180
181 peer.remoteAddress = parent().localAddress();
182 peer.state = 2;
183
184
185
186
187
188 peer.eventLoop().execute(new Runnable() {
189 @Override
190 public void run() {
191 registerInProgress = false;
192 ChannelPromise promise = peer.connectPromise;
193
194
195
196 if (promise != null && promise.trySuccess()) {
197 peer.pipeline().fireChannelActive();
198 }
199 }
200 });
201 }
202 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
203 }
204
205 @Override
206 protected void doBind(SocketAddress localAddress) throws Exception {
207 this.localAddress =
208 LocalChannelRegistry.register(this, this.localAddress,
209 localAddress);
210 state = 1;
211 }
212
213 @Override
214 protected void doDisconnect() throws Exception {
215 doClose();
216 }
217
218 @Override
219 protected void doClose() throws Exception {
220 final LocalChannel peer = this.peer;
221 int oldState = state;
222 try {
223 if (oldState <= 2) {
224
225 if (localAddress != null) {
226 if (parent() == null) {
227 LocalChannelRegistry.unregister(localAddress);
228 }
229 localAddress = null;
230 }
231
232
233
234 state = 3;
235
236
237 finishPeerRead(this);
238
239 ChannelPromise promise = connectPromise;
240 if (promise != null) {
241
242 promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
243 connectPromise = null;
244 }
245 }
246
247 if (peer != null) {
248 this.peer = null;
249
250
251
252
253 EventLoop peerEventLoop = peer.eventLoop();
254 final boolean peerIsActive = peer.isActive();
255 if (peerEventLoop.inEventLoop() && !registerInProgress) {
256 peer.tryClose(peerIsActive);
257 } else {
258 try {
259 peerEventLoop.execute(new Runnable() {
260 @Override
261 public void run() {
262 peer.tryClose(peerIsActive);
263 }
264 });
265 } catch (Throwable cause) {
266 logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
267 this, peer, cause);
268 releaseInboundBuffers();
269 if (peerEventLoop.inEventLoop()) {
270 peer.releaseInboundBuffers();
271 } else {
272
273
274 peer.close();
275 }
276 PlatformDependent.throwException(cause);
277 }
278 }
279 }
280 } finally {
281
282 if (peer != null && oldState != 3) {
283
284
285
286
287 releaseInboundBuffers();
288 }
289 }
290 }
291
292 private void tryClose(boolean isActive) {
293 if (isActive) {
294 unsafe().close(unsafe().voidPromise());
295 } else {
296 releaseInboundBuffers();
297 }
298 }
299
300 @Override
301 protected void doDeregister() throws Exception {
302
303 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
304 }
305
306 @Override
307 protected void doBeginRead() throws Exception {
308 if (readInProgress) {
309 return;
310 }
311
312 ChannelPipeline pipeline = pipeline();
313 Queue<Object> inboundBuffer = this.inboundBuffer;
314 if (inboundBuffer.isEmpty()) {
315 readInProgress = true;
316 return;
317 }
318
319 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
320 final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
321 if (stackDepth < MAX_READER_STACK_DEPTH) {
322 threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
323 try {
324 for (;;) {
325 Object received = inboundBuffer.poll();
326 if (received == null) {
327 break;
328 }
329 pipeline.fireChannelRead(received);
330 }
331 pipeline.fireChannelReadComplete();
332 } finally {
333 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
334 }
335 } else {
336 try {
337 eventLoop().execute(readTask);
338 } catch (Throwable cause) {
339 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
340 close();
341 peer.close();
342 PlatformDependent.throwException(cause);
343 }
344 }
345 }
346
347 @Override
348 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
349 if (state < 2) {
350 throw new NotYetConnectedException();
351 }
352 if (state > 2) {
353 throw DO_WRITE_CLOSED_CHANNEL_EXCEPTION;
354 }
355
356 final LocalChannel peer = this.peer;
357
358 writeInProgress = true;
359 try {
360 for (;;) {
361 Object msg = in.current();
362 if (msg == null) {
363 break;
364 }
365 try {
366
367
368 if (peer.state == 2) {
369 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
370 in.remove();
371 } else {
372 in.remove(DO_WRITE_CLOSED_CHANNEL_EXCEPTION);
373 }
374 } catch (Throwable cause) {
375 in.remove(cause);
376 }
377 }
378 } finally {
379
380
381
382
383
384 writeInProgress = false;
385 }
386
387 finishPeerRead(peer);
388 }
389
390 private void finishPeerRead(final LocalChannel peer) {
391
392 if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
393 finishPeerRead0(peer);
394 } else {
395 runFinishPeerReadTask(peer);
396 }
397 }
398
399 private void runFinishPeerReadTask(final LocalChannel peer) {
400
401
402 final Runnable finishPeerReadTask = new Runnable() {
403 @Override
404 public void run() {
405 finishPeerRead0(peer);
406 }
407 };
408 try {
409 if (peer.writeInProgress) {
410 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
411 } else {
412 peer.eventLoop().execute(finishPeerReadTask);
413 }
414 } catch (Throwable cause) {
415 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
416 close();
417 peer.close();
418 PlatformDependent.throwException(cause);
419 }
420 }
421
422 private void releaseInboundBuffers() {
423 assert eventLoop() == null || eventLoop().inEventLoop();
424 readInProgress = false;
425 Queue<Object> inboundBuffer = this.inboundBuffer;
426 Object msg;
427 while ((msg = inboundBuffer.poll()) != null) {
428 ReferenceCountUtil.release(msg);
429 }
430 }
431
432 private void finishPeerRead0(LocalChannel peer) {
433 Future<?> peerFinishReadFuture = peer.finishReadFuture;
434 if (peerFinishReadFuture != null) {
435 if (!peerFinishReadFuture.isDone()) {
436 runFinishPeerReadTask(peer);
437 return;
438 } else {
439
440 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
441 }
442 }
443 ChannelPipeline peerPipeline = peer.pipeline();
444 if (peer.readInProgress) {
445 peer.readInProgress = false;
446 for (;;) {
447 Object received = peer.inboundBuffer.poll();
448 if (received == null) {
449 break;
450 }
451 peerPipeline.fireChannelRead(received);
452 }
453 peerPipeline.fireChannelReadComplete();
454 }
455 }
456
457 private class LocalUnsafe extends AbstractUnsafe {
458
459 @Override
460 public void connect(final SocketAddress remoteAddress,
461 SocketAddress localAddress, final ChannelPromise promise) {
462 if (!promise.setUncancellable() || !ensureOpen(promise)) {
463 return;
464 }
465
466 if (state == 2) {
467 Exception cause = new AlreadyConnectedException();
468 safeSetFailure(promise, cause);
469 pipeline().fireExceptionCaught(cause);
470 return;
471 }
472
473 if (connectPromise != null) {
474 throw new ConnectionPendingException();
475 }
476
477 connectPromise = promise;
478
479 if (state != 1) {
480
481 if (localAddress == null) {
482 localAddress = new LocalAddress(LocalChannel.this);
483 }
484 }
485
486 if (localAddress != null) {
487 try {
488 doBind(localAddress);
489 } catch (Throwable t) {
490 safeSetFailure(promise, t);
491 close(voidPromise());
492 return;
493 }
494 }
495
496 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
497 if (!(boundChannel instanceof LocalServerChannel)) {
498 Exception cause = new ConnectException("connection refused: " + remoteAddress);
499 safeSetFailure(promise, cause);
500 close(voidPromise());
501 return;
502 }
503
504 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
505 peer = serverChannel.serve(LocalChannel.this);
506 }
507 }
508 }