View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultChannelConfig;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.SingleThreadEventLoop;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.concurrent.Future;
30  import io.netty.util.concurrent.SingleThreadEventExecutor;
31  import io.netty.util.internal.InternalThreadLocalMap;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.ThrowableUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.net.ConnectException;
38  import java.net.SocketAddress;
39  import java.nio.channels.AlreadyConnectedException;
40  import java.nio.channels.ClosedChannelException;
41  import java.nio.channels.ConnectionPendingException;
42  import java.nio.channels.NotYetConnectedException;
43  import java.util.Queue;
44  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45  
46  /**
47   * A {@link Channel} for the local transport.
48   */
49  public class LocalChannel extends AbstractChannel {
50      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
51      @SuppressWarnings({ "rawtypes" })
52      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
53              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
54      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
55      private static final int MAX_READER_STACK_DEPTH = 8;
56      private static final ClosedChannelException DO_WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
57              new ClosedChannelException(), LocalChannel.class, "doWrite(...)");
58      private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
59              new ClosedChannelException(), LocalChannel.class, "doClose()");
60  
61      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
62  
63      private final ChannelConfig config = new DefaultChannelConfig(this);
64      // To further optimize this we could write our own SPSC queue.
65      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
66      private final Runnable readTask = new Runnable() {
67          @Override
68          public void run() {
69              ChannelPipeline pipeline = pipeline();
70              for (;;) {
71                  Object m = inboundBuffer.poll();
72                  if (m == null) {
73                      break;
74                  }
75                  pipeline.fireChannelRead(m);
76              }
77              pipeline.fireChannelReadComplete();
78          }
79      };
80      private final Runnable shutdownHook = new Runnable() {
81          @Override
82          public void run() {
83              unsafe().close(unsafe().voidPromise());
84          }
85      };
86  
87      private volatile int state; // 0 - open, 1 - bound, 2 - connected, 3 - closed
88      private volatile LocalChannel peer;
89      private volatile LocalAddress localAddress;
90      private volatile LocalAddress remoteAddress;
91      private volatile ChannelPromise connectPromise;
92      private volatile boolean readInProgress;
93      private volatile boolean registerInProgress;
94      private volatile boolean writeInProgress;
95      private volatile Future<?> finishReadFuture;
96  
97      public LocalChannel() {
98          super(null);
99      }
100 
101     protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
102         super(parent);
103         this.peer = peer;
104         localAddress = parent.localAddress();
105         remoteAddress = peer.localAddress();
106     }
107 
108     @Override
109     public ChannelMetadata metadata() {
110         return METADATA;
111     }
112 
113     @Override
114     public ChannelConfig config() {
115         return config;
116     }
117 
118     @Override
119     public LocalServerChannel parent() {
120         return (LocalServerChannel) super.parent();
121     }
122 
123     @Override
124     public LocalAddress localAddress() {
125         return (LocalAddress) super.localAddress();
126     }
127 
128     @Override
129     public LocalAddress remoteAddress() {
130         return (LocalAddress) super.remoteAddress();
131     }
132 
133     @Override
134     public boolean isOpen() {
135         return state < 3;
136     }
137 
138     @Override
139     public boolean isActive() {
140         return state == 2;
141     }
142 
143     @Override
144     protected AbstractUnsafe newUnsafe() {
145         return new LocalUnsafe();
146     }
147 
148     @Override
149     protected boolean isCompatible(EventLoop loop) {
150         return loop instanceof SingleThreadEventLoop;
151     }
152 
153     @Override
154     protected SocketAddress localAddress0() {
155         return localAddress;
156     }
157 
158     @Override
159     protected SocketAddress remoteAddress0() {
160         return remoteAddress;
161     }
162 
163     @Override
164     protected void doRegister() throws Exception {
165         // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
166         // This is needed as a peer may not be null also if a LocalChannel was connected before and
167         // deregistered / registered later again.
168         //
169         // See https://github.com/netty/netty/issues/2400
170         if (peer != null && parent() != null) {
171             // Store the peer in a local variable as it may be set to null if doClose() is called.
172             // Because of this we also set registerInProgress to true as we check for this in doClose() and make sure
173             // we delay the fireChannelInactive() to be fired after the fireChannelActive() and so keep the correct
174             // order of events.
175             //
176             // See https://github.com/netty/netty/issues/2144
177             final LocalChannel peer = this.peer;
178             registerInProgress = true;
179             state = 2;
180 
181             peer.remoteAddress = parent().localAddress();
182             peer.state = 2;
183 
184             // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
185             // This ensures that if both channels are on the same event loop, the peer's channelActive
186             // event is triggered *after* this channel's channelRegistered event, so that this channel's
187             // pipeline is fully initialized by ChannelInitializer before any channelRead events.
188             peer.eventLoop().execute(new Runnable() {
189                 @Override
190                 public void run() {
191                     registerInProgress = false;
192                     ChannelPromise promise = peer.connectPromise;
193 
194                     // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
195                     // connectPromise may be set to null if doClose() was called in the meantime.
196                     if (promise != null && promise.trySuccess()) {
197                         peer.pipeline().fireChannelActive();
198                     }
199                 }
200             });
201         }
202         ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
203     }
204 
205     @Override
206     protected void doBind(SocketAddress localAddress) throws Exception {
207         this.localAddress =
208                 LocalChannelRegistry.register(this, this.localAddress,
209                         localAddress);
210         state = 1;
211     }
212 
213     @Override
214     protected void doDisconnect() throws Exception {
215         doClose();
216     }
217 
218     @Override
219     protected void doClose() throws Exception {
220         final LocalChannel peer = this.peer;
221         int oldState = state;
222         try {
223             if (oldState <= 2) {
224                 // Update all internal state before the closeFuture is notified.
225                 if (localAddress != null) {
226                     if (parent() == null) {
227                         LocalChannelRegistry.unregister(localAddress);
228                     }
229                     localAddress = null;
230                 }
231 
232                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
233                 // channelRead.
234                 state = 3;
235 
236                 // Preserve order of event and force a read operation now before the close operation is processed.
237                 finishPeerRead(this);
238 
239                 ChannelPromise promise = connectPromise;
240                 if (promise != null) {
241                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
242                     promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
243                     connectPromise = null;
244                 }
245             }
246 
247             if (peer != null) {
248                 this.peer = null;
249                 // Need to execute the close in the correct EventLoop (see https://github.com/netty/netty/issues/1777).
250                 // Also check if the registration was not done yet. In this case we submit the close to the EventLoop
251                 // to make sure its run after the registration completes
252                 // (see https://github.com/netty/netty/issues/2144).
253                 EventLoop peerEventLoop = peer.eventLoop();
254                 final boolean peerIsActive = peer.isActive();
255                 if (peerEventLoop.inEventLoop() && !registerInProgress) {
256                     peer.tryClose(peerIsActive);
257                 } else {
258                     try {
259                         peerEventLoop.execute(new Runnable() {
260                             @Override
261                             public void run() {
262                                 peer.tryClose(peerIsActive);
263                             }
264                         });
265                     } catch (Throwable cause) {
266                         logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
267                                 this, peer, cause);
268                         releaseInboundBuffers();
269                         if (peerEventLoop.inEventLoop()) {
270                             peer.releaseInboundBuffers();
271                         } else {
272                             // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
273                             // rejects the close Runnable but give a best effort.
274                             peer.close();
275                         }
276                         PlatformDependent.throwException(cause);
277                     }
278                 }
279             }
280         } finally {
281             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
282             if (peer != null && oldState != 3) {
283                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
284                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
285                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
286                 // see the buffer at all.
287                 releaseInboundBuffers();
288             }
289         }
290     }
291 
292     private void tryClose(boolean isActive) {
293         if (isActive) {
294             unsafe().close(unsafe().voidPromise());
295         } else {
296             releaseInboundBuffers();
297         }
298     }
299 
300     @Override
301     protected void doDeregister() throws Exception {
302         // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
303         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
304     }
305 
306     @Override
307     protected void doBeginRead() throws Exception {
308         if (readInProgress) {
309             return;
310         }
311 
312         ChannelPipeline pipeline = pipeline();
313         Queue<Object> inboundBuffer = this.inboundBuffer;
314         if (inboundBuffer.isEmpty()) {
315             readInProgress = true;
316             return;
317         }
318 
319         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
320         final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
321         if (stackDepth < MAX_READER_STACK_DEPTH) {
322             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
323             try {
324                 for (;;) {
325                     Object received = inboundBuffer.poll();
326                     if (received == null) {
327                         break;
328                     }
329                     pipeline.fireChannelRead(received);
330                 }
331                 pipeline.fireChannelReadComplete();
332             } finally {
333                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
334             }
335         } else {
336             try {
337                 eventLoop().execute(readTask);
338             } catch (Throwable cause) {
339                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
340                 close();
341                 peer.close();
342                 PlatformDependent.throwException(cause);
343             }
344         }
345     }
346 
347     @Override
348     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
349         if (state < 2) {
350             throw new NotYetConnectedException();
351         }
352         if (state > 2) {
353             throw DO_WRITE_CLOSED_CHANNEL_EXCEPTION;
354         }
355 
356         final LocalChannel peer = this.peer;
357 
358         writeInProgress = true;
359         try {
360             for (;;) {
361                 Object msg = in.current();
362                 if (msg == null) {
363                     break;
364                 }
365                 try {
366                     // It is possible the peer could have closed while we are writing, and in this case we should
367                     // simulate real socket behavior and ensure the write operation is failed.
368                     if (peer.state == 2) {
369                         peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
370                         in.remove();
371                     } else {
372                         in.remove(DO_WRITE_CLOSED_CHANNEL_EXCEPTION);
373                     }
374                 } catch (Throwable cause) {
375                     in.remove(cause);
376                 }
377             }
378         } finally {
379             // The following situation may cause trouble:
380             // 1. Write (with promise X)
381             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
382             // 3. Then the close event will be executed for the peer before the write events, when the write events
383             // actually happened before the close event.
384             writeInProgress = false;
385         }
386 
387         finishPeerRead(peer);
388     }
389 
390     private void finishPeerRead(final LocalChannel peer) {
391         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
392         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
393             finishPeerRead0(peer);
394         } else {
395             runFinishPeerReadTask(peer);
396         }
397     }
398 
399     private void runFinishPeerReadTask(final LocalChannel peer) {
400         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
401         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
402         final Runnable finishPeerReadTask = new Runnable() {
403             @Override
404             public void run() {
405                 finishPeerRead0(peer);
406             }
407         };
408         try {
409             if (peer.writeInProgress) {
410                 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
411             } else {
412                 peer.eventLoop().execute(finishPeerReadTask);
413             }
414         } catch (Throwable cause) {
415             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
416             close();
417             peer.close();
418             PlatformDependent.throwException(cause);
419         }
420     }
421 
422     private void releaseInboundBuffers() {
423         assert eventLoop() == null || eventLoop().inEventLoop();
424         readInProgress = false;
425         Queue<Object> inboundBuffer = this.inboundBuffer;
426         Object msg;
427         while ((msg = inboundBuffer.poll()) != null) {
428             ReferenceCountUtil.release(msg);
429         }
430     }
431 
432     private void finishPeerRead0(LocalChannel peer) {
433         Future<?> peerFinishReadFuture = peer.finishReadFuture;
434         if (peerFinishReadFuture != null) {
435             if (!peerFinishReadFuture.isDone()) {
436                 runFinishPeerReadTask(peer);
437                 return;
438             } else {
439                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
440                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
441             }
442         }
443         ChannelPipeline peerPipeline = peer.pipeline();
444         if (peer.readInProgress) {
445             peer.readInProgress = false;
446             for (;;) {
447                 Object received = peer.inboundBuffer.poll();
448                 if (received == null) {
449                     break;
450                 }
451                 peerPipeline.fireChannelRead(received);
452             }
453             peerPipeline.fireChannelReadComplete();
454         }
455     }
456 
457     private class LocalUnsafe extends AbstractUnsafe {
458 
459         @Override
460         public void connect(final SocketAddress remoteAddress,
461                 SocketAddress localAddress, final ChannelPromise promise) {
462             if (!promise.setUncancellable() || !ensureOpen(promise)) {
463                 return;
464             }
465 
466             if (state == 2) {
467                 Exception cause = new AlreadyConnectedException();
468                 safeSetFailure(promise, cause);
469                 pipeline().fireExceptionCaught(cause);
470                 return;
471             }
472 
473             if (connectPromise != null) {
474                 throw new ConnectionPendingException();
475             }
476 
477             connectPromise = promise;
478 
479             if (state != 1) {
480                 // Not bound yet and no localAddress specified - get one.
481                 if (localAddress == null) {
482                     localAddress = new LocalAddress(LocalChannel.this);
483                 }
484             }
485 
486             if (localAddress != null) {
487                 try {
488                     doBind(localAddress);
489                 } catch (Throwable t) {
490                     safeSetFailure(promise, t);
491                     close(voidPromise());
492                     return;
493                 }
494             }
495 
496             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
497             if (!(boundChannel instanceof LocalServerChannel)) {
498                 Exception cause = new ConnectException("connection refused: " + remoteAddress);
499                 safeSetFailure(promise, cause);
500                 close(voidPromise());
501                 return;
502             }
503 
504             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
505             peer = serverChannel.serve(LocalChannel.this);
506         }
507     }
508 }