View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultChannelConfig;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.PreferHeapByteBufAllocator;
28  import io.netty.channel.SingleThreadEventLoop;
29  import io.netty.util.ReferenceCountUtil;
30  import io.netty.util.concurrent.Future;
31  import io.netty.util.concurrent.SingleThreadEventExecutor;
32  import io.netty.util.internal.InternalThreadLocalMap;
33  import io.netty.util.internal.PlatformDependent;
34  import io.netty.util.internal.ThrowableUtil;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import java.net.ConnectException;
39  import java.net.SocketAddress;
40  import java.nio.channels.AlreadyConnectedException;
41  import java.nio.channels.ClosedChannelException;
42  import java.nio.channels.ConnectionPendingException;
43  import java.nio.channels.NotYetConnectedException;
44  import java.util.Queue;
45  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46  
47  /**
48   * A {@link Channel} for the local transport.
49   */
50  public class LocalChannel extends AbstractChannel {
51      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
52      @SuppressWarnings({ "rawtypes" })
53      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
54              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
55      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
56      private static final int MAX_READER_STACK_DEPTH = 8;
57      private static final ClosedChannelException DO_WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
58              new ClosedChannelException(), LocalChannel.class, "doWrite(...)");
59      private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
60              new ClosedChannelException(), LocalChannel.class, "doClose()");
61  
62      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
63  
64      private final ChannelConfig config = new DefaultChannelConfig(this);
65      // To further optimize this we could write our own SPSC queue.
66      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
67      private final Runnable readTask = new Runnable() {
68          @Override
69          public void run() {
70              ChannelPipeline pipeline = pipeline();
71              for (;;) {
72                  Object m = inboundBuffer.poll();
73                  if (m == null) {
74                      break;
75                  }
76                  pipeline.fireChannelRead(m);
77              }
78              pipeline.fireChannelReadComplete();
79          }
80      };
81      private final Runnable shutdownHook = new Runnable() {
82          @Override
83          public void run() {
84              unsafe().close(unsafe().voidPromise());
85          }
86      };
87  
88      private volatile State state;
89      private volatile LocalChannel peer;
90      private volatile LocalAddress localAddress;
91      private volatile LocalAddress remoteAddress;
92      private volatile ChannelPromise connectPromise;
93      private volatile boolean readInProgress;
94      private volatile boolean writeInProgress;
95      private volatile Future<?> finishReadFuture;
96  
97      public LocalChannel() {
98          super(null);
99          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
100     }
101 
102     protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
103         super(parent);
104         config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
105         this.peer = peer;
106         localAddress = parent.localAddress();
107         remoteAddress = peer.localAddress();
108     }
109 
110     @Override
111     public ChannelMetadata metadata() {
112         return METADATA;
113     }
114 
115     @Override
116     public ChannelConfig config() {
117         return config;
118     }
119 
120     @Override
121     public LocalServerChannel parent() {
122         return (LocalServerChannel) super.parent();
123     }
124 
125     @Override
126     public LocalAddress localAddress() {
127         return (LocalAddress) super.localAddress();
128     }
129 
130     @Override
131     public LocalAddress remoteAddress() {
132         return (LocalAddress) super.remoteAddress();
133     }
134 
135     @Override
136     public boolean isOpen() {
137         return state != State.CLOSED;
138     }
139 
140     @Override
141     public boolean isActive() {
142         return state == State.CONNECTED;
143     }
144 
145     @Override
146     protected AbstractUnsafe newUnsafe() {
147         return new LocalUnsafe();
148     }
149 
150     @Override
151     protected boolean isCompatible(EventLoop loop) {
152         return loop instanceof SingleThreadEventLoop;
153     }
154 
155     @Override
156     protected SocketAddress localAddress0() {
157         return localAddress;
158     }
159 
160     @Override
161     protected SocketAddress remoteAddress0() {
162         return remoteAddress;
163     }
164 
165     @Override
166     protected void doRegister() throws Exception {
167         // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
168         // This is needed as a peer may not be null also if a LocalChannel was connected before and
169         // deregistered / registered later again.
170         //
171         // See https://github.com/netty/netty/issues/2400
172         if (peer != null && parent() != null) {
173             // Store the peer in a local variable as it may be set to null if doClose() is called.
174             // See https://github.com/netty/netty/issues/2144
175             final LocalChannel peer = this.peer;
176             state = State.CONNECTED;
177 
178             peer.remoteAddress = parent() == null ? null : parent().localAddress();
179             peer.state = State.CONNECTED;
180 
181             // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
182             // This ensures that if both channels are on the same event loop, the peer's channelActive
183             // event is triggered *after* this channel's channelRegistered event, so that this channel's
184             // pipeline is fully initialized by ChannelInitializer before any channelRead events.
185             peer.eventLoop().execute(new Runnable() {
186                 @Override
187                 public void run() {
188                     ChannelPromise promise = peer.connectPromise;
189 
190                     // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
191                     // connectPromise may be set to null if doClose() was called in the meantime.
192                     if (promise != null && promise.trySuccess()) {
193                         peer.pipeline().fireChannelActive();
194                     }
195                 }
196             });
197         }
198         ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
199     }
200 
201     @Override
202     protected void doBind(SocketAddress localAddress) throws Exception {
203         this.localAddress =
204                 LocalChannelRegistry.register(this, this.localAddress,
205                         localAddress);
206         state = State.BOUND;
207     }
208 
209     @Override
210     protected void doDisconnect() throws Exception {
211         doClose();
212     }
213 
214     @Override
215     protected void doClose() throws Exception {
216         final LocalChannel peer = this.peer;
217         State oldState = state;
218         try {
219             if (oldState != State.CLOSED) {
220                 // Update all internal state before the closeFuture is notified.
221                 if (localAddress != null) {
222                     if (parent() == null) {
223                         LocalChannelRegistry.unregister(localAddress);
224                     }
225                     localAddress = null;
226                 }
227 
228                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
229                 // channelRead.
230                 state = State.CLOSED;
231 
232                 // Preserve order of event and force a read operation now before the close operation is processed.
233                 if (writeInProgress && peer != null) {
234                     finishPeerRead(peer);
235                 }
236 
237                 ChannelPromise promise = connectPromise;
238                 if (promise != null) {
239                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
240                     promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
241                     connectPromise = null;
242                 }
243             }
244 
245             if (peer != null) {
246                 this.peer = null;
247                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
248                 // This ensures that if both channels are on the same event loop, the peer's channelInActive
249                 // event is triggered *after* this peer's channelInActive event
250                 EventLoop peerEventLoop = peer.eventLoop();
251                 final boolean peerIsActive = peer.isActive();
252                 try {
253                     peerEventLoop.execute(new Runnable() {
254                         @Override
255                         public void run() {
256                             peer.tryClose(peerIsActive);
257                         }
258                     });
259                 } catch (Throwable cause) {
260                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
261                             this, peer, cause);
262                     if (peerEventLoop.inEventLoop()) {
263                         peer.releaseInboundBuffers();
264                     } else {
265                         // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
266                         // rejects the close Runnable but give a best effort.
267                         peer.close();
268                     }
269                     PlatformDependent.throwException(cause);
270                 }
271             }
272         } finally {
273             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
274             if (oldState != null && oldState != State.CLOSED) {
275                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
276                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
277                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
278                 // see the buffer at all.
279                 releaseInboundBuffers();
280             }
281         }
282     }
283 
284     private void tryClose(boolean isActive) {
285         if (isActive) {
286             unsafe().close(unsafe().voidPromise());
287         } else {
288             releaseInboundBuffers();
289         }
290     }
291 
292     @Override
293     protected void doDeregister() throws Exception {
294         // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
295         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
296     }
297 
298     @Override
299     protected void doBeginRead() throws Exception {
300         if (readInProgress) {
301             return;
302         }
303 
304         ChannelPipeline pipeline = pipeline();
305         Queue<Object> inboundBuffer = this.inboundBuffer;
306         if (inboundBuffer.isEmpty()) {
307             readInProgress = true;
308             return;
309         }
310 
311         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
312         final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
313         if (stackDepth < MAX_READER_STACK_DEPTH) {
314             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
315             try {
316                 for (;;) {
317                     Object received = inboundBuffer.poll();
318                     if (received == null) {
319                         break;
320                     }
321                     pipeline.fireChannelRead(received);
322                 }
323                 pipeline.fireChannelReadComplete();
324             } finally {
325                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
326             }
327         } else {
328             try {
329                 eventLoop().execute(readTask);
330             } catch (Throwable cause) {
331                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
332                 close();
333                 peer.close();
334                 PlatformDependent.throwException(cause);
335             }
336         }
337     }
338 
339     @Override
340     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
341         switch (state) {
342         case OPEN:
343         case BOUND:
344             throw new NotYetConnectedException();
345         case CLOSED:
346             throw DO_WRITE_CLOSED_CHANNEL_EXCEPTION;
347         case CONNECTED:
348             break;
349         }
350 
351         final LocalChannel peer = this.peer;
352 
353         writeInProgress = true;
354         try {
355             for (;;) {
356                 Object msg = in.current();
357                 if (msg == null) {
358                     break;
359                 }
360                 try {
361                     // It is possible the peer could have closed while we are writing, and in this case we should
362                     // simulate real socket behavior and ensure the write operation is failed.
363                     if (peer.state == State.CONNECTED) {
364                         peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
365                         in.remove();
366                     } else {
367                         in.remove(DO_WRITE_CLOSED_CHANNEL_EXCEPTION);
368                     }
369                 } catch (Throwable cause) {
370                     in.remove(cause);
371                 }
372             }
373         } finally {
374             // The following situation may cause trouble:
375             // 1. Write (with promise X)
376             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
377             // 3. Then the close event will be executed for the peer before the write events, when the write events
378             // actually happened before the close event.
379             writeInProgress = false;
380         }
381 
382         finishPeerRead(peer);
383     }
384 
385     private void finishPeerRead(final LocalChannel peer) {
386         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
387         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
388             finishPeerRead0(peer);
389         } else {
390             runFinishPeerReadTask(peer);
391         }
392     }
393 
394     private void runFinishPeerReadTask(final LocalChannel peer) {
395         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
396         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
397         final Runnable finishPeerReadTask = new Runnable() {
398             @Override
399             public void run() {
400                 finishPeerRead0(peer);
401             }
402         };
403         try {
404             if (peer.writeInProgress) {
405                 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
406             } else {
407                 peer.eventLoop().execute(finishPeerReadTask);
408             }
409         } catch (Throwable cause) {
410             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
411             close();
412             peer.close();
413             PlatformDependent.throwException(cause);
414         }
415     }
416 
417     private void releaseInboundBuffers() {
418         assert eventLoop() == null || eventLoop().inEventLoop();
419         readInProgress = false;
420         Queue<Object> inboundBuffer = this.inboundBuffer;
421         Object msg;
422         while ((msg = inboundBuffer.poll()) != null) {
423             ReferenceCountUtil.release(msg);
424         }
425     }
426 
427     private void finishPeerRead0(LocalChannel peer) {
428         Future<?> peerFinishReadFuture = peer.finishReadFuture;
429         if (peerFinishReadFuture != null) {
430             if (!peerFinishReadFuture.isDone()) {
431                 runFinishPeerReadTask(peer);
432                 return;
433             } else {
434                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
435                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
436             }
437         }
438         ChannelPipeline peerPipeline = peer.pipeline();
439         // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
440         // forward data later on.
441         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
442             peer.readInProgress = false;
443             for (;;) {
444                 Object received = peer.inboundBuffer.poll();
445                 if (received == null) {
446                     break;
447                 }
448                 peerPipeline.fireChannelRead(received);
449             }
450             peerPipeline.fireChannelReadComplete();
451         }
452     }
453 
454     private class LocalUnsafe extends AbstractUnsafe {
455 
456         @Override
457         public void connect(final SocketAddress remoteAddress,
458                 SocketAddress localAddress, final ChannelPromise promise) {
459             if (!promise.setUncancellable() || !ensureOpen(promise)) {
460                 return;
461             }
462 
463             if (state == State.CONNECTED) {
464                 Exception cause = new AlreadyConnectedException();
465                 safeSetFailure(promise, cause);
466                 pipeline().fireExceptionCaught(cause);
467                 return;
468             }
469 
470             if (connectPromise != null) {
471                 throw new ConnectionPendingException();
472             }
473 
474             connectPromise = promise;
475 
476             if (state != State.BOUND) {
477                 // Not bound yet and no localAddress specified - get one.
478                 if (localAddress == null) {
479                     localAddress = new LocalAddress(LocalChannel.this);
480                 }
481             }
482 
483             if (localAddress != null) {
484                 try {
485                     doBind(localAddress);
486                 } catch (Throwable t) {
487                     safeSetFailure(promise, t);
488                     close(voidPromise());
489                     return;
490                 }
491             }
492 
493             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
494             if (!(boundChannel instanceof LocalServerChannel)) {
495                 Exception cause = new ConnectException("connection refused: " + remoteAddress);
496                 safeSetFailure(promise, cause);
497                 close(voidPromise());
498                 return;
499             }
500 
501             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
502             peer = serverChannel.serve(LocalChannel.this);
503         }
504     }
505 }