View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AbstractChannel;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultChannelConfig;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.PreferHeapByteBufAllocator;
29  import io.netty.channel.RecvByteBufAllocator;
30  import io.netty.channel.SingleThreadEventLoop;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.concurrent.Future;
33  import io.netty.util.concurrent.SingleThreadEventExecutor;
34  import io.netty.util.internal.InternalThreadLocalMap;
35  import io.netty.util.internal.PlatformDependent;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import java.net.ConnectException;
40  import java.net.SocketAddress;
41  import java.nio.channels.AlreadyConnectedException;
42  import java.nio.channels.ClosedChannelException;
43  import java.nio.channels.ConnectionPendingException;
44  import java.nio.channels.NotYetConnectedException;
45  import java.util.Queue;
46  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
47  
48  /**
49   * A {@link Channel} for the local transport.
50   */
51  public class LocalChannel extends AbstractChannel {
52      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
53      @SuppressWarnings({ "rawtypes" })
54      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
55              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
56      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57      private static final int MAX_READER_STACK_DEPTH = 8;
58  
59      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
60  
61      private final ChannelConfig config = new DefaultChannelConfig(this);
62      // To further optimize this we could write our own SPSC queue.
63      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
64      private final Runnable readTask = new Runnable() {
65          @Override
66          public void run() {
67              // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
68              if (!inboundBuffer.isEmpty()) {
69                  readInbound();
70              }
71          }
72      };
73  
74      private final Runnable shutdownHook = new Runnable() {
75          @Override
76          public void run() {
77              unsafe().close(unsafe().voidPromise());
78          }
79      };
80  
81      private final Runnable finishReadTask = new Runnable() {
82          @Override
83          public void run() {
84              finishPeerRead0(LocalChannel.this);
85          }
86      };
87  
88      private volatile State state;
89      private volatile LocalChannel peer;
90      private volatile LocalAddress localAddress;
91      private volatile LocalAddress remoteAddress;
92      private volatile ChannelPromise connectPromise;
93      private volatile boolean readInProgress;
94      private volatile boolean writeInProgress;
95      private volatile Future<?> finishReadFuture;
96  
97      public LocalChannel() {
98          super(null);
99          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
100     }
101 
102     protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
103         super(parent);
104         config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
105         this.peer = peer;
106         localAddress = parent.localAddress();
107         remoteAddress = peer.localAddress();
108     }
109 
110     @Override
111     public ChannelMetadata metadata() {
112         return METADATA;
113     }
114 
115     @Override
116     public ChannelConfig config() {
117         return config;
118     }
119 
120     @Override
121     public LocalServerChannel parent() {
122         return (LocalServerChannel) super.parent();
123     }
124 
125     @Override
126     public LocalAddress localAddress() {
127         return (LocalAddress) super.localAddress();
128     }
129 
130     @Override
131     public LocalAddress remoteAddress() {
132         return (LocalAddress) super.remoteAddress();
133     }
134 
135     @Override
136     public boolean isOpen() {
137         return state != State.CLOSED;
138     }
139 
140     @Override
141     public boolean isActive() {
142         return state == State.CONNECTED;
143     }
144 
145     @Override
146     protected AbstractUnsafe newUnsafe() {
147         return new LocalUnsafe();
148     }
149 
150     @Override
151     protected boolean isCompatible(EventLoop loop) {
152         return loop instanceof SingleThreadEventLoop;
153     }
154 
155     @Override
156     protected SocketAddress localAddress0() {
157         return localAddress;
158     }
159 
160     @Override
161     protected SocketAddress remoteAddress0() {
162         return remoteAddress;
163     }
164 
165     @Override
166     protected void doRegister() throws Exception {
167         // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
168         // This is needed as a peer may not be null also if a LocalChannel was connected before and
169         // deregistered / registered later again.
170         //
171         // See https://github.com/netty/netty/issues/2400
172         if (peer != null && parent() != null) {
173             // Store the peer in a local variable as it may be set to null if doClose() is called.
174             // See https://github.com/netty/netty/issues/2144
175             final LocalChannel peer = this.peer;
176             state = State.CONNECTED;
177 
178             peer.remoteAddress = parent() == null ? null : parent().localAddress();
179             peer.state = State.CONNECTED;
180 
181             // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
182             // This ensures that if both channels are on the same event loop, the peer's channelActive
183             // event is triggered *after* this channel's channelRegistered event, so that this channel's
184             // pipeline is fully initialized by ChannelInitializer before any channelRead events.
185             peer.eventLoop().execute(new Runnable() {
186                 @Override
187                 public void run() {
188                     ChannelPromise promise = peer.connectPromise;
189 
190                     // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
191                     // connectPromise may be set to null if doClose() was called in the meantime.
192                     if (promise != null && promise.trySuccess()) {
193                         peer.pipeline().fireChannelActive();
194                     }
195                 }
196             });
197         }
198         ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
199     }
200 
201     @Override
202     protected void doBind(SocketAddress localAddress) throws Exception {
203         this.localAddress =
204                 LocalChannelRegistry.register(this, this.localAddress,
205                         localAddress);
206         state = State.BOUND;
207     }
208 
209     @Override
210     protected void doDisconnect() throws Exception {
211         doClose();
212     }
213 
214     @Override
215     protected void doClose() throws Exception {
216         final LocalChannel peer = this.peer;
217         State oldState = state;
218         try {
219             if (oldState != State.CLOSED) {
220                 // Update all internal state before the closeFuture is notified.
221                 if (localAddress != null) {
222                     if (parent() == null) {
223                         LocalChannelRegistry.unregister(localAddress);
224                     }
225                     localAddress = null;
226                 }
227 
228                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
229                 // channelRead.
230                 state = State.CLOSED;
231 
232                 // Preserve order of event and force a read operation now before the close operation is processed.
233                 if (writeInProgress && peer != null) {
234                     finishPeerRead(peer);
235                 }
236 
237                 ChannelPromise promise = connectPromise;
238                 if (promise != null) {
239                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
240                     promise.tryFailure(new ClosedChannelException());
241                     connectPromise = null;
242                 }
243             }
244 
245             if (peer != null) {
246                 this.peer = null;
247                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
248                 // This ensures that if both channels are on the same event loop, the peer's channelInActive
249                 // event is triggered *after* this peer's channelInActive event
250                 EventLoop peerEventLoop = peer.eventLoop();
251                 final boolean peerIsActive = peer.isActive();
252                 try {
253                     peerEventLoop.execute(new Runnable() {
254                         @Override
255                         public void run() {
256                             peer.tryClose(peerIsActive);
257                         }
258                     });
259                 } catch (Throwable cause) {
260                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
261                             this, peer, cause);
262                     if (peerEventLoop.inEventLoop()) {
263                         peer.releaseInboundBuffers();
264                     } else {
265                         // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
266                         // rejects the close Runnable but give a best effort.
267                         peer.close();
268                     }
269                     PlatformDependent.throwException(cause);
270                 }
271             }
272         } finally {
273             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
274             if (oldState != null && oldState != State.CLOSED) {
275                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
276                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
277                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
278                 // see the buffer at all.
279                 releaseInboundBuffers();
280             }
281         }
282     }
283 
284     private void tryClose(boolean isActive) {
285         if (isActive) {
286             unsafe().close(unsafe().voidPromise());
287         } else {
288             releaseInboundBuffers();
289         }
290     }
291 
292     @Override
293     protected void doDeregister() throws Exception {
294         // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
295         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
296     }
297 
298     private void readInbound() {
299         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
300         handle.reset(config());
301         ChannelPipeline pipeline = pipeline();
302         do {
303             Object received = inboundBuffer.poll();
304             if (received == null) {
305                 break;
306             }
307             if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
308                 ByteBuf msg = (ByteBuf) received;
309                 ByteBuf output = handle.allocate(alloc());
310                 if (msg.readableBytes() < output.writableBytes()) {
311                     // We have an opportunity to coalesce buffers.
312                     output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
313                     msg.release();
314                     while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
315                             ((ByteBuf) received).readableBytes() < output.writableBytes()) {
316                         inboundBuffer.poll();
317                         msg = (ByteBuf) received;
318                         output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
319                         msg.release();
320                     }
321                     handle.lastBytesRead(output.readableBytes());
322                     received = output; // Send the coalesced buffer down the pipeline.
323                 } else {
324                     // It won't be profitable to coalesce buffers this time around.
325                     handle.lastBytesRead(output.capacity());
326                     output.release();
327                 }
328             }
329             handle.incMessagesRead(1);
330             pipeline.fireChannelRead(received);
331         } while (handle.continueReading());
332         handle.readComplete();
333         pipeline.fireChannelReadComplete();
334     }
335 
336     @Override
337     protected void doBeginRead() throws Exception {
338         if (readInProgress) {
339             return;
340         }
341 
342         Queue<Object> inboundBuffer = this.inboundBuffer;
343         if (inboundBuffer.isEmpty()) {
344             readInProgress = true;
345             return;
346         }
347 
348         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
349         final int stackDepth = threadLocals.localChannelReaderStackDepth();
350         if (stackDepth < MAX_READER_STACK_DEPTH) {
351             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
352             try {
353                 readInbound();
354             } finally {
355                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
356             }
357         } else {
358             try {
359                 eventLoop().execute(readTask);
360             } catch (Throwable cause) {
361                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
362                 close();
363                 peer.close();
364                 PlatformDependent.throwException(cause);
365             }
366         }
367     }
368 
369     @Override
370     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
371         switch (state) {
372         case OPEN:
373         case BOUND:
374             throw new NotYetConnectedException();
375         case CLOSED:
376             throw new ClosedChannelException();
377         case CONNECTED:
378             break;
379         }
380 
381         final LocalChannel peer = this.peer;
382 
383         writeInProgress = true;
384         try {
385             ClosedChannelException exception = null;
386             for (;;) {
387                 Object msg = in.current();
388                 if (msg == null) {
389                     break;
390                 }
391                 try {
392                     // It is possible the peer could have closed while we are writing, and in this case we should
393                     // simulate real socket behavior and ensure the write operation is failed.
394                     if (peer.state == State.CONNECTED) {
395                         peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
396                         in.remove();
397                     } else {
398                         if (exception == null) {
399                             exception = new ClosedChannelException();
400                         }
401                         in.remove(exception);
402                     }
403                 } catch (Throwable cause) {
404                     in.remove(cause);
405                 }
406             }
407         } finally {
408             // The following situation may cause trouble:
409             // 1. Write (with promise X)
410             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
411             // 3. Then the close event will be executed for the peer before the write events, when the write events
412             // actually happened before the close event.
413             writeInProgress = false;
414         }
415 
416         finishPeerRead(peer);
417     }
418 
419     private void finishPeerRead(final LocalChannel peer) {
420         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
421         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
422             finishPeerRead0(peer);
423         } else {
424             runFinishPeerReadTask(peer);
425         }
426     }
427 
428     private void runFinishTask0() {
429         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
430         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
431         if (writeInProgress) {
432             finishReadFuture = eventLoop().submit(finishReadTask);
433         } else {
434             eventLoop().execute(finishReadTask);
435         }
436     }
437 
438     private void runFinishPeerReadTask(final LocalChannel peer) {
439         try {
440             peer.runFinishTask0();
441         } catch (Throwable cause) {
442             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
443             close();
444             peer.close();
445             PlatformDependent.throwException(cause);
446         }
447     }
448 
449     private void releaseInboundBuffers() {
450         assert eventLoop() == null || eventLoop().inEventLoop();
451         readInProgress = false;
452         Queue<Object> inboundBuffer = this.inboundBuffer;
453         Object msg;
454         while ((msg = inboundBuffer.poll()) != null) {
455             ReferenceCountUtil.release(msg);
456         }
457     }
458 
459     private void finishPeerRead0(LocalChannel peer) {
460         Future<?> peerFinishReadFuture = peer.finishReadFuture;
461         if (peerFinishReadFuture != null) {
462             if (!peerFinishReadFuture.isDone()) {
463                 runFinishPeerReadTask(peer);
464                 return;
465             } else {
466                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
467                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
468             }
469         }
470         // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
471         // forward data later on.
472         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
473             peer.readInProgress = false;
474             peer.readInbound();
475         }
476     }
477 
478     private class LocalUnsafe extends AbstractUnsafe {
479 
480         @Override
481         public void connect(final SocketAddress remoteAddress,
482                 SocketAddress localAddress, final ChannelPromise promise) {
483             if (!promise.setUncancellable() || !ensureOpen(promise)) {
484                 return;
485             }
486 
487             if (state == State.CONNECTED) {
488                 Exception cause = new AlreadyConnectedException();
489                 safeSetFailure(promise, cause);
490                 return;
491             }
492 
493             if (connectPromise != null) {
494                 throw new ConnectionPendingException();
495             }
496 
497             connectPromise = promise;
498 
499             if (state != State.BOUND) {
500                 // Not bound yet and no localAddress specified - get one.
501                 if (localAddress == null) {
502                     localAddress = new LocalAddress(LocalChannel.this);
503                 }
504             }
505 
506             if (localAddress != null) {
507                 try {
508                     doBind(localAddress);
509                 } catch (Throwable t) {
510                     safeSetFailure(promise, t);
511                     close(voidPromise());
512                     return;
513                 }
514             }
515 
516             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
517             if (!(boundChannel instanceof LocalServerChannel)) {
518                 Exception cause = new ConnectException("connection refused: " + remoteAddress);
519                 safeSetFailure(promise, cause);
520                 close(voidPromise());
521                 return;
522             }
523 
524             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
525             peer = serverChannel.serve(LocalChannel.this);
526         }
527     }
528 }