View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AbstractChannel;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultChannelConfig;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.PreferHeapByteBufAllocator;
29  import io.netty.channel.RecvByteBufAllocator;
30  import io.netty.channel.SingleThreadEventLoop;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.concurrent.Future;
33  import io.netty.util.concurrent.SingleThreadEventExecutor;
34  import io.netty.util.internal.InternalThreadLocalMap;
35  import io.netty.util.internal.PlatformDependent;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import java.net.ConnectException;
40  import java.net.SocketAddress;
41  import java.nio.channels.AlreadyConnectedException;
42  import java.nio.channels.ClosedChannelException;
43  import java.nio.channels.ConnectionPendingException;
44  import java.nio.channels.NotYetConnectedException;
45  import java.util.Queue;
46  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
47  
48  /**
49   * A {@link Channel} for the local transport.
50   */
51  public class LocalChannel extends AbstractChannel {
52      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
53      @SuppressWarnings({ "rawtypes" })
54      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
55              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
56      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57      private static final int MAX_READER_STACK_DEPTH = 8;
58  
59      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
60  
61      private final ChannelConfig config = new DefaultChannelConfig(this);
62      // To further optimize this we could write our own SPSC queue.
63      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
64      private final Runnable readTask = new Runnable() {
65          @Override
66          public void run() {
67              // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
68              if (!inboundBuffer.isEmpty()) {
69                  readInbound();
70              }
71          }
72      };
73  
74      private final Runnable shutdownHook = new Runnable() {
75          @Override
76          public void run() {
77              unsafe().close(unsafe().voidPromise());
78          }
79      };
80  
81      private volatile State state;
82      private volatile LocalChannel peer;
83      private volatile LocalAddress localAddress;
84      private volatile LocalAddress remoteAddress;
85      private volatile ChannelPromise connectPromise;
86      private volatile boolean readInProgress;
87      private volatile boolean writeInProgress;
88      private volatile Future<?> finishReadFuture;
89  
90      public LocalChannel() {
91          super(null);
92          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
93      }
94  
95      protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
96          super(parent);
97          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
98          this.peer = peer;
99          localAddress = parent.localAddress();
100         remoteAddress = peer.localAddress();
101     }
102 
103     @Override
104     public ChannelMetadata metadata() {
105         return METADATA;
106     }
107 
108     @Override
109     public ChannelConfig config() {
110         return config;
111     }
112 
113     @Override
114     public LocalServerChannel parent() {
115         return (LocalServerChannel) super.parent();
116     }
117 
118     @Override
119     public LocalAddress localAddress() {
120         return (LocalAddress) super.localAddress();
121     }
122 
123     @Override
124     public LocalAddress remoteAddress() {
125         return (LocalAddress) super.remoteAddress();
126     }
127 
128     @Override
129     public boolean isOpen() {
130         return state != State.CLOSED;
131     }
132 
133     @Override
134     public boolean isActive() {
135         return state == State.CONNECTED;
136     }
137 
138     @Override
139     protected AbstractUnsafe newUnsafe() {
140         return new LocalUnsafe();
141     }
142 
143     @Override
144     protected boolean isCompatible(EventLoop loop) {
145         return loop instanceof SingleThreadEventLoop;
146     }
147 
148     @Override
149     protected SocketAddress localAddress0() {
150         return localAddress;
151     }
152 
153     @Override
154     protected SocketAddress remoteAddress0() {
155         return remoteAddress;
156     }
157 
158     @Override
159     protected void doRegister() throws Exception {
160         // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
161         // This is needed as a peer may not be null also if a LocalChannel was connected before and
162         // deregistered / registered later again.
163         //
164         // See https://github.com/netty/netty/issues/2400
165         if (peer != null && parent() != null) {
166             // Store the peer in a local variable as it may be set to null if doClose() is called.
167             // See https://github.com/netty/netty/issues/2144
168             final LocalChannel peer = this.peer;
169             state = State.CONNECTED;
170 
171             peer.remoteAddress = parent() == null ? null : parent().localAddress();
172             peer.state = State.CONNECTED;
173 
174             // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
175             // This ensures that if both channels are on the same event loop, the peer's channelActive
176             // event is triggered *after* this channel's channelRegistered event, so that this channel's
177             // pipeline is fully initialized by ChannelInitializer before any channelRead events.
178             peer.eventLoop().execute(new Runnable() {
179                 @Override
180                 public void run() {
181                     ChannelPromise promise = peer.connectPromise;
182 
183                     // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
184                     // connectPromise may be set to null if doClose() was called in the meantime.
185                     if (promise != null && promise.trySuccess()) {
186                         peer.pipeline().fireChannelActive();
187                     }
188                 }
189             });
190         }
191         ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
192     }
193 
194     @Override
195     protected void doBind(SocketAddress localAddress) throws Exception {
196         this.localAddress =
197                 LocalChannelRegistry.register(this, this.localAddress,
198                         localAddress);
199         state = State.BOUND;
200     }
201 
202     @Override
203     protected void doDisconnect() throws Exception {
204         doClose();
205     }
206 
207     @Override
208     protected void doClose() throws Exception {
209         final LocalChannel peer = this.peer;
210         State oldState = state;
211         try {
212             if (oldState != State.CLOSED) {
213                 // Update all internal state before the closeFuture is notified.
214                 if (localAddress != null) {
215                     if (parent() == null) {
216                         LocalChannelRegistry.unregister(localAddress);
217                     }
218                     localAddress = null;
219                 }
220 
221                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
222                 // channelRead.
223                 state = State.CLOSED;
224 
225                 // Preserve order of event and force a read operation now before the close operation is processed.
226                 if (writeInProgress && peer != null) {
227                     finishPeerRead(peer);
228                 }
229 
230                 ChannelPromise promise = connectPromise;
231                 if (promise != null) {
232                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
233                     promise.tryFailure(new ClosedChannelException());
234                     connectPromise = null;
235                 }
236             }
237 
238             if (peer != null) {
239                 this.peer = null;
240                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
241                 // This ensures that if both channels are on the same event loop, the peer's channelInActive
242                 // event is triggered *after* this peer's channelInActive event
243                 EventLoop peerEventLoop = peer.eventLoop();
244                 final boolean peerIsActive = peer.isActive();
245                 try {
246                     peerEventLoop.execute(new Runnable() {
247                         @Override
248                         public void run() {
249                             peer.tryClose(peerIsActive);
250                         }
251                     });
252                 } catch (Throwable cause) {
253                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
254                             this, peer, cause);
255                     if (peerEventLoop.inEventLoop()) {
256                         peer.releaseInboundBuffers();
257                     } else {
258                         // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
259                         // rejects the close Runnable but give a best effort.
260                         peer.close();
261                     }
262                     PlatformDependent.throwException(cause);
263                 }
264             }
265         } finally {
266             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
267             if (oldState != null && oldState != State.CLOSED) {
268                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
269                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
270                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
271                 // see the buffer at all.
272                 releaseInboundBuffers();
273             }
274         }
275     }
276 
277     private void tryClose(boolean isActive) {
278         if (isActive) {
279             unsafe().close(unsafe().voidPromise());
280         } else {
281             releaseInboundBuffers();
282         }
283     }
284 
285     @Override
286     protected void doDeregister() throws Exception {
287         // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
288         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
289     }
290 
291     private void readInbound() {
292         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
293         handle.reset(config());
294         ChannelPipeline pipeline = pipeline();
295         do {
296             Object received = inboundBuffer.poll();
297             if (received == null) {
298                 break;
299             }
300             if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
301                 ByteBuf msg = (ByteBuf) received;
302                 ByteBuf output = handle.allocate(alloc());
303                 if (msg.readableBytes() < output.writableBytes()) {
304                     // We have an opportunity to coalesce buffers.
305                     output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
306                     msg.release();
307                     while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
308                             ((ByteBuf) received).readableBytes() < output.writableBytes()) {
309                         inboundBuffer.poll();
310                         msg = (ByteBuf) received;
311                         output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
312                         msg.release();
313                     }
314                     handle.lastBytesRead(output.readableBytes());
315                     received = output; // Send the coalesced buffer down the pipeline.
316                 } else {
317                     // It won't be profitable to coalesce buffers this time around.
318                     handle.lastBytesRead(output.capacity());
319                     output.release();
320                 }
321             }
322             handle.incMessagesRead(1);
323             pipeline.fireChannelRead(received);
324         } while (handle.continueReading());
325         handle.readComplete();
326         pipeline.fireChannelReadComplete();
327     }
328 
329     @Override
330     protected void doBeginRead() throws Exception {
331         if (readInProgress) {
332             return;
333         }
334 
335         Queue<Object> inboundBuffer = this.inboundBuffer;
336         if (inboundBuffer.isEmpty()) {
337             readInProgress = true;
338             return;
339         }
340 
341         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
342         final int stackDepth = threadLocals.localChannelReaderStackDepth();
343         if (stackDepth < MAX_READER_STACK_DEPTH) {
344             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
345             try {
346                 readInbound();
347             } finally {
348                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
349             }
350         } else {
351             try {
352                 eventLoop().execute(readTask);
353             } catch (Throwable cause) {
354                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
355                 close();
356                 peer.close();
357                 PlatformDependent.throwException(cause);
358             }
359         }
360     }
361 
362     @Override
363     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
364         switch (state) {
365         case OPEN:
366         case BOUND:
367             throw new NotYetConnectedException();
368         case CLOSED:
369             throw new ClosedChannelException();
370         case CONNECTED:
371             break;
372         }
373 
374         final LocalChannel peer = this.peer;
375 
376         writeInProgress = true;
377         try {
378             ClosedChannelException exception = null;
379             for (;;) {
380                 Object msg = in.current();
381                 if (msg == null) {
382                     break;
383                 }
384                 try {
385                     // It is possible the peer could have closed while we are writing, and in this case we should
386                     // simulate real socket behavior and ensure the write operation is failed.
387                     if (peer.state == State.CONNECTED) {
388                         peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
389                         in.remove();
390                     } else {
391                         if (exception == null) {
392                             exception = new ClosedChannelException();
393                         }
394                         in.remove(exception);
395                     }
396                 } catch (Throwable cause) {
397                     in.remove(cause);
398                 }
399             }
400         } finally {
401             // The following situation may cause trouble:
402             // 1. Write (with promise X)
403             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
404             // 3. Then the close event will be executed for the peer before the write events, when the write events
405             // actually happened before the close event.
406             writeInProgress = false;
407         }
408 
409         finishPeerRead(peer);
410     }
411 
412     private void finishPeerRead(final LocalChannel peer) {
413         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
414         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
415             finishPeerRead0(peer);
416         } else {
417             runFinishPeerReadTask(peer);
418         }
419     }
420 
421     private void runFinishPeerReadTask(final LocalChannel peer) {
422         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
423         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
424         final Runnable finishPeerReadTask = new Runnable() {
425             @Override
426             public void run() {
427                 finishPeerRead0(peer);
428             }
429         };
430         try {
431             if (peer.writeInProgress) {
432                 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
433             } else {
434                 peer.eventLoop().execute(finishPeerReadTask);
435             }
436         } catch (Throwable cause) {
437             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
438             close();
439             peer.close();
440             PlatformDependent.throwException(cause);
441         }
442     }
443 
444     private void releaseInboundBuffers() {
445         assert eventLoop() == null || eventLoop().inEventLoop();
446         readInProgress = false;
447         Queue<Object> inboundBuffer = this.inboundBuffer;
448         Object msg;
449         while ((msg = inboundBuffer.poll()) != null) {
450             ReferenceCountUtil.release(msg);
451         }
452     }
453 
454     private void finishPeerRead0(LocalChannel peer) {
455         Future<?> peerFinishReadFuture = peer.finishReadFuture;
456         if (peerFinishReadFuture != null) {
457             if (!peerFinishReadFuture.isDone()) {
458                 runFinishPeerReadTask(peer);
459                 return;
460             } else {
461                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
462                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
463             }
464         }
465         // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
466         // forward data later on.
467         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
468             peer.readInProgress = false;
469             peer.readInbound();
470         }
471     }
472 
473     private class LocalUnsafe extends AbstractUnsafe {
474 
475         @Override
476         public void connect(final SocketAddress remoteAddress,
477                 SocketAddress localAddress, final ChannelPromise promise) {
478             if (!promise.setUncancellable() || !ensureOpen(promise)) {
479                 return;
480             }
481 
482             if (state == State.CONNECTED) {
483                 Exception cause = new AlreadyConnectedException();
484                 safeSetFailure(promise, cause);
485                 pipeline().fireExceptionCaught(cause);
486                 return;
487             }
488 
489             if (connectPromise != null) {
490                 throw new ConnectionPendingException();
491             }
492 
493             connectPromise = promise;
494 
495             if (state != State.BOUND) {
496                 // Not bound yet and no localAddress specified - get one.
497                 if (localAddress == null) {
498                     localAddress = new LocalAddress(LocalChannel.this);
499                 }
500             }
501 
502             if (localAddress != null) {
503                 try {
504                     doBind(localAddress);
505                 } catch (Throwable t) {
506                     safeSetFailure(promise, t);
507                     close(voidPromise());
508                     return;
509                 }
510             }
511 
512             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
513             if (!(boundChannel instanceof LocalServerChannel)) {
514                 Exception cause = new ConnectException("connection refused: " + remoteAddress);
515                 safeSetFailure(promise, cause);
516                 close(voidPromise());
517                 return;
518             }
519 
520             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
521             peer = serverChannel.serve(LocalChannel.this);
522         }
523     }
524 }