1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AbstractChannel;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultChannelConfig;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoEvent;
29 import io.netty.channel.IoEventLoop;
30 import io.netty.channel.IoRegistration;
31 import io.netty.channel.PreferHeapByteBufAllocator;
32 import io.netty.channel.RecvByteBufAllocator;
33 import io.netty.channel.SingleThreadEventLoop;
34 import io.netty.util.ReferenceCountUtil;
35 import io.netty.util.concurrent.Future;
36 import io.netty.util.concurrent.SingleThreadEventExecutor;
37 import io.netty.util.internal.InternalThreadLocalMap;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.net.ConnectException;
43 import java.net.SocketAddress;
44 import java.nio.channels.AlreadyConnectedException;
45 import java.nio.channels.ClosedChannelException;
46 import java.nio.channels.ConnectionPendingException;
47 import java.nio.channels.NotYetConnectedException;
48 import java.util.Queue;
49 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50
51
52
53
54 public class LocalChannel extends AbstractChannel {
/** Logger used to report failures when a task cannot be handed to an event loop. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
/**
 * Atomic updater bound (by name) to the {@code finishReadFuture} field; {@code finishPeerRead0} uses it to
 * clear a completed future with a compare-and-set.
 */
@SuppressWarnings({ "rawtypes" })
private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
// NOTE(review): the boolean is presumably ChannelMetadata's "hasDisconnect" flag -- confirm against its Javadoc.
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
/**
 * Limit on the thread-local reader stack depth: below it {@code doBeginRead()} calls {@code readInbound()}
 * directly, otherwise the read is handed to the event loop through {@code readTask}.
 */
private static final int MAX_READER_STACK_DEPTH = 8;

/** Lifecycle states; {@code isOpen()} is true for everything but CLOSED, {@code isActive()} only for CONNECTED. */
private enum State { OPEN, BOUND, CONNECTED, CLOSED }

/** Configuration returned by {@code config()}. */
private final ChannelConfig config = new DefaultChannelConfig(this);

/**
 * Messages waiting to be read by this channel. The peer's {@code doWrite()} adds to it and
 * {@code readInbound()} / {@code releaseInboundBuffers()} poll from it.
 */
final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
67 private final Runnable readTask = new Runnable() {
68 @Override
69 public void run() {
70
71 if (!inboundBuffer.isEmpty()) {
72 readInbound();
73 }
74 }
75 };
76
77 private final Runnable shutdownHook = new Runnable() {
78 @Override
79 public void run() {
80 unsafe().close(unsafe().voidPromise());
81 }
82 };
83
/**
 * Registration obtained from {@code IoEventLoop.register(...)} in {@code doRegister()}; cancelled and
 * cleared in {@code doDeregister()}. Never assigned when the event loop is not an IoEventLoop.
 */
private IoRegistration registration;

/** Current lifecycle state; has no initial value and is assigned by doBind(), registerNow() and doClose(). */
private volatile State state;
/**
 * The other end of the connection: supplied to the accepting constructor, or assigned from
 * {@code serverChannel.serve(...)} in connect(). Reset to {@code null} by doClose().
 */
private volatile LocalChannel peer;
/** Address this channel is bound to; set by the accepting constructor or doBind(), cleared by doClose(). */
private volatile LocalAddress localAddress;
/** Address of the remote end; set by the accepting constructor or by the peer's registerNow(). */
private volatile LocalAddress remoteAddress;
/** Promise of the connect() currently in flight; completed by the peer's registerNow() or failed in doClose(). */
private volatile ChannelPromise connectPromise;
/**
 * Set by doBeginRead() when a read was requested while {@code inboundBuffer} was empty; cleared by
 * finishPeerRead0() right before it triggers readInbound(), and by releaseInboundBuffers().
 */
private volatile boolean readInProgress;
/** True while doWrite() is transferring messages to the peer's {@code inboundBuffer}. */
private volatile boolean writeInProgress;
/** Future of a finish-read task that the peer submitted while this channel was writing (see runFinishPeerReadTask). */
private volatile Future<?> finishReadFuture;
94
/**
 * Creates a client-side channel with no parent and no peer; the peer is assigned later by
 * {@code connect(...)}.
 */
public LocalChannel() {
    super(null);
    // Wrap the configured allocator in a PreferHeapByteBufAllocator.
    config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
}
99
/**
 * Creates the accepted side of a connection.
 *
 * @param parent the server channel; its local address becomes this channel's local address
 * @param peer the connecting channel; its local address becomes this channel's remote address
 */
protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
    super(parent);
    // Wrap the configured allocator in a PreferHeapByteBufAllocator.
    config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
    this.peer = peer;
    localAddress = parent.localAddress();
    remoteAddress = peer.localAddress();
}
107
/** Returns the shared {@code METADATA} constant of this channel type. */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
112
/** Returns the {@code DefaultChannelConfig} created together with this channel. */
@Override
public ChannelConfig config() {
    return config;
}
117
118 @Override
119 public LocalServerChannel parent() {
120 return (LocalServerChannel) super.parent();
121 }
122
123 @Override
124 public LocalAddress localAddress() {
125 return (LocalAddress) super.localAddress();
126 }
127
128 @Override
129 public LocalAddress remoteAddress() {
130 return (LocalAddress) super.remoteAddress();
131 }
132
133 @Override
134 public boolean isOpen() {
135 return state != State.CLOSED;
136 }
137
138 @Override
139 public boolean isActive() {
140 return state == State.CONNECTED;
141 }
142
/** Creates the {@code LocalUnsafe} that implements registration, connect and close for this channel. */
@Override
protected AbstractUnsafe newUnsafe() {
    return new LocalUnsafe();
}
147
148 @Override
149 protected boolean isCompatible(EventLoop loop) {
150 return loop instanceof SingleThreadEventLoop ||
151 (loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(LocalUnsafe.class));
152 }
153
/**
 * Returns the current value of the {@code localAddress} field: {@code null} until the channel is bound
 * (or created by a server channel) and again after doClose().
 */
@Override
protected SocketAddress localAddress0() {
    return localAddress;
}
158
/**
 * Returns the current value of the {@code remoteAddress} field, which is assigned by the accepting
 * constructor or by the peer's registerNow().
 */
@Override
protected SocketAddress remoteAddress0() {
    return remoteAddress;
}
163
164 @Override
165 protected void doRegister(ChannelPromise promise) {
166 EventLoop loop = eventLoop();
167 if (loop instanceof IoEventLoop) {
168 assert registration == null;
169 ((IoEventLoop) loop).register((LocalUnsafe) unsafe()).addListener(f -> {
170 if (f.isSuccess()) {
171 registration = (IoRegistration) f.getNow();
172 promise.setSuccess();
173 } else {
174 promise.setFailure(f.cause());
175 }
176 });
177 } else {
178 try {
179 ((LocalUnsafe) unsafe()).registerNow();
180 } catch (Throwable cause) {
181 promise.setFailure(cause);
182 }
183 promise.setSuccess();
184 }
185 }
186
187 @Override
188 protected void doDeregister() throws Exception {
189 EventLoop loop = eventLoop();
190 if (loop instanceof IoEventLoop) {
191 IoRegistration registration = this.registration;
192 if (registration != null) {
193 this.registration = null;
194 registration.cancel();
195 }
196 } else {
197 ((LocalUnsafe) unsafe()).deregisterNow();
198 }
199 }
200
201 @Override
202 protected void doBind(SocketAddress localAddress) throws Exception {
203 this.localAddress =
204 LocalChannelRegistry.register(this, this.localAddress,
205 localAddress);
206 state = State.BOUND;
207 }
208
/** Disconnecting is implemented as a full close: this simply delegates to {@code doClose()}. */
@Override
protected void doDisconnect() throws Exception {
    doClose();
}
213
/**
 * Closes this end of the connection.
 *
 * <p>If the channel was not already closed it drops its local address (unregistering it when there is no
 * parent), switches to {@code CLOSED}, lets the peer finish a read if a write is in progress and fails a
 * pending connect promise. If a peer is still attached, the peer's close is scheduled on the peer's
 * event loop. Finally, queued inbound messages are released.
 *
 * @throws Exception rethrows the failure from scheduling the peer's close task
 */
@Override
protected void doClose() throws Exception {
    final LocalChannel peer = this.peer;
    State oldState = state;
    try {
        if (oldState != State.CLOSED) {
            // Update the internal state first.
            if (localAddress != null) {
                // Only parent-less channels remove their address from the registry here.
                if (parent() == null) {
                    LocalChannelRegistry.unregister(localAddress);
                }
                localAddress = null;
            }

            // The state is switched before finishPeerRead(): the peer's doWrite() inspects this state to
            // decide whether a message is queued or failed with ClosedChannelException.
            state = State.CLOSED;

            // A write is in progress: let the peer pick up what was queued before its close is scheduled below.
            if (writeInProgress && peer != null) {
                finishPeerRead(peer);
            }

            ChannelPromise promise = connectPromise;
            if (promise != null) {
                // tryFailure() tolerates a promise that was already completed elsewhere.
                promise.tryFailure(new ClosedChannelException());
                connectPromise = null;
            }
        }

        if (peer != null) {
            this.peer = null;
            // The peer's close is always submitted as a task and never run inline, even when already on the
            // peer's loop. NOTE(review): presumably to keep the peer's events ordered after this channel's
            // own -- confirm.
            EventLoop peerEventLoop = peer.eventLoop();
            // Captured now; the task below decides between a full close and only releasing buffers.
            final boolean peerIsActive = peer.isActive();
            try {
                peerEventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        peer.tryClose(peerIsActive);
                    }
                });
            } catch (Throwable cause) {
                logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
                        this, peer, cause);
                if (peerEventLoop.inEventLoop()) {
                    peer.releaseInboundBuffers();
                } else {
                    // Not on the peer's loop, so its queue must not be drained from here (see the assert
                    // in releaseInboundBuffers()); fall back to a regular close as best effort.
                    peer.close();
                }
                PlatformDependent.throwException(cause);
            }
        }
    } finally {
        // Skipped when no state had ever been assigned or the channel was already CLOSED on entry.
        if (oldState != null && oldState != State.CLOSED) {
            // Release whatever is still sitting in our inbound queue so it is not leaked.
            releaseInboundBuffers();
        }
    }
}
283
284 private void tryClose(boolean isActive) {
285 if (isActive) {
286 unsafe().close(unsafe().voidPromise());
287 } else {
288 releaseInboundBuffers();
289 }
290 }
291
292 private void readInbound() {
293 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
294 handle.reset(config());
295 ChannelPipeline pipeline = pipeline();
296 do {
297 Object received = inboundBuffer.poll();
298 if (received == null) {
299 break;
300 }
301 if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
302 ByteBuf msg = (ByteBuf) received;
303 ByteBuf output = handle.allocate(alloc());
304 if (msg.readableBytes() < output.writableBytes()) {
305
306 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
307 msg.release();
308 while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
309 ((ByteBuf) received).readableBytes() < output.writableBytes()) {
310 inboundBuffer.poll();
311 msg = (ByteBuf) received;
312 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
313 msg.release();
314 }
315 handle.lastBytesRead(output.readableBytes());
316 received = output;
317 } else {
318
319 handle.lastBytesRead(output.capacity());
320 output.release();
321 }
322 }
323 handle.incMessagesRead(1);
324 pipeline.fireChannelRead(received);
325 } while (handle.continueReading());
326 handle.readComplete();
327 pipeline.fireChannelReadComplete();
328 }
329
330 @Override
331 protected void doBeginRead() throws Exception {
332 if (readInProgress) {
333 return;
334 }
335
336 Queue<Object> inboundBuffer = this.inboundBuffer;
337 if (inboundBuffer.isEmpty()) {
338 readInProgress = true;
339 return;
340 }
341
342 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
343 final int stackDepth = threadLocals.localChannelReaderStackDepth();
344 if (stackDepth < MAX_READER_STACK_DEPTH) {
345 threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
346 try {
347 readInbound();
348 } finally {
349 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
350 }
351 } else {
352 try {
353 eventLoop().execute(readTask);
354 } catch (Throwable cause) {
355 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
356 close();
357 peer.close();
358 PlatformDependent.throwException(cause);
359 }
360 }
361 }
362
363 @Override
364 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
365 switch (state) {
366 case OPEN:
367 case BOUND:
368 throw new NotYetConnectedException();
369 case CLOSED:
370 throw new ClosedChannelException();
371 case CONNECTED:
372 break;
373 }
374
375 final LocalChannel peer = this.peer;
376
377 writeInProgress = true;
378 try {
379 ClosedChannelException exception = null;
380 for (;;) {
381 Object msg = in.current();
382 if (msg == null) {
383 break;
384 }
385 try {
386
387
388 if (peer.state == State.CONNECTED) {
389 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
390 in.remove();
391 } else {
392 if (exception == null) {
393 exception = new ClosedChannelException();
394 }
395 in.remove(exception);
396 }
397 } catch (Throwable cause) {
398 in.remove(cause);
399 }
400 }
401 } finally {
402
403
404
405
406
407 writeInProgress = false;
408 }
409
410 finishPeerRead(peer);
411 }
412
413 private void finishPeerRead(final LocalChannel peer) {
414
415 if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
416 finishPeerRead0(peer);
417 } else {
418 runFinishPeerReadTask(peer);
419 }
420 }
421
422 private void runFinishPeerReadTask(final LocalChannel peer) {
423
424
425 final Runnable finishPeerReadTask = new Runnable() {
426 @Override
427 public void run() {
428 finishPeerRead0(peer);
429 }
430 };
431 try {
432 if (peer.writeInProgress) {
433 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
434 } else {
435 peer.eventLoop().execute(finishPeerReadTask);
436 }
437 } catch (Throwable cause) {
438 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
439 close();
440 peer.close();
441 PlatformDependent.throwException(cause);
442 }
443 }
444
445 private void releaseInboundBuffers() {
446 assert eventLoop() == null || eventLoop().inEventLoop();
447 readInProgress = false;
448 Queue<Object> inboundBuffer = this.inboundBuffer;
449 Object msg;
450 while ((msg = inboundBuffer.poll()) != null) {
451 ReferenceCountUtil.release(msg);
452 }
453 }
454
455 private void finishPeerRead0(LocalChannel peer) {
456 Future<?> peerFinishReadFuture = peer.finishReadFuture;
457 if (peerFinishReadFuture != null) {
458 if (!peerFinishReadFuture.isDone()) {
459 runFinishPeerReadTask(peer);
460 return;
461 } else {
462
463 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
464 }
465 }
466
467
468 if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
469 peer.readInProgress = false;
470 peer.readInbound();
471 }
472 }
473
474 private class LocalUnsafe extends AbstractUnsafe implements LocalIoHandle {
475
476 @Override
477 public void close() {
478 close(voidPromise());
479 }
480
481 @Override
482 public void handle(IoRegistration registration, IoEvent event) {
483
484 }
485
486 @Override
487 public void registerNow() {
488
489
490
491
492
493 if (peer != null && parent() != null) {
494
495
496 final LocalChannel peer = LocalChannel.this.peer;
497 state = State.CONNECTED;
498
499 peer.remoteAddress = parent() == null ? null : parent().localAddress();
500 peer.state = State.CONNECTED;
501
502
503
504
505
506 peer.eventLoop().execute(new Runnable() {
507 @Override
508 public void run() {
509 ChannelPromise promise = peer.connectPromise;
510
511
512
513 if (promise != null && promise.trySuccess()) {
514 peer.pipeline().fireChannelActive();
515 }
516 }
517 });
518 }
519 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
520 }
521
522 @Override
523 public void deregisterNow() {
524
525 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
526 }
527
528 @Override
529 public void closeNow() {
530 close(voidPromise());
531 }
532
533 @Override
534 public void connect(final SocketAddress remoteAddress,
535 SocketAddress localAddress, final ChannelPromise promise) {
536 if (!promise.setUncancellable() || !ensureOpen(promise)) {
537 return;
538 }
539
540 if (state == State.CONNECTED) {
541 Exception cause = new AlreadyConnectedException();
542 safeSetFailure(promise, cause);
543 pipeline().fireExceptionCaught(cause);
544 return;
545 }
546
547 if (connectPromise != null) {
548 throw new ConnectionPendingException();
549 }
550
551 connectPromise = promise;
552
553 if (state != State.BOUND) {
554
555 if (localAddress == null) {
556 localAddress = new LocalAddress(LocalChannel.this);
557 }
558 }
559
560 if (localAddress != null) {
561 try {
562 doBind(localAddress);
563 } catch (Throwable t) {
564 safeSetFailure(promise, t);
565 close(voidPromise());
566 return;
567 }
568 }
569
570 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
571 if (!(boundChannel instanceof LocalServerChannel)) {
572 Exception cause = new ConnectException("connection refused: " + remoteAddress);
573 safeSetFailure(promise, cause);
574 close(voidPromise());
575 return;
576 }
577
578 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
579 peer = serverChannel.serve(LocalChannel.this);
580 }
581 }
582 }