1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AbstractChannel;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultChannelConfig;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoEvent;
29 import io.netty.channel.IoEventLoop;
30 import io.netty.channel.IoRegistration;
31 import io.netty.channel.PreferHeapByteBufAllocator;
32 import io.netty.channel.RecvByteBufAllocator;
33 import io.netty.channel.SingleThreadEventLoop;
34 import io.netty.util.ReferenceCountUtil;
35 import io.netty.util.concurrent.Future;
36 import io.netty.util.concurrent.SingleThreadEventExecutor;
37 import io.netty.util.internal.InternalThreadLocalMap;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.net.ConnectException;
43 import java.net.SocketAddress;
44 import java.nio.channels.AlreadyConnectedException;
45 import java.nio.channels.ClosedChannelException;
46 import java.nio.channels.ConnectionPendingException;
47 import java.nio.channels.NotYetConnectedException;
48 import java.util.Queue;
49 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50
51
52
53
54 public class LocalChannel extends AbstractChannel {
55 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
56 @SuppressWarnings({ "rawtypes" })
57 private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
58 AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
59 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60 private static final int MAX_READER_STACK_DEPTH = 8;
61
62 private enum State { OPEN, BOUND, CONNECTED, CLOSED }
63
64 private final ChannelConfig config = new DefaultChannelConfig(this);
65
66 final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
67 private final Runnable readTask = new Runnable() {
68 @Override
69 public void run() {
70
71 if (!inboundBuffer.isEmpty()) {
72 readInbound();
73 }
74 }
75 };
76
77 private final Runnable finishReadTask = new Runnable() {
78 @Override
79 public void run() {
80 finishPeerRead0(LocalChannel.this);
81 }
82 };
83
84 private IoRegistration registration;
85
86 private volatile State state;
87 private volatile LocalChannel peer;
88 private volatile LocalAddress localAddress;
89 private volatile LocalAddress remoteAddress;
90 private volatile ChannelPromise connectPromise;
91 private volatile boolean readInProgress;
92 private volatile boolean writeInProgress;
93 private volatile Future<?> finishReadFuture;
94
95 public LocalChannel() {
96 super(null);
97 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
98 }
99
100 protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
101 super(parent);
102 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
103 this.peer = peer;
104 localAddress = parent.localAddress();
105 remoteAddress = peer.localAddress();
106 }
107
108 @Override
109 public ChannelMetadata metadata() {
110 return METADATA;
111 }
112
113 @Override
114 public ChannelConfig config() {
115 return config;
116 }
117
118 @Override
119 public LocalServerChannel parent() {
120 return (LocalServerChannel) super.parent();
121 }
122
123 @Override
124 public LocalAddress localAddress() {
125 return (LocalAddress) super.localAddress();
126 }
127
128 @Override
129 public LocalAddress remoteAddress() {
130 return (LocalAddress) super.remoteAddress();
131 }
132
133 @Override
134 public boolean isOpen() {
135 return state != State.CLOSED;
136 }
137
138 @Override
139 public boolean isActive() {
140 return state == State.CONNECTED;
141 }
142
143 @Override
144 protected AbstractUnsafe newUnsafe() {
145 return new LocalUnsafe();
146 }
147
148 @Override
149 protected boolean isCompatible(EventLoop loop) {
150 return loop instanceof SingleThreadEventLoop ||
151 (loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(LocalUnsafe.class));
152 }
153
154 @Override
155 protected SocketAddress localAddress0() {
156 return localAddress;
157 }
158
159 @Override
160 protected SocketAddress remoteAddress0() {
161 return remoteAddress;
162 }
163
164 @Override
165 protected void doRegister(ChannelPromise promise) {
166 EventLoop loop = eventLoop();
167 if (loop instanceof IoEventLoop) {
168 assert registration == null;
169 ((IoEventLoop) loop).register((LocalUnsafe) unsafe()).addListener(f -> {
170 if (f.isSuccess()) {
171 registration = (IoRegistration) f.getNow();
172 promise.setSuccess();
173 } else {
174 promise.setFailure(f.cause());
175 }
176 });
177 } else {
178 try {
179 ((LocalUnsafe) unsafe()).registered();
180 } catch (Throwable cause) {
181 promise.setFailure(cause);
182 }
183 promise.setSuccess();
184 }
185 }
186
187 @Override
188 protected void doDeregister() throws Exception {
189 EventLoop loop = eventLoop();
190 if (loop instanceof IoEventLoop) {
191 IoRegistration registration = this.registration;
192 if (registration != null) {
193 this.registration = null;
194 registration.cancel();
195 }
196 } else {
197 ((LocalUnsafe) unsafe()).unregistered();
198 }
199 }
200
201 @Override
202 protected void doBind(SocketAddress localAddress) throws Exception {
203 this.localAddress =
204 LocalChannelRegistry.register(this, this.localAddress,
205 localAddress);
206 state = State.BOUND;
207 }
208
209 @Override
210 protected void doDisconnect() throws Exception {
211 doClose();
212 }
213
214 @Override
215 protected void doClose() throws Exception {
216 final LocalChannel peer = this.peer;
217 State oldState = state;
218 try {
219 if (oldState != State.CLOSED) {
220
221 if (localAddress != null) {
222 if (parent() == null) {
223 LocalChannelRegistry.unregister(localAddress);
224 }
225 localAddress = null;
226 }
227
228
229
230 state = State.CLOSED;
231
232
233 if (writeInProgress && peer != null) {
234 finishPeerRead(peer);
235 }
236
237 ChannelPromise promise = connectPromise;
238 if (promise != null) {
239
240 promise.tryFailure(new ClosedChannelException());
241 connectPromise = null;
242 }
243 }
244
245 if (peer != null) {
246 this.peer = null;
247
248
249
250 EventLoop peerEventLoop = peer.eventLoop();
251 final boolean peerIsActive = peer.isActive();
252 try {
253 peerEventLoop.execute(new Runnable() {
254 @Override
255 public void run() {
256 peer.tryClose(peerIsActive);
257 }
258 });
259 } catch (Throwable cause) {
260 logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
261 this, peer, cause);
262 if (peerEventLoop.inEventLoop()) {
263 peer.releaseInboundBuffers();
264 } else {
265
266
267 peer.close();
268 }
269 PlatformDependent.throwException(cause);
270 }
271 }
272 } finally {
273
274 if (oldState != null && oldState != State.CLOSED) {
275
276
277
278
279 releaseInboundBuffers();
280 }
281 }
282 }
283
284 private void tryClose(boolean isActive) {
285 if (isActive) {
286 unsafe().close(unsafe().voidPromise());
287 } else {
288 releaseInboundBuffers();
289 }
290 }
291
292 private void readInbound() {
293 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
294 handle.reset(config());
295 ChannelPipeline pipeline = pipeline();
296 do {
297 Object received = inboundBuffer.poll();
298 if (received == null) {
299 break;
300 }
301 if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
302 ByteBuf msg = (ByteBuf) received;
303 ByteBuf output = handle.allocate(alloc());
304 if (msg.readableBytes() < output.writableBytes()) {
305
306 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
307 msg.release();
308 while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
309 ((ByteBuf) received).readableBytes() < output.writableBytes()) {
310 inboundBuffer.poll();
311 msg = (ByteBuf) received;
312 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
313 msg.release();
314 }
315 handle.lastBytesRead(output.readableBytes());
316 received = output;
317 } else {
318
319 handle.lastBytesRead(output.capacity());
320 output.release();
321 }
322 }
323 handle.incMessagesRead(1);
324 pipeline.fireChannelRead(received);
325 } while (handle.continueReading());
326 handle.readComplete();
327 pipeline.fireChannelReadComplete();
328 }
329
330 @Override
331 protected void doBeginRead() throws Exception {
332 if (readInProgress) {
333 return;
334 }
335
336 Queue<Object> inboundBuffer = this.inboundBuffer;
337 if (inboundBuffer.isEmpty()) {
338 readInProgress = true;
339 return;
340 }
341
342 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
343 final int stackDepth = threadLocals.localChannelReaderStackDepth();
344 if (stackDepth < MAX_READER_STACK_DEPTH) {
345 threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
346 try {
347 readInbound();
348 } finally {
349 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
350 }
351 } else {
352 try {
353 eventLoop().execute(readTask);
354 } catch (Throwable cause) {
355 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
356 close();
357 peer.close();
358 PlatformDependent.throwException(cause);
359 }
360 }
361 }
362
363 @Override
364 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
365 switch (state) {
366 case OPEN:
367 case BOUND:
368 throw new NotYetConnectedException();
369 case CLOSED:
370 throw new ClosedChannelException();
371 case CONNECTED:
372 break;
373 }
374
375 final LocalChannel peer = this.peer;
376
377 writeInProgress = true;
378 try {
379 ClosedChannelException exception = null;
380 for (;;) {
381 Object msg = in.current();
382 if (msg == null) {
383 break;
384 }
385 try {
386
387
388 if (peer.state == State.CONNECTED) {
389 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
390 in.remove();
391 } else {
392 if (exception == null) {
393 exception = new ClosedChannelException();
394 }
395 in.remove(exception);
396 }
397 } catch (Throwable cause) {
398 in.remove(cause);
399 }
400 }
401 } finally {
402
403
404
405
406
407 writeInProgress = false;
408 }
409
410 finishPeerRead(peer);
411 }
412
413 private void finishPeerRead(final LocalChannel peer) {
414
415 if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
416 finishPeerRead0(peer);
417 } else {
418 runFinishPeerReadTask(peer);
419 }
420 }
421
422 private void runFinishTask0() {
423
424
425 if (writeInProgress) {
426 finishReadFuture = eventLoop().submit(finishReadTask);
427 } else {
428 eventLoop().execute(finishReadTask);
429 }
430 }
431
432 private void runFinishPeerReadTask(final LocalChannel peer) {
433 try {
434 peer.runFinishTask0();
435 } catch (Throwable cause) {
436 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
437 close();
438 peer.close();
439 PlatformDependent.throwException(cause);
440 }
441 }
442
443 private void releaseInboundBuffers() {
444 assert eventLoop() == null || eventLoop().inEventLoop();
445 readInProgress = false;
446 Queue<Object> inboundBuffer = this.inboundBuffer;
447 Object msg;
448 while ((msg = inboundBuffer.poll()) != null) {
449 ReferenceCountUtil.release(msg);
450 }
451 }
452
453 private void finishPeerRead0(LocalChannel peer) {
454 Future<?> peerFinishReadFuture = peer.finishReadFuture;
455 if (peerFinishReadFuture != null) {
456 if (!peerFinishReadFuture.isDone()) {
457 runFinishPeerReadTask(peer);
458 return;
459 } else {
460
461 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
462 }
463 }
464
465
466 if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
467 peer.readInProgress = false;
468 peer.readInbound();
469 }
470 }
471
472 private class LocalUnsafe extends AbstractUnsafe implements LocalIoHandle {
473
474 private final Runnable shutdownHook = this::closeNow;
475
476 @Override
477 public void close() {
478 close(voidPromise());
479 }
480
481 @Override
482 public void handle(IoRegistration registration, IoEvent event) {
483
484 }
485
486 @Override
487 public void registered() {
488
489
490
491
492
493 if (peer != null && parent() != null) {
494
495
496 final LocalChannel peer = LocalChannel.this.peer;
497 state = State.CONNECTED;
498
499 peer.remoteAddress = parent() == null ? null : parent().localAddress();
500 peer.state = State.CONNECTED;
501
502
503
504
505
506 peer.eventLoop().execute(new Runnable() {
507 @Override
508 public void run() {
509 ChannelPromise promise = peer.connectPromise;
510
511
512
513 if (promise != null && promise.trySuccess()) {
514 peer.pipeline().fireChannelActive();
515 }
516 }
517 });
518 }
519 EventLoop loop = eventLoop();
520 if (!(loop instanceof IoEventLoop) && loop instanceof SingleThreadEventExecutor) {
521 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
522 }
523 }
524
525 @Override
526 public void unregistered() {
527 EventLoop loop = eventLoop();
528 if (!(loop instanceof IoEventLoop) && loop instanceof SingleThreadEventExecutor) {
529 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
530 }
531 }
532
533 @Override
534 public void closeNow() {
535 close(voidPromise());
536 }
537
538 @Override
539 public void connect(final SocketAddress remoteAddress,
540 SocketAddress localAddress, final ChannelPromise promise) {
541 if (!promise.setUncancellable() || !ensureOpen(promise)) {
542 return;
543 }
544
545 if (state == State.CONNECTED) {
546 Exception cause = new AlreadyConnectedException();
547 safeSetFailure(promise, cause);
548 return;
549 }
550
551 if (connectPromise != null) {
552 throw new ConnectionPendingException();
553 }
554
555 connectPromise = promise;
556
557 if (state != State.BOUND) {
558
559 if (localAddress == null) {
560 localAddress = new LocalAddress(LocalChannel.this);
561 }
562 }
563
564 if (localAddress != null) {
565 try {
566 doBind(localAddress);
567 } catch (Throwable t) {
568 safeSetFailure(promise, t);
569 close(voidPromise());
570 return;
571 }
572 }
573
574 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
575 if (!(boundChannel instanceof LocalServerChannel)) {
576 Exception cause = new ConnectException("connection refused: " + remoteAddress);
577 safeSetFailure(promise, cause);
578 close(voidPromise());
579 return;
580 }
581
582 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
583 peer = serverChannel.serve(LocalChannel.this);
584 }
585 }
586 }