View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultChannelConfig;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.PreferHeapByteBufAllocator;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.SingleThreadEventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.concurrent.Future;
32  import io.netty.util.concurrent.SingleThreadEventExecutor;
33  import io.netty.util.internal.InternalThreadLocalMap;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.ThrowableUtil;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import java.net.ConnectException;
40  import java.net.SocketAddress;
41  import java.nio.channels.AlreadyConnectedException;
42  import java.nio.channels.ClosedChannelException;
43  import java.nio.channels.ConnectionPendingException;
44  import java.nio.channels.NotYetConnectedException;
45  import java.util.Queue;
46  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
47  
/**
 * A {@link Channel} for the local transport.
 *
 * <p>Messages written to a connected {@link LocalChannel} are retained and appended to the inbound queue of its
 * peer {@link LocalChannel}, which then delivers them through its own pipeline.
 */
public class LocalChannel extends AbstractChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
    // Atomic updater for the volatile finishReadFuture field; used to clear a completed future via CAS.
    @SuppressWarnings({ "rawtypes" })
    private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
    // Single metadata instance returned by metadata() for every LocalChannel.
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    // Maximum depth of nested direct reads started from doBeginRead(); beyond it the read is run as an
    // event loop task instead.
    private static final int MAX_READER_STACK_DEPTH = 8;
    // Pre-allocated exceptions used to fail writes on a closed channel (doWrite) and to fail a pending
    // connect promise on close (doClose).
    private static final ClosedChannelException DO_WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), LocalChannel.class, "doWrite(...)");
    private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), LocalChannel.class, "doClose()");

    // Lifecycle states of this channel; isOpen() and isActive() are derived from the current value.
    private enum State { OPEN, BOUND, CONNECTED, CLOSED }
64  
    // Configuration of this channel; both constructors wrap its allocator in a PreferHeapByteBufAllocator.
    private final ChannelConfig config = new DefaultChannelConfig(this);
    // Messages written by the peer that this channel has not read yet.
    // To further optimize this we could write our own SPSC queue.
    final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
    // Task handed to the event loop by doBeginRead() when the nested read depth limit has been reached.
    private final Runnable readTask = new Runnable() {
        @Override
        public void run() {
            // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
            if (!inboundBuffer.isEmpty()) {
                readInbound();
            }
        }
    };

    // Closes this channel. Added to the event loop as a shutdown hook in doRegister() and removed again in
    // doDeregister().
    private final Runnable shutdownHook = new Runnable() {
        @Override
        public void run() {
            unsafe().close(unsafe().voidPromise());
        }
    };
84  
    // Current lifecycle state. Not assigned at construction, so it stays null until doBind(), doRegister()
    // or doClose() sets it.
    private volatile State state;
    // The channel at the other end of the connection; cleared by doClose().
    private volatile LocalChannel peer;
    // Address this channel is bound to; cleared by doClose().
    private volatile LocalAddress localAddress;
    // Address of the other end; assigned in the two-argument constructor or by the peer's doRegister().
    private volatile LocalAddress remoteAddress;
    // Promise of a connect attempt that has not completed yet; set in LocalUnsafe.connect(), completed from the
    // peer's doRegister() and failed (then cleared) in doClose().
    private volatile ChannelPromise connectPromise;
    // True when a read was requested while inboundBuffer was empty, i.e. arriving data should be delivered.
    private volatile boolean readInProgress;
    // True while doWrite() is moving messages into the peer's inboundBuffer.
    private volatile boolean writeInProgress;
    // Future of a finish-read task that the peer submitted to this channel's event loop while this channel
    // was writing; see runFinishPeerReadTask() and finishPeerRead0().
    private volatile Future<?> finishReadFuture;
93  
    /**
     * Creates a channel without a parent and wraps the configured allocator in a
     * {@link PreferHeapByteBufAllocator}.
     */
    public LocalChannel() {
        super(null);
        config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
    }
98  
    /**
     * Creates the channel that is paired with {@code peer} on behalf of {@code parent}.
     *
     * @param parent the server channel that becomes this channel's parent; its local address becomes this
     *               channel's local address
     * @param peer   the channel at the other end; its local address becomes this channel's remote address
     */
    protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
        super(parent);
        config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
        this.peer = peer;
        localAddress = parent.localAddress();
        remoteAddress = peer.localAddress();
    }
106 
    /**
     * Returns the {@link ChannelMetadata} instance shared by all {@link LocalChannel}s.
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
111 
    /**
     * Returns the configuration object created together with this channel.
     */
    @Override
    public ChannelConfig config() {
        return config;
    }
116 
    /**
     * Returns the inherited parent, narrowed to {@link LocalServerChannel}.
     */
    @Override
    public LocalServerChannel parent() {
        return (LocalServerChannel) super.parent();
    }
121 
    /**
     * Returns the inherited local address, narrowed to {@link LocalAddress}.
     */
    @Override
    public LocalAddress localAddress() {
        return (LocalAddress) super.localAddress();
    }
126 
    /**
     * Returns the inherited remote address, narrowed to {@link LocalAddress}.
     */
    @Override
    public LocalAddress remoteAddress() {
        return (LocalAddress) super.remoteAddress();
    }
131 
    /**
     * Returns {@code true} unless this channel has reached the {@code CLOSED} state.
     */
    @Override
    public boolean isOpen() {
        return state != State.CLOSED;
    }
136 
    /**
     * Returns {@code true} only while this channel is in the {@code CONNECTED} state.
     */
    @Override
    public boolean isActive() {
        return state == State.CONNECTED;
    }
141 
    /**
     * Creates the {@link LocalUnsafe} that implements {@code connect} for this channel.
     */
    @Override
    protected AbstractUnsafe newUnsafe() {
        return new LocalUnsafe();
    }
146 
    /**
     * Accepts only event loops that are {@link SingleThreadEventLoop} instances.
     *
     * @param loop the event loop to check
     * @return {@code true} if {@code loop} is a {@link SingleThreadEventLoop}
     */
    @Override
    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof SingleThreadEventLoop;
    }
151 
    /**
     * Returns the current value of the {@code localAddress} field, which is {@code null} after
     * {@link #doClose()} has cleared it.
     */
    @Override
    protected SocketAddress localAddress0() {
        return localAddress;
    }
156 
    /**
     * Returns the current value of the {@code remoteAddress} field.
     */
    @Override
    protected SocketAddress remoteAddress0() {
        return remoteAddress;
    }
161 
162     @Override
163     protected void doRegister() throws Exception {
164         // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
165         // This is needed as a peer may not be null also if a LocalChannel was connected before and
166         // deregistered / registered later again.
167         //
168         // See https://github.com/netty/netty/issues/2400
169         if (peer != null && parent() != null) {
170             // Store the peer in a local variable as it may be set to null if doClose() is called.
171             // See https://github.com/netty/netty/issues/2144
172             final LocalChannel peer = this.peer;
173             state = State.CONNECTED;
174 
175             peer.remoteAddress = parent() == null ? null : parent().localAddress();
176             peer.state = State.CONNECTED;
177 
178             // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
179             // This ensures that if both channels are on the same event loop, the peer's channelActive
180             // event is triggered *after* this channel's channelRegistered event, so that this channel's
181             // pipeline is fully initialized by ChannelInitializer before any channelRead events.
182             peer.eventLoop().execute(new Runnable() {
183                 @Override
184                 public void run() {
185                     ChannelPromise promise = peer.connectPromise;
186 
187                     // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
188                     // connectPromise may be set to null if doClose() was called in the meantime.
189                     if (promise != null && promise.trySuccess()) {
190                         peer.pipeline().fireChannelActive();
191                     }
192                 }
193             });
194         }
195         ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
196     }
197 
    /**
     * Registers this channel in the {@link LocalChannelRegistry} and moves it to the {@code BOUND} state.
     *
     * @param localAddress the address to bind to; passed to the registry together with the address this
     *                     channel currently holds
     * @throws Exception if the registry rejects the registration
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        // The registry returns the address that is actually in effect; keep that one.
        this.localAddress =
                LocalChannelRegistry.register(this, this.localAddress,
                        localAddress);
        state = State.BOUND;
    }
205 
    /**
     * Disconnecting a local channel is implemented as a close; this simply delegates to {@link #doClose()}.
     *
     * @throws Exception if {@link #doClose()} fails
     */
    @Override
    protected void doDisconnect() throws Exception {
        doClose();
    }
210 
    /**
     * Closes this channel and asks the peer, if any, to close as well.
     *
     * <p>When the channel was not closed before, this unregisters the local address (only for channels without
     * a parent), switches to the {@code CLOSED} state, flushes a pending read on the peer if a write is in
     * progress, and fails any pending connect promise. Afterwards the peer reference is cleared and a close
     * task is queued on the peer's event loop. Finally all messages still sitting in this channel's inbound
     * queue are released.
     *
     * @throws Exception if queuing the close task on the peer's event loop fails; the cause is rethrown after a
     *                   best-effort cleanup of the peer
     */
    @Override
    protected void doClose() throws Exception {
        // Snapshot peer and state up front; both fields are modified below.
        final LocalChannel peer = this.peer;
        State oldState = state;
        try {
            if (oldState != State.CLOSED) {
                // Update all internal state before the closeFuture is notified.
                if (localAddress != null) {
                    // Only channels without a parent unregister the address from the registry.
                    if (parent() == null) {
                        LocalChannelRegistry.unregister(localAddress);
                    }
                    localAddress = null;
                }

                // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
                // channelRead.
                state = State.CLOSED;

                // Preserve order of event and force a read operation now before the close operation is processed.
                if (writeInProgress && peer != null) {
                    finishPeerRead(peer);
                }

                ChannelPromise promise = connectPromise;
                if (promise != null) {
                    // Use tryFailure() instead of setFailure() to avoid the race against cancel().
                    promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
                    connectPromise = null;
                }
            }

            if (peer != null) {
                this.peer = null;
                // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
                // This ensures that if both channels are on the same event loop, the peer's channelInactive
                // event is triggered *after* this channel's channelInactive event.
                EventLoop peerEventLoop = peer.eventLoop();
                // Capture the peer's active state now; the queued task decides based on this snapshot.
                final boolean peerIsActive = peer.isActive();
                try {
                    peerEventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            peer.tryClose(peerIsActive);
                        }
                    });
                } catch (Throwable cause) {
                    logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
                            this, peer, cause);
                    if (peerEventLoop.inEventLoop()) {
                        peer.releaseInboundBuffers();
                    } else {
                        // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
                        // rejects the close Runnable but give a best effort.
                        peer.close();
                    }
                    PlatformDependent.throwException(cause);
                }
            }
        } finally {
            // Release all buffers if the Channel was already registered in the past and if it was not closed before.
            if (oldState != null && oldState != State.CLOSED) {
                // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
                // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
                // means even if the promise was notified before its not really guaranteed that the "remote peer" will
                // see the buffer at all.
                releaseInboundBuffers();
            }
        }
    }
280 
281     private void tryClose(boolean isActive) {
282         if (isActive) {
283             unsafe().close(unsafe().voidPromise());
284         } else {
285             releaseInboundBuffers();
286         }
287     }
288 
289     @Override
290     protected void doDeregister() throws Exception {
291         // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
292         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
293     }
294 
295     private void readInbound() {
296         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
297         handle.reset(config());
298         ChannelPipeline pipeline = pipeline();
299         do {
300             Object received = inboundBuffer.poll();
301             if (received == null) {
302                 break;
303             }
304             pipeline.fireChannelRead(received);
305         } while (handle.continueReading());
306 
307         pipeline.fireChannelReadComplete();
308     }
309 
310     @Override
311     protected void doBeginRead() throws Exception {
312         if (readInProgress) {
313             return;
314         }
315 
316         Queue<Object> inboundBuffer = this.inboundBuffer;
317         if (inboundBuffer.isEmpty()) {
318             readInProgress = true;
319             return;
320         }
321 
322         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
323         final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
324         if (stackDepth < MAX_READER_STACK_DEPTH) {
325             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
326             try {
327                 readInbound();
328             } finally {
329                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
330             }
331         } else {
332             try {
333                 eventLoop().execute(readTask);
334             } catch (Throwable cause) {
335                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
336                 close();
337                 peer.close();
338                 PlatformDependent.throwException(cause);
339             }
340         }
341     }
342 
    /**
     * Moves every pending outbound message into the peer's inbound queue and then triggers a read on the peer.
     *
     * <p>Each message is retained before it is queued for the peer. If the peer is no longer connected, or
     * queuing fails, the corresponding write is failed instead.
     *
     * @param in the outbound buffer holding the messages to transfer
     * @throws NotYetConnectedException if this channel is in the {@code OPEN} or {@code BOUND} state
     * @throws ClosedChannelException   if this channel is in the {@code CLOSED} state
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        switch (state) {
        case OPEN:
        case BOUND:
            throw new NotYetConnectedException();
        case CLOSED:
            throw DO_WRITE_CLOSED_CHANNEL_EXCEPTION;
        case CONNECTED:
            break;
        }

        // Keep a local reference: doClose() may clear the field while promises are being completed below.
        final LocalChannel peer = this.peer;

        writeInProgress = true;
        try {
            for (;;) {
                Object msg = in.current();
                if (msg == null) {
                    break;
                }
                try {
                    // It is possible the peer could have closed while we are writing, and in this case we should
                    // simulate real socket behavior and ensure the write operation is failed.
                    if (peer.state == State.CONNECTED) {
                        peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
                        in.remove();
                    } else {
                        in.remove(DO_WRITE_CLOSED_CHANNEL_EXCEPTION);
                    }
                } catch (Throwable cause) {
                    in.remove(cause);
                }
            }
        } finally {
            // The following situation may cause trouble:
            // 1. Write (with promise X)
            // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
            // 3. Then the close event will be executed for the peer before the write events, when the write events
            // actually happened before the close event.
            writeInProgress = false;
        }

        finishPeerRead(peer);
    }
388 
389     private void finishPeerRead(final LocalChannel peer) {
390         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
391         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
392             finishPeerRead0(peer);
393         } else {
394             runFinishPeerReadTask(peer);
395         }
396     }
397 
    /**
     * Schedules {@link #finishPeerRead0(LocalChannel)} for {@code peer} on the peer's event loop.
     *
     * <p>While the peer is writing, the task is submitted and its future is stored in the peer's
     * {@code finishReadFuture} so later reads can be ordered after it; otherwise the task is simply executed.
     *
     * @param peer the channel whose pending read should be completed
     */
    private void runFinishPeerReadTask(final LocalChannel peer) {
        // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
        // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
        final Runnable finishPeerReadTask = new Runnable() {
            @Override
            public void run() {
                finishPeerRead0(peer);
            }
        };
        try {
            if (peer.writeInProgress) {
                peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
            } else {
                peer.eventLoop().execute(finishPeerReadTask);
            }
        } catch (Throwable cause) {
            // Scheduling failed: close both ends and rethrow the original cause.
            logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
            close();
            peer.close();
            PlatformDependent.throwException(cause);
        }
    }
420 
421     private void releaseInboundBuffers() {
422         assert eventLoop() == null || eventLoop().inEventLoop();
423         readInProgress = false;
424         Queue<Object> inboundBuffer = this.inboundBuffer;
425         Object msg;
426         while ((msg = inboundBuffer.poll()) != null) {
427             ReferenceCountUtil.release(msg);
428         }
429     }
430 
    /**
     * Delivers queued data to {@code peer} if the peer has a read pending.
     *
     * <p>If the peer still has an unfinished {@code finishReadFuture}, the work is re-scheduled behind it and
     * this call returns; a finished future is cleared with a compare-and-set.
     *
     * @param peer the channel whose pending read should be completed
     */
    private void finishPeerRead0(LocalChannel peer) {
        Future<?> peerFinishReadFuture = peer.finishReadFuture;
        if (peerFinishReadFuture != null) {
            if (!peerFinishReadFuture.isDone()) {
                // An earlier finish-read task has not run yet; queue this one again so it runs later.
                runFinishPeerReadTask(peer);
                return;
            } else {
                // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
                FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
            }
        }
        // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
        // forward data later on.
        if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
            peer.readInProgress = false;
            peer.readInbound();
        }
    }
449 
    /**
     * {@link AbstractUnsafe} implementation that adds the connect operation of the local transport.
     */
    private class LocalUnsafe extends AbstractUnsafe {

        /**
         * Connects this channel to the {@link LocalServerChannel} registered under {@code remoteAddress}.
         *
         * <p>The channel is bound first when a local address is given, or when it is not bound yet (a fresh
         * {@link LocalAddress} is then created for it). The promise is failed, and the channel closed, when
         * binding fails or when no {@link LocalServerChannel} is registered under {@code remoteAddress}.
         *
         * @param remoteAddress the address of the server channel to connect to
         * @param localAddress  the address to bind to before connecting, or {@code null}
         * @param promise       the promise stored as the pending connect promise of this channel
         * @throws ConnectionPendingException if another connect attempt is still pending
         */
        @Override
        public void connect(final SocketAddress remoteAddress,
                SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            if (state == State.CONNECTED) {
                // Already connected: fail the promise and also report the problem through the pipeline.
                Exception cause = new AlreadyConnectedException();
                safeSetFailure(promise, cause);
                pipeline().fireExceptionCaught(cause);
                return;
            }

            if (connectPromise != null) {
                throw new ConnectionPendingException();
            }

            connectPromise = promise;

            if (state != State.BOUND) {
                // Not bound yet and no localAddress specified - get one.
                if (localAddress == null) {
                    localAddress = new LocalAddress(LocalChannel.this);
                }
            }

            if (localAddress != null) {
                try {
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    close(voidPromise());
                    return;
                }
            }

            // Look up whatever is registered under the remote address; only a server channel can accept us.
            Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
            if (!(boundChannel instanceof LocalServerChannel)) {
                Exception cause = new ConnectException("connection refused: " + remoteAddress);
                safeSetFailure(promise, cause);
                close(voidPromise());
                return;
            }

            // The server channel returns the channel that becomes this channel's peer.
            LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
            peer = serverChannel.serve(LocalChannel.this);
        }
    }
501 }