View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AbstractChannel;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultChannelConfig;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.IoEvent;
29  import io.netty.channel.IoEventLoop;
30  import io.netty.channel.IoRegistration;
31  import io.netty.channel.PreferHeapByteBufAllocator;
32  import io.netty.channel.RecvByteBufAllocator;
33  import io.netty.channel.SingleThreadEventLoop;
34  import io.netty.util.ReferenceCountUtil;
35  import io.netty.util.concurrent.Future;
36  import io.netty.util.concurrent.SingleThreadEventExecutor;
37  import io.netty.util.internal.InternalThreadLocalMap;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.net.ConnectException;
43  import java.net.SocketAddress;
44  import java.nio.channels.AlreadyConnectedException;
45  import java.nio.channels.ClosedChannelException;
46  import java.nio.channels.ConnectionPendingException;
47  import java.nio.channels.NotYetConnectedException;
48  import java.util.Queue;
49  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50  
51  /**
52   * A {@link Channel} for the local transport.
53   */
54  public class LocalChannel extends AbstractChannel {
55      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
56      @SuppressWarnings({ "rawtypes" })
57      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
58              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60      private static final int MAX_READER_STACK_DEPTH = 8;
61  
62      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
63  
64      private final ChannelConfig config = new DefaultChannelConfig(this);
65      // To further optimize this we could write our own SPSC queue.
66      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
67      private final Runnable readTask = new Runnable() {
68          @Override
69          public void run() {
70              // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
71              if (!inboundBuffer.isEmpty()) {
72                  readInbound();
73              }
74          }
75      };
76  
77      private final Runnable shutdownHook = new Runnable() {
78          @Override
79          public void run() {
80              unsafe().close(unsafe().voidPromise());
81          }
82      };
83  
84      private IoRegistration registration;
85  
86      private volatile State state;
87      private volatile LocalChannel peer;
88      private volatile LocalAddress localAddress;
89      private volatile LocalAddress remoteAddress;
90      private volatile ChannelPromise connectPromise;
91      private volatile boolean readInProgress;
92      private volatile boolean writeInProgress;
93      private volatile Future<?> finishReadFuture;
94  
95      public LocalChannel() {
96          super(null);
97          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
98      }
99  
100     protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
101         super(parent);
102         config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
103         this.peer = peer;
104         localAddress = parent.localAddress();
105         remoteAddress = peer.localAddress();
106     }
107 
108     @Override
109     public ChannelMetadata metadata() {
110         return METADATA;
111     }
112 
113     @Override
114     public ChannelConfig config() {
115         return config;
116     }
117 
118     @Override
119     public LocalServerChannel parent() {
120         return (LocalServerChannel) super.parent();
121     }
122 
123     @Override
124     public LocalAddress localAddress() {
125         return (LocalAddress) super.localAddress();
126     }
127 
128     @Override
129     public LocalAddress remoteAddress() {
130         return (LocalAddress) super.remoteAddress();
131     }
132 
133     @Override
134     public boolean isOpen() {
135         return state != State.CLOSED;
136     }
137 
138     @Override
139     public boolean isActive() {
140         return state == State.CONNECTED;
141     }
142 
143     @Override
144     protected AbstractUnsafe newUnsafe() {
145         return new LocalUnsafe();
146     }
147 
148     @Override
149     protected boolean isCompatible(EventLoop loop) {
150         return loop instanceof SingleThreadEventLoop ||
151                 (loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(LocalUnsafe.class));
152     }
153 
154     @Override
155     protected SocketAddress localAddress0() {
156         return localAddress;
157     }
158 
159     @Override
160     protected SocketAddress remoteAddress0() {
161         return remoteAddress;
162     }
163 
164     @Override
165     protected void doRegister(ChannelPromise promise) {
166         EventLoop loop = eventLoop();
167         if (loop instanceof IoEventLoop) {
168             assert registration == null;
169             ((IoEventLoop) loop).register((LocalUnsafe) unsafe()).addListener(f -> {
170                if (f.isSuccess()) {
171                    registration = (IoRegistration) f.getNow();
172                    promise.setSuccess();
173                } else {
174                    promise.setFailure(f.cause());
175                }
176             });
177         } else {
178             try {
179                 ((LocalUnsafe) unsafe()).registerNow();
180             } catch (Throwable cause) {
181                 promise.setFailure(cause);
182             }
183             promise.setSuccess();
184         }
185     }
186 
187     @Override
188     protected void doDeregister() throws Exception {
189         EventLoop loop = eventLoop();
190         if (loop instanceof IoEventLoop) {
191             IoRegistration registration = this.registration;
192             if (registration != null) {
193                 this.registration = null;
194                 registration.cancel();
195             }
196         } else {
197             ((LocalUnsafe) unsafe()).deregisterNow();
198         }
199     }
200 
201     @Override
202     protected void doBind(SocketAddress localAddress) throws Exception {
203         this.localAddress =
204                 LocalChannelRegistry.register(this, this.localAddress,
205                         localAddress);
206         state = State.BOUND;
207     }
208 
209     @Override
210     protected void doDisconnect() throws Exception {
211         doClose();
212     }
213 
214     @Override
215     protected void doClose() throws Exception {
216         final LocalChannel peer = this.peer;
217         State oldState = state;
218         try {
219             if (oldState != State.CLOSED) {
220                 // Update all internal state before the closeFuture is notified.
221                 if (localAddress != null) {
222                     if (parent() == null) {
223                         LocalChannelRegistry.unregister(localAddress);
224                     }
225                     localAddress = null;
226                 }
227 
228                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
229                 // channelRead.
230                 state = State.CLOSED;
231 
232                 // Preserve order of event and force a read operation now before the close operation is processed.
233                 if (writeInProgress && peer != null) {
234                     finishPeerRead(peer);
235                 }
236 
237                 ChannelPromise promise = connectPromise;
238                 if (promise != null) {
239                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
240                     promise.tryFailure(new ClosedChannelException());
241                     connectPromise = null;
242                 }
243             }
244 
245             if (peer != null) {
246                 this.peer = null;
247                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
248                 // This ensures that if both channels are on the same event loop, the peer's channelInActive
249                 // event is triggered *after* this peer's channelInActive event
250                 EventLoop peerEventLoop = peer.eventLoop();
251                 final boolean peerIsActive = peer.isActive();
252                 try {
253                     peerEventLoop.execute(new Runnable() {
254                         @Override
255                         public void run() {
256                             peer.tryClose(peerIsActive);
257                         }
258                     });
259                 } catch (Throwable cause) {
260                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
261                             this, peer, cause);
262                     if (peerEventLoop.inEventLoop()) {
263                         peer.releaseInboundBuffers();
264                     } else {
265                         // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
266                         // rejects the close Runnable but give a best effort.
267                         peer.close();
268                     }
269                     PlatformDependent.throwException(cause);
270                 }
271             }
272         } finally {
273             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
274             if (oldState != null && oldState != State.CLOSED) {
275                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
276                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
277                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
278                 // see the buffer at all.
279                 releaseInboundBuffers();
280             }
281         }
282     }
283 
284     private void tryClose(boolean isActive) {
285         if (isActive) {
286             unsafe().close(unsafe().voidPromise());
287         } else {
288             releaseInboundBuffers();
289         }
290     }
291 
292     private void readInbound() {
293         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
294         handle.reset(config());
295         ChannelPipeline pipeline = pipeline();
296         do {
297             Object received = inboundBuffer.poll();
298             if (received == null) {
299                 break;
300             }
301             if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
302                 ByteBuf msg = (ByteBuf) received;
303                 ByteBuf output = handle.allocate(alloc());
304                 if (msg.readableBytes() < output.writableBytes()) {
305                     // We have an opportunity to coalesce buffers.
306                     output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
307                     msg.release();
308                     while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
309                             ((ByteBuf) received).readableBytes() < output.writableBytes()) {
310                         inboundBuffer.poll();
311                         msg = (ByteBuf) received;
312                         output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
313                         msg.release();
314                     }
315                     handle.lastBytesRead(output.readableBytes());
316                     received = output; // Send the coalesced buffer down the pipeline.
317                 } else {
318                     // It won't be profitable to coalesce buffers this time around.
319                     handle.lastBytesRead(output.capacity());
320                     output.release();
321                 }
322             }
323             handle.incMessagesRead(1);
324             pipeline.fireChannelRead(received);
325         } while (handle.continueReading());
326         handle.readComplete();
327         pipeline.fireChannelReadComplete();
328     }
329 
330     @Override
331     protected void doBeginRead() throws Exception {
332         if (readInProgress) {
333             return;
334         }
335 
336         Queue<Object> inboundBuffer = this.inboundBuffer;
337         if (inboundBuffer.isEmpty()) {
338             readInProgress = true;
339             return;
340         }
341 
342         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
343         final int stackDepth = threadLocals.localChannelReaderStackDepth();
344         if (stackDepth < MAX_READER_STACK_DEPTH) {
345             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
346             try {
347                 readInbound();
348             } finally {
349                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
350             }
351         } else {
352             try {
353                 eventLoop().execute(readTask);
354             } catch (Throwable cause) {
355                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
356                 close();
357                 peer.close();
358                 PlatformDependent.throwException(cause);
359             }
360         }
361     }
362 
363     @Override
364     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
365         switch (state) {
366         case OPEN:
367         case BOUND:
368             throw new NotYetConnectedException();
369         case CLOSED:
370             throw new ClosedChannelException();
371         case CONNECTED:
372             break;
373         }
374 
375         final LocalChannel peer = this.peer;
376 
377         writeInProgress = true;
378         try {
379             ClosedChannelException exception = null;
380             for (;;) {
381                 Object msg = in.current();
382                 if (msg == null) {
383                     break;
384                 }
385                 try {
386                     // It is possible the peer could have closed while we are writing, and in this case we should
387                     // simulate real socket behavior and ensure the write operation is failed.
388                     if (peer.state == State.CONNECTED) {
389                         peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
390                         in.remove();
391                     } else {
392                         if (exception == null) {
393                             exception = new ClosedChannelException();
394                         }
395                         in.remove(exception);
396                     }
397                 } catch (Throwable cause) {
398                     in.remove(cause);
399                 }
400             }
401         } finally {
402             // The following situation may cause trouble:
403             // 1. Write (with promise X)
404             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
405             // 3. Then the close event will be executed for the peer before the write events, when the write events
406             // actually happened before the close event.
407             writeInProgress = false;
408         }
409 
410         finishPeerRead(peer);
411     }
412 
413     private void finishPeerRead(final LocalChannel peer) {
414         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
415         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
416             finishPeerRead0(peer);
417         } else {
418             runFinishPeerReadTask(peer);
419         }
420     }
421 
422     private void runFinishPeerReadTask(final LocalChannel peer) {
423         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
424         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
425         final Runnable finishPeerReadTask = new Runnable() {
426             @Override
427             public void run() {
428                 finishPeerRead0(peer);
429             }
430         };
431         try {
432             if (peer.writeInProgress) {
433                 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
434             } else {
435                 peer.eventLoop().execute(finishPeerReadTask);
436             }
437         } catch (Throwable cause) {
438             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
439             close();
440             peer.close();
441             PlatformDependent.throwException(cause);
442         }
443     }
444 
445     private void releaseInboundBuffers() {
446         assert eventLoop() == null || eventLoop().inEventLoop();
447         readInProgress = false;
448         Queue<Object> inboundBuffer = this.inboundBuffer;
449         Object msg;
450         while ((msg = inboundBuffer.poll()) != null) {
451             ReferenceCountUtil.release(msg);
452         }
453     }
454 
455     private void finishPeerRead0(LocalChannel peer) {
456         Future<?> peerFinishReadFuture = peer.finishReadFuture;
457         if (peerFinishReadFuture != null) {
458             if (!peerFinishReadFuture.isDone()) {
459                 runFinishPeerReadTask(peer);
460                 return;
461             } else {
462                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
463                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
464             }
465         }
466         // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
467         // forward data later on.
468         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
469             peer.readInProgress = false;
470             peer.readInbound();
471         }
472     }
473 
474     private class LocalUnsafe extends AbstractUnsafe implements LocalIoHandle {
475 
476         @Override
477         public void close() {
478             close(voidPromise());
479         }
480 
481         @Override
482         public void handle(IoRegistration registration, IoEvent event) {
483             // NOOP
484         }
485 
486         @Override
487         public void registerNow() {
488             // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
489             // This is needed as a peer may not be null also if a LocalChannel was connected before and
490             // deregistered / registered later again.
491             //
492             // See https://github.com/netty/netty/issues/2400
493             if (peer != null && parent() != null) {
494                 // Store the peer in a local variable as it may be set to null if doClose() is called.
495                 // See https://github.com/netty/netty/issues/2144
496                 final LocalChannel peer = LocalChannel.this.peer;
497                 state = State.CONNECTED;
498 
499                 peer.remoteAddress = parent() == null ? null : parent().localAddress();
500                 peer.state = State.CONNECTED;
501 
502                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
503                 // This ensures that if both channels are on the same event loop, the peer's channelActive
504                 // event is triggered *after* this channel's channelRegistered event, so that this channel's
505                 // pipeline is fully initialized by ChannelInitializer before any channelRead events.
506                 peer.eventLoop().execute(new Runnable() {
507                     @Override
508                     public void run() {
509                         ChannelPromise promise = peer.connectPromise;
510 
511                         // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
512                         // connectPromise may be set to null if doClose() was called in the meantime.
513                         if (promise != null && promise.trySuccess()) {
514                             peer.pipeline().fireChannelActive();
515                         }
516                     }
517                 });
518             }
519             ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
520         }
521 
522         @Override
523         public void deregisterNow() {
524             // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
525             ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
526         }
527 
528         @Override
529         public void closeNow() {
530             close(voidPromise());
531         }
532 
533         @Override
534         public void connect(final SocketAddress remoteAddress,
535                 SocketAddress localAddress, final ChannelPromise promise) {
536             if (!promise.setUncancellable() || !ensureOpen(promise)) {
537                 return;
538             }
539 
540             if (state == State.CONNECTED) {
541                 Exception cause = new AlreadyConnectedException();
542                 safeSetFailure(promise, cause);
543                 pipeline().fireExceptionCaught(cause);
544                 return;
545             }
546 
547             if (connectPromise != null) {
548                 throw new ConnectionPendingException();
549             }
550 
551             connectPromise = promise;
552 
553             if (state != State.BOUND) {
554                 // Not bound yet and no localAddress specified - get one.
555                 if (localAddress == null) {
556                     localAddress = new LocalAddress(LocalChannel.this);
557                 }
558             }
559 
560             if (localAddress != null) {
561                 try {
562                     doBind(localAddress);
563                 } catch (Throwable t) {
564                     safeSetFailure(promise, t);
565                     close(voidPromise());
566                     return;
567                 }
568             }
569 
570             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
571             if (!(boundChannel instanceof LocalServerChannel)) {
572                 Exception cause = new ConnectException("connection refused: " + remoteAddress);
573                 safeSetFailure(promise, cause);
574                 close(voidPromise());
575                 return;
576             }
577 
578             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
579             peer = serverChannel.serve(LocalChannel.this);
580         }
581     }
582 }