View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultChannelConfig;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.PreferHeapByteBufAllocator;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.SingleThreadEventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.concurrent.Future;
32  import io.netty.util.concurrent.SingleThreadEventExecutor;
33  import io.netty.util.internal.InternalThreadLocalMap;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import java.net.ConnectException;
39  import java.net.SocketAddress;
40  import java.nio.channels.AlreadyConnectedException;
41  import java.nio.channels.ClosedChannelException;
42  import java.nio.channels.ConnectionPendingException;
43  import java.nio.channels.NotYetConnectedException;
44  import java.util.Queue;
45  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46  
47  /**
48   * A {@link Channel} for the local transport.
49   */
50  public class LocalChannel extends AbstractChannel {
51      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
52      @SuppressWarnings({ "rawtypes" })
53      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
54              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
55      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
56      private static final int MAX_READER_STACK_DEPTH = 8;
57  
58      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
59  
60      private final ChannelConfig config = new DefaultChannelConfig(this);
61      // To further optimize this we could write our own SPSC queue.
62      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
63      private final Runnable readTask = new Runnable() {
64          @Override
65          public void run() {
66              // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
67              if (!inboundBuffer.isEmpty()) {
68                  readInbound();
69              }
70          }
71      };
72  
73      private final Runnable shutdownHook = new Runnable() {
74          @Override
75          public void run() {
76              unsafe().close(unsafe().voidPromise());
77          }
78      };
79  
80      private volatile State state;
81      private volatile LocalChannel peer;
82      private volatile LocalAddress localAddress;
83      private volatile LocalAddress remoteAddress;
84      private volatile ChannelPromise connectPromise;
85      private volatile boolean readInProgress;
86      private volatile boolean writeInProgress;
87      private volatile Future<?> finishReadFuture;
88  
89      public LocalChannel() {
90          super(null);
91          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
92      }
93  
94      protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
95          super(parent);
96          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
97          this.peer = peer;
98          localAddress = parent.localAddress();
99          remoteAddress = peer.localAddress();
100     }
101 
102     @Override
103     public ChannelMetadata metadata() {
104         return METADATA;
105     }
106 
107     @Override
108     public ChannelConfig config() {
109         return config;
110     }
111 
112     @Override
113     public LocalServerChannel parent() {
114         return (LocalServerChannel) super.parent();
115     }
116 
117     @Override
118     public LocalAddress localAddress() {
119         return (LocalAddress) super.localAddress();
120     }
121 
122     @Override
123     public LocalAddress remoteAddress() {
124         return (LocalAddress) super.remoteAddress();
125     }
126 
127     @Override
128     public boolean isOpen() {
129         return state != State.CLOSED;
130     }
131 
132     @Override
133     public boolean isActive() {
134         return state == State.CONNECTED;
135     }
136 
137     @Override
138     protected AbstractUnsafe newUnsafe() {
139         return new LocalUnsafe();
140     }
141 
142     @Override
143     protected boolean isCompatible(EventLoop loop) {
144         return loop instanceof SingleThreadEventLoop;
145     }
146 
147     @Override
148     protected SocketAddress localAddress0() {
149         return localAddress;
150     }
151 
152     @Override
153     protected SocketAddress remoteAddress0() {
154         return remoteAddress;
155     }
156 
157     @Override
158     protected void doRegister() throws Exception {
159         // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
160         // This is needed as a peer may not be null also if a LocalChannel was connected before and
161         // deregistered / registered later again.
162         //
163         // See https://github.com/netty/netty/issues/2400
164         if (peer != null && parent() != null) {
165             // Store the peer in a local variable as it may be set to null if doClose() is called.
166             // See https://github.com/netty/netty/issues/2144
167             final LocalChannel peer = this.peer;
168             state = State.CONNECTED;
169 
170             peer.remoteAddress = parent() == null ? null : parent().localAddress();
171             peer.state = State.CONNECTED;
172 
173             // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
174             // This ensures that if both channels are on the same event loop, the peer's channelActive
175             // event is triggered *after* this channel's channelRegistered event, so that this channel's
176             // pipeline is fully initialized by ChannelInitializer before any channelRead events.
177             peer.eventLoop().execute(new Runnable() {
178                 @Override
179                 public void run() {
180                     ChannelPromise promise = peer.connectPromise;
181 
182                     // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
183                     // connectPromise may be set to null if doClose() was called in the meantime.
184                     if (promise != null && promise.trySuccess()) {
185                         peer.pipeline().fireChannelActive();
186                     }
187                 }
188             });
189         }
190         ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
191     }
192 
193     @Override
194     protected void doBind(SocketAddress localAddress) throws Exception {
195         this.localAddress =
196                 LocalChannelRegistry.register(this, this.localAddress,
197                         localAddress);
198         state = State.BOUND;
199     }
200 
201     @Override
202     protected void doDisconnect() throws Exception {
203         doClose();
204     }
205 
206     @Override
207     protected void doClose() throws Exception {
208         final LocalChannel peer = this.peer;
209         State oldState = state;
210         try {
211             if (oldState != State.CLOSED) {
212                 // Update all internal state before the closeFuture is notified.
213                 if (localAddress != null) {
214                     if (parent() == null) {
215                         LocalChannelRegistry.unregister(localAddress);
216                     }
217                     localAddress = null;
218                 }
219 
220                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
221                 // channelRead.
222                 state = State.CLOSED;
223 
224                 // Preserve order of event and force a read operation now before the close operation is processed.
225                 if (writeInProgress && peer != null) {
226                     finishPeerRead(peer);
227                 }
228 
229                 ChannelPromise promise = connectPromise;
230                 if (promise != null) {
231                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
232                     promise.tryFailure(new ClosedChannelException());
233                     connectPromise = null;
234                 }
235             }
236 
237             if (peer != null) {
238                 this.peer = null;
239                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
240                 // This ensures that if both channels are on the same event loop, the peer's channelInActive
241                 // event is triggered *after* this peer's channelInActive event
242                 EventLoop peerEventLoop = peer.eventLoop();
243                 final boolean peerIsActive = peer.isActive();
244                 try {
245                     peerEventLoop.execute(new Runnable() {
246                         @Override
247                         public void run() {
248                             peer.tryClose(peerIsActive);
249                         }
250                     });
251                 } catch (Throwable cause) {
252                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
253                             this, peer, cause);
254                     if (peerEventLoop.inEventLoop()) {
255                         peer.releaseInboundBuffers();
256                     } else {
257                         // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
258                         // rejects the close Runnable but give a best effort.
259                         peer.close();
260                     }
261                     PlatformDependent.throwException(cause);
262                 }
263             }
264         } finally {
265             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
266             if (oldState != null && oldState != State.CLOSED) {
267                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
268                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
269                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
270                 // see the buffer at all.
271                 releaseInboundBuffers();
272             }
273         }
274     }
275 
276     private void tryClose(boolean isActive) {
277         if (isActive) {
278             unsafe().close(unsafe().voidPromise());
279         } else {
280             releaseInboundBuffers();
281         }
282     }
283 
284     @Override
285     protected void doDeregister() throws Exception {
286         // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
287         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
288     }
289 
290     private void readInbound() {
291         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
292         handle.reset(config());
293         ChannelPipeline pipeline = pipeline();
294         do {
295             Object received = inboundBuffer.poll();
296             if (received == null) {
297                 break;
298             }
299             pipeline.fireChannelRead(received);
300         } while (handle.continueReading());
301 
302         pipeline.fireChannelReadComplete();
303     }
304 
305     @Override
306     protected void doBeginRead() throws Exception {
307         if (readInProgress) {
308             return;
309         }
310 
311         Queue<Object> inboundBuffer = this.inboundBuffer;
312         if (inboundBuffer.isEmpty()) {
313             readInProgress = true;
314             return;
315         }
316 
317         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
318         final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
319         if (stackDepth < MAX_READER_STACK_DEPTH) {
320             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
321             try {
322                 readInbound();
323             } finally {
324                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
325             }
326         } else {
327             try {
328                 eventLoop().execute(readTask);
329             } catch (Throwable cause) {
330                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
331                 close();
332                 peer.close();
333                 PlatformDependent.throwException(cause);
334             }
335         }
336     }
337 
338     @Override
339     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
340         switch (state) {
341         case OPEN:
342         case BOUND:
343             throw new NotYetConnectedException();
344         case CLOSED:
345             throw new ClosedChannelException();
346         case CONNECTED:
347             break;
348         }
349 
350         final LocalChannel peer = this.peer;
351 
352         writeInProgress = true;
353         try {
354             ClosedChannelException exception = null;
355             for (;;) {
356                 Object msg = in.current();
357                 if (msg == null) {
358                     break;
359                 }
360                 try {
361                     // It is possible the peer could have closed while we are writing, and in this case we should
362                     // simulate real socket behavior and ensure the write operation is failed.
363                     if (peer.state == State.CONNECTED) {
364                         peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
365                         in.remove();
366                     } else {
367                         if (exception == null) {
368                             exception = new ClosedChannelException();
369                         }
370                         in.remove(exception);
371                     }
372                 } catch (Throwable cause) {
373                     in.remove(cause);
374                 }
375             }
376         } finally {
377             // The following situation may cause trouble:
378             // 1. Write (with promise X)
379             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
380             // 3. Then the close event will be executed for the peer before the write events, when the write events
381             // actually happened before the close event.
382             writeInProgress = false;
383         }
384 
385         finishPeerRead(peer);
386     }
387 
388     private void finishPeerRead(final LocalChannel peer) {
389         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
390         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
391             finishPeerRead0(peer);
392         } else {
393             runFinishPeerReadTask(peer);
394         }
395     }
396 
397     private void runFinishPeerReadTask(final LocalChannel peer) {
398         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
399         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
400         final Runnable finishPeerReadTask = new Runnable() {
401             @Override
402             public void run() {
403                 finishPeerRead0(peer);
404             }
405         };
406         try {
407             if (peer.writeInProgress) {
408                 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
409             } else {
410                 peer.eventLoop().execute(finishPeerReadTask);
411             }
412         } catch (Throwable cause) {
413             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
414             close();
415             peer.close();
416             PlatformDependent.throwException(cause);
417         }
418     }
419 
420     private void releaseInboundBuffers() {
421         assert eventLoop() == null || eventLoop().inEventLoop();
422         readInProgress = false;
423         Queue<Object> inboundBuffer = this.inboundBuffer;
424         Object msg;
425         while ((msg = inboundBuffer.poll()) != null) {
426             ReferenceCountUtil.release(msg);
427         }
428     }
429 
430     private void finishPeerRead0(LocalChannel peer) {
431         Future<?> peerFinishReadFuture = peer.finishReadFuture;
432         if (peerFinishReadFuture != null) {
433             if (!peerFinishReadFuture.isDone()) {
434                 runFinishPeerReadTask(peer);
435                 return;
436             } else {
437                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
438                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
439             }
440         }
441         // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
442         // forward data later on.
443         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
444             peer.readInProgress = false;
445             peer.readInbound();
446         }
447     }
448 
449     private class LocalUnsafe extends AbstractUnsafe {
450 
451         @Override
452         public void connect(final SocketAddress remoteAddress,
453                 SocketAddress localAddress, final ChannelPromise promise) {
454             if (!promise.setUncancellable() || !ensureOpen(promise)) {
455                 return;
456             }
457 
458             if (state == State.CONNECTED) {
459                 Exception cause = new AlreadyConnectedException();
460                 safeSetFailure(promise, cause);
461                 pipeline().fireExceptionCaught(cause);
462                 return;
463             }
464 
465             if (connectPromise != null) {
466                 throw new ConnectionPendingException();
467             }
468 
469             connectPromise = promise;
470 
471             if (state != State.BOUND) {
472                 // Not bound yet and no localAddress specified - get one.
473                 if (localAddress == null) {
474                     localAddress = new LocalAddress(LocalChannel.this);
475                 }
476             }
477 
478             if (localAddress != null) {
479                 try {
480                     doBind(localAddress);
481                 } catch (Throwable t) {
482                     safeSetFailure(promise, t);
483                     close(voidPromise());
484                     return;
485                 }
486             }
487 
488             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
489             if (!(boundChannel instanceof LocalServerChannel)) {
490                 Exception cause = new ConnectException("connection refused: " + remoteAddress);
491                 safeSetFailure(promise, cause);
492                 close(voidPromise());
493                 return;
494             }
495 
496             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
497             peer = serverChannel.serve(LocalChannel.this);
498         }
499     }
500 }