1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AbstractChannel;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultChannelConfig;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.IoEvent;
29 import io.netty.channel.IoEventLoop;
30 import io.netty.channel.IoRegistration;
31 import io.netty.channel.PreferHeapByteBufAllocator;
32 import io.netty.channel.RecvByteBufAllocator;
33 import io.netty.channel.SingleThreadEventLoop;
34 import io.netty.util.ReferenceCountUtil;
35 import io.netty.util.concurrent.Future;
36 import io.netty.util.concurrent.SingleThreadEventExecutor;
37 import io.netty.util.internal.InternalThreadLocalMap;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.net.ConnectException;
43 import java.net.SocketAddress;
44 import java.nio.channels.AlreadyConnectedException;
45 import java.nio.channels.ClosedChannelException;
46 import java.nio.channels.ConnectionPendingException;
47 import java.nio.channels.NotYetConnectedException;
48 import java.util.Queue;
49 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50
51
52
53
54 public class LocalChannel extends AbstractChannel {
55 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
56 @SuppressWarnings({ "rawtypes" })
57 private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
58 AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
59 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60 private static final int MAX_READER_STACK_DEPTH = 8;
61
62 private enum State { OPEN, BOUND, CONNECTED, CLOSED }
63
64 private final ChannelConfig config = new DefaultChannelConfig(this);
65
66 final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
67 private final Runnable readTask = new Runnable() {
68 @Override
69 public void run() {
70
71 if (!inboundBuffer.isEmpty()) {
72 readInbound();
73 }
74 }
75 };
76
77 private final Runnable shutdownHook = new Runnable() {
78 @Override
79 public void run() {
80 unsafe().close(unsafe().voidPromise());
81 }
82 };
83
84 private final Runnable finishReadTask = new Runnable() {
85 @Override
86 public void run() {
87 finishPeerRead0(LocalChannel.this);
88 }
89 };
90
91 private IoRegistration registration;
92
93 private volatile State state;
94 private volatile LocalChannel peer;
95 private volatile LocalAddress localAddress;
96 private volatile LocalAddress remoteAddress;
97 private volatile ChannelPromise connectPromise;
98 private volatile boolean readInProgress;
99 private volatile boolean writeInProgress;
100 private volatile Future<?> finishReadFuture;
101
102 public LocalChannel() {
103 super(null);
104 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
105 }
106
107 protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
108 super(parent);
109 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
110 this.peer = peer;
111 localAddress = parent.localAddress();
112 remoteAddress = peer.localAddress();
113 }
114
115 @Override
116 public ChannelMetadata metadata() {
117 return METADATA;
118 }
119
120 @Override
121 public ChannelConfig config() {
122 return config;
123 }
124
125 @Override
126 public LocalServerChannel parent() {
127 return (LocalServerChannel) super.parent();
128 }
129
130 @Override
131 public LocalAddress localAddress() {
132 return (LocalAddress) super.localAddress();
133 }
134
135 @Override
136 public LocalAddress remoteAddress() {
137 return (LocalAddress) super.remoteAddress();
138 }
139
140 @Override
141 public boolean isOpen() {
142 return state != State.CLOSED;
143 }
144
145 @Override
146 public boolean isActive() {
147 return state == State.CONNECTED;
148 }
149
150 @Override
151 protected AbstractUnsafe newUnsafe() {
152 return new LocalUnsafe();
153 }
154
155 @Override
156 protected boolean isCompatible(EventLoop loop) {
157 return loop instanceof SingleThreadEventLoop ||
158 (loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(LocalUnsafe.class));
159 }
160
161 @Override
162 protected SocketAddress localAddress0() {
163 return localAddress;
164 }
165
166 @Override
167 protected SocketAddress remoteAddress0() {
168 return remoteAddress;
169 }
170
171 @Override
172 protected void doRegister(ChannelPromise promise) {
173 EventLoop loop = eventLoop();
174 if (loop instanceof IoEventLoop) {
175 assert registration == null;
176 ((IoEventLoop) loop).register((LocalUnsafe) unsafe()).addListener(f -> {
177 if (f.isSuccess()) {
178 registration = (IoRegistration) f.getNow();
179 promise.setSuccess();
180 } else {
181 promise.setFailure(f.cause());
182 }
183 });
184 } else {
185 try {
186 ((LocalUnsafe) unsafe()).registered();
187 } catch (Throwable cause) {
188 promise.setFailure(cause);
189 }
190 promise.setSuccess();
191 }
192 }
193
194 @Override
195 protected void doDeregister() throws Exception {
196 EventLoop loop = eventLoop();
197 if (loop instanceof IoEventLoop) {
198 IoRegistration registration = this.registration;
199 if (registration != null) {
200 this.registration = null;
201 registration.cancel();
202 }
203 } else {
204 ((LocalUnsafe) unsafe()).unregistered();
205 }
206 }
207
208 @Override
209 protected void doBind(SocketAddress localAddress) throws Exception {
210 this.localAddress =
211 LocalChannelRegistry.register(this, this.localAddress,
212 localAddress);
213 state = State.BOUND;
214 }
215
216 @Override
217 protected void doDisconnect() throws Exception {
218 doClose();
219 }
220
221 @Override
222 protected void doClose() throws Exception {
223 final LocalChannel peer = this.peer;
224 State oldState = state;
225 try {
226 if (oldState != State.CLOSED) {
227
228 if (localAddress != null) {
229 if (parent() == null) {
230 LocalChannelRegistry.unregister(localAddress);
231 }
232 localAddress = null;
233 }
234
235
236
237 state = State.CLOSED;
238
239
240 if (writeInProgress && peer != null) {
241 finishPeerRead(peer);
242 }
243
244 ChannelPromise promise = connectPromise;
245 if (promise != null) {
246
247 promise.tryFailure(new ClosedChannelException());
248 connectPromise = null;
249 }
250 }
251
252 if (peer != null) {
253 this.peer = null;
254
255
256
257 EventLoop peerEventLoop = peer.eventLoop();
258 final boolean peerIsActive = peer.isActive();
259 try {
260 peerEventLoop.execute(new Runnable() {
261 @Override
262 public void run() {
263 peer.tryClose(peerIsActive);
264 }
265 });
266 } catch (Throwable cause) {
267 logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
268 this, peer, cause);
269 if (peerEventLoop.inEventLoop()) {
270 peer.releaseInboundBuffers();
271 } else {
272
273
274 peer.close();
275 }
276 PlatformDependent.throwException(cause);
277 }
278 }
279 } finally {
280
281 if (oldState != null && oldState != State.CLOSED) {
282
283
284
285
286 releaseInboundBuffers();
287 }
288 }
289 }
290
291 private void tryClose(boolean isActive) {
292 if (isActive) {
293 unsafe().close(unsafe().voidPromise());
294 } else {
295 releaseInboundBuffers();
296 }
297 }
298
299 private void readInbound() {
300 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
301 handle.reset(config());
302 ChannelPipeline pipeline = pipeline();
303 do {
304 Object received = inboundBuffer.poll();
305 if (received == null) {
306 break;
307 }
308 if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
309 ByteBuf msg = (ByteBuf) received;
310 ByteBuf output = handle.allocate(alloc());
311 if (msg.readableBytes() < output.writableBytes()) {
312
313 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
314 msg.release();
315 while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
316 ((ByteBuf) received).readableBytes() < output.writableBytes()) {
317 inboundBuffer.poll();
318 msg = (ByteBuf) received;
319 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
320 msg.release();
321 }
322 handle.lastBytesRead(output.readableBytes());
323 received = output;
324 } else {
325
326 handle.lastBytesRead(output.capacity());
327 output.release();
328 }
329 }
330 handle.incMessagesRead(1);
331 pipeline.fireChannelRead(received);
332 } while (handle.continueReading());
333 handle.readComplete();
334 pipeline.fireChannelReadComplete();
335 }
336
337 @Override
338 protected void doBeginRead() throws Exception {
339 if (readInProgress) {
340 return;
341 }
342
343 Queue<Object> inboundBuffer = this.inboundBuffer;
344 if (inboundBuffer.isEmpty()) {
345 readInProgress = true;
346 return;
347 }
348
349 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
350 final int stackDepth = threadLocals.localChannelReaderStackDepth();
351 if (stackDepth < MAX_READER_STACK_DEPTH) {
352 threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
353 try {
354 readInbound();
355 } finally {
356 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
357 }
358 } else {
359 try {
360 eventLoop().execute(readTask);
361 } catch (Throwable cause) {
362 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
363 close();
364 peer.close();
365 PlatformDependent.throwException(cause);
366 }
367 }
368 }
369
370 @Override
371 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
372 switch (state) {
373 case OPEN:
374 case BOUND:
375 throw new NotYetConnectedException();
376 case CLOSED:
377 throw new ClosedChannelException();
378 case CONNECTED:
379 break;
380 }
381
382 final LocalChannel peer = this.peer;
383
384 writeInProgress = true;
385 try {
386 ClosedChannelException exception = null;
387 for (;;) {
388 Object msg = in.current();
389 if (msg == null) {
390 break;
391 }
392 try {
393
394
395 if (peer.state == State.CONNECTED) {
396 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
397 in.remove();
398 } else {
399 if (exception == null) {
400 exception = new ClosedChannelException();
401 }
402 in.remove(exception);
403 }
404 } catch (Throwable cause) {
405 in.remove(cause);
406 }
407 }
408 } finally {
409
410
411
412
413
414 writeInProgress = false;
415 }
416
417 finishPeerRead(peer);
418 }
419
420 private void finishPeerRead(final LocalChannel peer) {
421
422 if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
423 finishPeerRead0(peer);
424 } else {
425 runFinishPeerReadTask(peer);
426 }
427 }
428
429 private void runFinishTask0() {
430
431
432 if (writeInProgress) {
433 finishReadFuture = eventLoop().submit(finishReadTask);
434 } else {
435 eventLoop().execute(finishReadTask);
436 }
437 }
438
439 private void runFinishPeerReadTask(final LocalChannel peer) {
440 try {
441 peer.runFinishTask0();
442 } catch (Throwable cause) {
443 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
444 close();
445 peer.close();
446 PlatformDependent.throwException(cause);
447 }
448 }
449
450 private void releaseInboundBuffers() {
451 assert eventLoop() == null || eventLoop().inEventLoop();
452 readInProgress = false;
453 Queue<Object> inboundBuffer = this.inboundBuffer;
454 Object msg;
455 while ((msg = inboundBuffer.poll()) != null) {
456 ReferenceCountUtil.release(msg);
457 }
458 }
459
460 private void finishPeerRead0(LocalChannel peer) {
461 Future<?> peerFinishReadFuture = peer.finishReadFuture;
462 if (peerFinishReadFuture != null) {
463 if (!peerFinishReadFuture.isDone()) {
464 runFinishPeerReadTask(peer);
465 return;
466 } else {
467
468 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
469 }
470 }
471
472
473 if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
474 peer.readInProgress = false;
475 peer.readInbound();
476 }
477 }
478
479 private class LocalUnsafe extends AbstractUnsafe implements LocalIoHandle {
480
481 @Override
482 public void close() {
483 close(voidPromise());
484 }
485
486 @Override
487 public void handle(IoRegistration registration, IoEvent event) {
488
489 }
490
491 @Override
492 public void registered() {
493
494
495
496
497
498 if (peer != null && parent() != null) {
499
500
501 final LocalChannel peer = LocalChannel.this.peer;
502 state = State.CONNECTED;
503
504 peer.remoteAddress = parent() == null ? null : parent().localAddress();
505 peer.state = State.CONNECTED;
506
507
508
509
510
511 peer.eventLoop().execute(new Runnable() {
512 @Override
513 public void run() {
514 ChannelPromise promise = peer.connectPromise;
515
516
517
518 if (promise != null && promise.trySuccess()) {
519 peer.pipeline().fireChannelActive();
520 }
521 }
522 });
523 }
524 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
525 }
526
527 @Override
528 public void unregistered() {
529
530 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
531 }
532
533 @Override
534 public void closeNow() {
535 close(voidPromise());
536 }
537
538 @Override
539 public void connect(final SocketAddress remoteAddress,
540 SocketAddress localAddress, final ChannelPromise promise) {
541 if (!promise.setUncancellable() || !ensureOpen(promise)) {
542 return;
543 }
544
545 if (state == State.CONNECTED) {
546 Exception cause = new AlreadyConnectedException();
547 safeSetFailure(promise, cause);
548 return;
549 }
550
551 if (connectPromise != null) {
552 throw new ConnectionPendingException();
553 }
554
555 connectPromise = promise;
556
557 if (state != State.BOUND) {
558
559 if (localAddress == null) {
560 localAddress = new LocalAddress(LocalChannel.this);
561 }
562 }
563
564 if (localAddress != null) {
565 try {
566 doBind(localAddress);
567 } catch (Throwable t) {
568 safeSetFailure(promise, t);
569 close(voidPromise());
570 return;
571 }
572 }
573
574 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
575 if (!(boundChannel instanceof LocalServerChannel)) {
576 Exception cause = new ConnectException("connection refused: " + remoteAddress);
577 safeSetFailure(promise, cause);
578 close(voidPromise());
579 return;
580 }
581
582 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
583 peer = serverChannel.serve(LocalChannel.this);
584 }
585 }
586 }