View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.AbstractChannel;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultChannelConfig;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.IoEvent;
29  import io.netty.channel.IoEventLoop;
30  import io.netty.channel.IoRegistration;
31  import io.netty.channel.PreferHeapByteBufAllocator;
32  import io.netty.channel.RecvByteBufAllocator;
33  import io.netty.channel.SingleThreadEventLoop;
34  import io.netty.util.ReferenceCountUtil;
35  import io.netty.util.concurrent.Future;
36  import io.netty.util.concurrent.SingleThreadEventExecutor;
37  import io.netty.util.internal.InternalThreadLocalMap;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.net.ConnectException;
43  import java.net.SocketAddress;
44  import java.nio.channels.AlreadyConnectedException;
45  import java.nio.channels.ClosedChannelException;
46  import java.nio.channels.ConnectionPendingException;
47  import java.nio.channels.NotYetConnectedException;
48  import java.util.Queue;
49  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50  
51  /**
52   * A {@link Channel} for the local transport.
53   */
54  public class LocalChannel extends AbstractChannel {
55      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
56      @SuppressWarnings({ "rawtypes" })
57      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
58              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60      private static final int MAX_READER_STACK_DEPTH = 8;
61  
62      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
63  
64      private final ChannelConfig config = new DefaultChannelConfig(this);
65      // To further optimize this we could write our own SPSC queue.
66      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
67      private final Runnable readTask = new Runnable() {
68          @Override
69          public void run() {
70              // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
71              if (!inboundBuffer.isEmpty()) {
72                  readInbound();
73              }
74          }
75      };
76  
77      private final Runnable shutdownHook = new Runnable() {
78          @Override
79          public void run() {
80              unsafe().close(unsafe().voidPromise());
81          }
82      };
83  
84      private final Runnable finishReadTask = new Runnable() {
85          @Override
86          public void run() {
87              finishPeerRead0(LocalChannel.this);
88          }
89      };
90  
91      private IoRegistration registration;
92  
93      private volatile State state;
94      private volatile LocalChannel peer;
95      private volatile LocalAddress localAddress;
96      private volatile LocalAddress remoteAddress;
97      private volatile ChannelPromise connectPromise;
98      private volatile boolean readInProgress;
99      private volatile boolean writeInProgress;
100     private volatile Future<?> finishReadFuture;
101 
102     public LocalChannel() {
103         super(null);
104         config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
105     }
106 
107     protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
108         super(parent);
109         config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
110         this.peer = peer;
111         localAddress = parent.localAddress();
112         remoteAddress = peer.localAddress();
113     }
114 
115     @Override
116     public ChannelMetadata metadata() {
117         return METADATA;
118     }
119 
120     @Override
121     public ChannelConfig config() {
122         return config;
123     }
124 
125     @Override
126     public LocalServerChannel parent() {
127         return (LocalServerChannel) super.parent();
128     }
129 
130     @Override
131     public LocalAddress localAddress() {
132         return (LocalAddress) super.localAddress();
133     }
134 
135     @Override
136     public LocalAddress remoteAddress() {
137         return (LocalAddress) super.remoteAddress();
138     }
139 
140     @Override
141     public boolean isOpen() {
142         return state != State.CLOSED;
143     }
144 
145     @Override
146     public boolean isActive() {
147         return state == State.CONNECTED;
148     }
149 
150     @Override
151     protected AbstractUnsafe newUnsafe() {
152         return new LocalUnsafe();
153     }
154 
155     @Override
156     protected boolean isCompatible(EventLoop loop) {
157         return loop instanceof SingleThreadEventLoop ||
158                 (loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(LocalUnsafe.class));
159     }
160 
161     @Override
162     protected SocketAddress localAddress0() {
163         return localAddress;
164     }
165 
166     @Override
167     protected SocketAddress remoteAddress0() {
168         return remoteAddress;
169     }
170 
171     @Override
172     protected void doRegister(ChannelPromise promise) {
173         EventLoop loop = eventLoop();
174         if (loop instanceof IoEventLoop) {
175             assert registration == null;
176             ((IoEventLoop) loop).register((LocalUnsafe) unsafe()).addListener(f -> {
177                if (f.isSuccess()) {
178                    registration = (IoRegistration) f.getNow();
179                    promise.setSuccess();
180                } else {
181                    promise.setFailure(f.cause());
182                }
183             });
184         } else {
185             try {
186                 ((LocalUnsafe) unsafe()).registered();
187             } catch (Throwable cause) {
188                 promise.setFailure(cause);
189             }
190             promise.setSuccess();
191         }
192     }
193 
194     @Override
195     protected void doDeregister() throws Exception {
196         EventLoop loop = eventLoop();
197         if (loop instanceof IoEventLoop) {
198             IoRegistration registration = this.registration;
199             if (registration != null) {
200                 this.registration = null;
201                 registration.cancel();
202             }
203         } else {
204             ((LocalUnsafe) unsafe()).unregistered();
205         }
206     }
207 
208     @Override
209     protected void doBind(SocketAddress localAddress) throws Exception {
210         this.localAddress =
211                 LocalChannelRegistry.register(this, this.localAddress,
212                         localAddress);
213         state = State.BOUND;
214     }
215 
216     @Override
217     protected void doDisconnect() throws Exception {
218         doClose();
219     }
220 
221     @Override
222     protected void doClose() throws Exception {
223         final LocalChannel peer = this.peer;
224         State oldState = state;
225         try {
226             if (oldState != State.CLOSED) {
227                 // Update all internal state before the closeFuture is notified.
228                 if (localAddress != null) {
229                     if (parent() == null) {
230                         LocalChannelRegistry.unregister(localAddress);
231                     }
232                     localAddress = null;
233                 }
234 
235                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
236                 // channelRead.
237                 state = State.CLOSED;
238 
239                 // Preserve order of event and force a read operation now before the close operation is processed.
240                 if (writeInProgress && peer != null) {
241                     finishPeerRead(peer);
242                 }
243 
244                 ChannelPromise promise = connectPromise;
245                 if (promise != null) {
246                     // Use tryFailure() instead of setFailure() to avoid the race against cancel().
247                     promise.tryFailure(new ClosedChannelException());
248                     connectPromise = null;
249                 }
250             }
251 
252             if (peer != null) {
253                 this.peer = null;
254                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
255                 // This ensures that if both channels are on the same event loop, the peer's channelInActive
256                 // event is triggered *after* this peer's channelInActive event
257                 EventLoop peerEventLoop = peer.eventLoop();
258                 final boolean peerIsActive = peer.isActive();
259                 try {
260                     peerEventLoop.execute(new Runnable() {
261                         @Override
262                         public void run() {
263                             peer.tryClose(peerIsActive);
264                         }
265                     });
266                 } catch (Throwable cause) {
267                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
268                             this, peer, cause);
269                     if (peerEventLoop.inEventLoop()) {
270                         peer.releaseInboundBuffers();
271                     } else {
272                         // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
273                         // rejects the close Runnable but give a best effort.
274                         peer.close();
275                     }
276                     PlatformDependent.throwException(cause);
277                 }
278             }
279         } finally {
280             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
281             if (oldState != null && oldState != State.CLOSED) {
282                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
283                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
284                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
285                 // see the buffer at all.
286                 releaseInboundBuffers();
287             }
288         }
289     }
290 
291     private void tryClose(boolean isActive) {
292         if (isActive) {
293             unsafe().close(unsafe().voidPromise());
294         } else {
295             releaseInboundBuffers();
296         }
297     }
298 
299     private void readInbound() {
300         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
301         handle.reset(config());
302         ChannelPipeline pipeline = pipeline();
303         do {
304             Object received = inboundBuffer.poll();
305             if (received == null) {
306                 break;
307             }
308             if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
309                 ByteBuf msg = (ByteBuf) received;
310                 ByteBuf output = handle.allocate(alloc());
311                 if (msg.readableBytes() < output.writableBytes()) {
312                     // We have an opportunity to coalesce buffers.
313                     output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
314                     msg.release();
315                     while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
316                             ((ByteBuf) received).readableBytes() < output.writableBytes()) {
317                         inboundBuffer.poll();
318                         msg = (ByteBuf) received;
319                         output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
320                         msg.release();
321                     }
322                     handle.lastBytesRead(output.readableBytes());
323                     received = output; // Send the coalesced buffer down the pipeline.
324                 } else {
325                     // It won't be profitable to coalesce buffers this time around.
326                     handle.lastBytesRead(output.capacity());
327                     output.release();
328                 }
329             }
330             handle.incMessagesRead(1);
331             pipeline.fireChannelRead(received);
332         } while (handle.continueReading());
333         handle.readComplete();
334         pipeline.fireChannelReadComplete();
335     }
336 
337     @Override
338     protected void doBeginRead() throws Exception {
339         if (readInProgress) {
340             return;
341         }
342 
343         Queue<Object> inboundBuffer = this.inboundBuffer;
344         if (inboundBuffer.isEmpty()) {
345             readInProgress = true;
346             return;
347         }
348 
349         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
350         final int stackDepth = threadLocals.localChannelReaderStackDepth();
351         if (stackDepth < MAX_READER_STACK_DEPTH) {
352             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
353             try {
354                 readInbound();
355             } finally {
356                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
357             }
358         } else {
359             try {
360                 eventLoop().execute(readTask);
361             } catch (Throwable cause) {
362                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
363                 close();
364                 peer.close();
365                 PlatformDependent.throwException(cause);
366             }
367         }
368     }
369 
370     @Override
371     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
372         switch (state) {
373         case OPEN:
374         case BOUND:
375             throw new NotYetConnectedException();
376         case CLOSED:
377             throw new ClosedChannelException();
378         case CONNECTED:
379             break;
380         }
381 
382         final LocalChannel peer = this.peer;
383 
384         writeInProgress = true;
385         try {
386             ClosedChannelException exception = null;
387             for (;;) {
388                 Object msg = in.current();
389                 if (msg == null) {
390                     break;
391                 }
392                 try {
393                     // It is possible the peer could have closed while we are writing, and in this case we should
394                     // simulate real socket behavior and ensure the write operation is failed.
395                     if (peer.state == State.CONNECTED) {
396                         peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
397                         in.remove();
398                     } else {
399                         if (exception == null) {
400                             exception = new ClosedChannelException();
401                         }
402                         in.remove(exception);
403                     }
404                 } catch (Throwable cause) {
405                     in.remove(cause);
406                 }
407             }
408         } finally {
409             // The following situation may cause trouble:
410             // 1. Write (with promise X)
411             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
412             // 3. Then the close event will be executed for the peer before the write events, when the write events
413             // actually happened before the close event.
414             writeInProgress = false;
415         }
416 
417         finishPeerRead(peer);
418     }
419 
420     private void finishPeerRead(final LocalChannel peer) {
421         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
422         if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
423             finishPeerRead0(peer);
424         } else {
425             runFinishPeerReadTask(peer);
426         }
427     }
428 
429     private void runFinishTask0() {
430         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
431         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
432         if (writeInProgress) {
433             finishReadFuture = eventLoop().submit(finishReadTask);
434         } else {
435             eventLoop().execute(finishReadTask);
436         }
437     }
438 
439     private void runFinishPeerReadTask(final LocalChannel peer) {
440         try {
441             peer.runFinishTask0();
442         } catch (Throwable cause) {
443             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
444             close();
445             peer.close();
446             PlatformDependent.throwException(cause);
447         }
448     }
449 
450     private void releaseInboundBuffers() {
451         assert eventLoop() == null || eventLoop().inEventLoop();
452         readInProgress = false;
453         Queue<Object> inboundBuffer = this.inboundBuffer;
454         Object msg;
455         while ((msg = inboundBuffer.poll()) != null) {
456             ReferenceCountUtil.release(msg);
457         }
458     }
459 
460     private void finishPeerRead0(LocalChannel peer) {
461         Future<?> peerFinishReadFuture = peer.finishReadFuture;
462         if (peerFinishReadFuture != null) {
463             if (!peerFinishReadFuture.isDone()) {
464                 runFinishPeerReadTask(peer);
465                 return;
466             } else {
467                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
468                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
469             }
470         }
471         // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
472         // forward data later on.
473         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
474             peer.readInProgress = false;
475             peer.readInbound();
476         }
477     }
478 
479     private class LocalUnsafe extends AbstractUnsafe implements LocalIoHandle {
480 
481         @Override
482         public void close() {
483             close(voidPromise());
484         }
485 
486         @Override
487         public void handle(IoRegistration registration, IoEvent event) {
488             // NOOP
489         }
490 
491         @Override
492         public void registered() {
493             // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
494             // This is needed as a peer may not be null also if a LocalChannel was connected before and
495             // deregistered / registered later again.
496             //
497             // See https://github.com/netty/netty/issues/2400
498             if (peer != null && parent() != null) {
499                 // Store the peer in a local variable as it may be set to null if doClose() is called.
500                 // See https://github.com/netty/netty/issues/2144
501                 final LocalChannel peer = LocalChannel.this.peer;
502                 state = State.CONNECTED;
503 
504                 peer.remoteAddress = parent() == null ? null : parent().localAddress();
505                 peer.state = State.CONNECTED;
506 
507                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
508                 // This ensures that if both channels are on the same event loop, the peer's channelActive
509                 // event is triggered *after* this channel's channelRegistered event, so that this channel's
510                 // pipeline is fully initialized by ChannelInitializer before any channelRead events.
511                 peer.eventLoop().execute(new Runnable() {
512                     @Override
513                     public void run() {
514                         ChannelPromise promise = peer.connectPromise;
515 
516                         // Only trigger fireChannelActive() if the promise was not null and was not completed yet.
517                         // connectPromise may be set to null if doClose() was called in the meantime.
518                         if (promise != null && promise.trySuccess()) {
519                             peer.pipeline().fireChannelActive();
520                         }
521                     }
522                 });
523             }
524             ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
525         }
526 
527         @Override
528         public void unregistered() {
529             // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
530             ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
531         }
532 
533         @Override
534         public void closeNow() {
535             close(voidPromise());
536         }
537 
538         @Override
539         public void connect(final SocketAddress remoteAddress,
540                 SocketAddress localAddress, final ChannelPromise promise) {
541             if (!promise.setUncancellable() || !ensureOpen(promise)) {
542                 return;
543             }
544 
545             if (state == State.CONNECTED) {
546                 Exception cause = new AlreadyConnectedException();
547                 safeSetFailure(promise, cause);
548                 return;
549             }
550 
551             if (connectPromise != null) {
552                 throw new ConnectionPendingException();
553             }
554 
555             connectPromise = promise;
556 
557             if (state != State.BOUND) {
558                 // Not bound yet and no localAddress specified - get one.
559                 if (localAddress == null) {
560                     localAddress = new LocalAddress(LocalChannel.this);
561                 }
562             }
563 
564             if (localAddress != null) {
565                 try {
566                     doBind(localAddress);
567                 } catch (Throwable t) {
568                     safeSetFailure(promise, t);
569                     close(voidPromise());
570                     return;
571                 }
572             }
573 
574             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
575             if (!(boundChannel instanceof LocalServerChannel)) {
576                 Exception cause = new ConnectException("connection refused: " + remoteAddress);
577                 safeSetFailure(promise, cause);
578                 close(voidPromise());
579                 return;
580             }
581 
582             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
583             peer = serverChannel.serve(LocalChannel.this);
584         }
585     }
586 }