View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.local;
17  
18  import io.netty5.buffer.api.DefaultBufferAllocators;
19  import io.netty5.channel.ChannelOption;
20  import io.netty5.channel.ChannelShutdownDirection;
21  import io.netty5.util.ReferenceCounted;
22  import io.netty5.util.Resource;
23  import io.netty5.buffer.api.internal.ResourceSupport;
24  import io.netty5.buffer.api.internal.Statics;
25  import io.netty5.channel.AbstractChannel;
26  import io.netty5.channel.Channel;
27  import io.netty5.channel.ChannelMetadata;
28  import io.netty5.channel.ChannelOutboundBuffer;
29  import io.netty5.channel.ChannelPipeline;
30  import io.netty5.channel.EventLoop;
31  import io.netty5.channel.RecvBufferAllocator;
32  import io.netty5.util.ReferenceCountUtil;
33  import io.netty5.util.concurrent.FastThreadLocal;
34  import io.netty5.util.concurrent.Future;
35  import io.netty5.util.internal.PlatformDependent;
36  import io.netty5.util.internal.logging.InternalLogger;
37  import io.netty5.util.internal.logging.InternalLoggerFactory;
38  
39  import java.net.ConnectException;
40  import java.net.SocketAddress;
41  import java.nio.channels.AlreadyConnectedException;
42  import java.nio.channels.ClosedChannelException;
43  import java.nio.channels.NotYetConnectedException;
44  import java.util.Queue;
45  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46  
47  /**
48   * A {@link Channel} for the local transport.
49   */
50  public class LocalChannel extends AbstractChannel<LocalServerChannel, LocalAddress, LocalAddress>
51          implements LocalChannelUnsafe {
52      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
53      @SuppressWarnings("rawtypes")
54      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
55              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
56      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57      private static final int MAX_READER_STACK_DEPTH = 8;
58  
59      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
60  
61      // To further optimize this we could write our own SPSC queue.
62      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
63      private final Runnable readTask = () -> {
64          // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
65          if (!inboundBuffer.isEmpty()) {
66              readInbound();
67          }
68      };
69  
70      private volatile State state;
71      private volatile LocalChannel peer;
72      private volatile LocalAddress localAddress;
73      private volatile LocalAddress remoteAddress;
74      private volatile boolean readInProgress;
75      private volatile boolean writeInProgress;
76      private volatile Future<?> finishReadFuture;
77      private volatile boolean inputShutdown;
78      private volatile boolean outputShutdown;
79  
80      public LocalChannel(EventLoop eventLoop) {
81          this(null, eventLoop, null);
82      }
83  
84      protected LocalChannel(LocalServerChannel parent, EventLoop eventLoop, LocalChannel peer) {
85          super(parent, eventLoop, METADATA);
86          this.peer = peer;
87          if (parent != null) {
88              localAddress = parent.localAddress();
89          }
90          if (peer != null) {
91              remoteAddress = peer.localAddress();
92          }
93          setOption(ChannelOption.BUFFER_ALLOCATOR, DefaultBufferAllocators.onHeapAllocator());
94      }
95  
96      @Override
97      public boolean isOpen() {
98          return state != State.CLOSED;
99      }
100 
101     @Override
102     public boolean isActive() {
103         return state == State.CONNECTED;
104     }
105 
106     @Override
107     protected LocalAddress localAddress0() {
108         return localAddress;
109     }
110 
111     @Override
112     protected LocalAddress remoteAddress0() {
113         return remoteAddress;
114     }
115 
116     @Override
117     protected void doBind(SocketAddress localAddress) throws Exception {
118         this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
119         state = State.BOUND;
120     }
121 
122     @Override
123     protected void doShutdown(ChannelShutdownDirection direction) {
124         switch (direction) {
125             case Inbound:
126                 inputShutdown = true;
127                 break;
128             case Outbound:
129                 outputShutdown = true;
130                 break;
131             default:
132                 throw new AssertionError();
133         }
134     }
135 
136     @Override
137     public boolean isShutdown(ChannelShutdownDirection direction) {
138         if (!isActive()) {
139             return true;
140         }
141         switch (direction) {
142             case Inbound:
143                 return inputShutdown;
144             case Outbound:
145                 return outputShutdown;
146             default:
147                 throw new AssertionError();
148         }
149     }
150 
151     @Override
152     protected void doDisconnect() throws Exception {
153         doClose();
154     }
155 
156     @Override
157     protected void doClose() throws Exception {
158         final LocalChannel peer = this.peer;
159         State oldState = state;
160         try {
161             if (oldState != State.CLOSED) {
162                 // Update all internal state before the closeFuture is notified.
163                 if (localAddress != null) {
164                     if (parent() == null) {
165                         LocalChannelRegistry.unregister(localAddress);
166                     }
167                     localAddress = null;
168                 }
169 
170                 // State change must happen before finishPeerRead to ensure writes are released either in doWrite or
171                 // channelRead.
172                 state = State.CLOSED;
173 
174                 // Preserve order of event and force a read operation now before the close operation is processed.
175                 if (writeInProgress && peer != null) {
176                     finishPeerRead(peer);
177                 }
178             }
179 
180             if (peer != null) {
181                 this.peer = null;
182                 // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
183                 // This ensures that if both channels are on the same event loop, the peer's channelInActive
184                 // event is triggered *after* this peer's channelInActive event
185                 EventLoop peerEventLoop = peer.executor();
186                 final boolean peerIsActive = peer.isActive();
187                 try {
188                     peerEventLoop.execute(() -> peer.tryClose(peerIsActive));
189                 } catch (Throwable cause) {
190                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
191                             this, peer, cause);
192                     if (peerEventLoop.inEventLoop()) {
193                         peer.releaseInboundBuffers();
194                     } else {
195                         // inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
196                         // rejects the close Runnable but give a best effort.
197                         peer.close();
198                     }
199                     throw cause;
200                 }
201             }
202         } finally {
203             // Release all buffers if the Channel was already registered in the past and if it was not closed before.
204             if (oldState != null && oldState != State.CLOSED) {
205                 // We need to release all the buffers that may be put into our inbound queue since we closed the Channel
206                 // to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
207                 // means even if the promise was notified before its not really guaranteed that the "remote peer" will
208                 // see the buffer at all.
209                 releaseInboundBuffers();
210             }
211         }
212     }
213 
214     private void tryClose(boolean isActive) {
215         if (isActive) {
216             closeTransport(newPromise());
217         } else {
218             releaseInboundBuffers();
219             closeForciblyTransport();
220         }
221     }
222 
223     private void readInbound() {
224         RecvBufferAllocator.Handle handle = recvBufAllocHandle();
225         handle.reset();
226         ChannelPipeline pipeline = pipeline();
227         do {
228             Object received = inboundBuffer.poll();
229             if (received == null) {
230                 break;
231             }
232             pipeline.fireChannelRead(received);
233         } while (handle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
234 
235         pipeline.fireChannelReadComplete();
236         readIfIsAutoRead();
237     }
238 
239     private static final class ReaderStackDepth {
240         private int stackDepth;
241 
242         boolean incrementIfPossible() {
243             if (stackDepth == MAX_READER_STACK_DEPTH) {
244                 return false;
245             }
246             stackDepth++;
247             return true;
248         }
249 
250         void decrement() {
251             stackDepth--;
252         }
253     }
254 
255     private static final FastThreadLocal<ReaderStackDepth> STACK_DEPTH = new FastThreadLocal<>() {
256         @Override
257         protected ReaderStackDepth initialValue() throws Exception {
258             return new ReaderStackDepth();
259         }
260     };
261 
262     @Override
263     protected void doBeginRead() throws Exception {
264         if (readInProgress) {
265             return;
266         }
267 
268         Queue<Object> inboundBuffer = this.inboundBuffer;
269         if (inboundBuffer.isEmpty()) {
270             readInProgress = true;
271             return;
272         }
273 
274         final ReaderStackDepth readerStackDepth = STACK_DEPTH.get();
275         if (readerStackDepth.incrementIfPossible()) {
276             try {
277                 readInbound();
278             } finally {
279                 readerStackDepth.decrement();
280             }
281         } else {
282             try {
283                 executor().execute(readTask);
284             } catch (Throwable cause) {
285                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
286                 close();
287                 peer.close();
288                 throw cause;
289             }
290         }
291     }
292 
293     @Override
294     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
295         switch (state) {
296         case OPEN:
297         case BOUND:
298             throw new NotYetConnectedException();
299         case CLOSED:
300             throw new ClosedChannelException();
301         case CONNECTED:
302             break;
303         }
304 
305         final LocalChannel peer = this.peer;
306 
307         writeInProgress = true;
308         try {
309             for (;;) {
310                 Object msg = in.current();
311                 if (msg == null) {
312                     break;
313                 }
314                 try {
315                     // It is possible the peer could have closed while we are writing, and in this case we should
316                     // simulate real socket behavior and ensure the write operation is failed.
317                     if (peer.state == State.CONNECTED) {
318                         if (msg instanceof ReferenceCounted) {
319                             peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
320                         } else if (msg instanceof ResourceSupport) {
321                             peer.inboundBuffer.add(Statics.acquire((ResourceSupport<?, ?>) msg));
322                         } else if (msg instanceof Resource) {
323                             peer.inboundBuffer.add(((Resource<?>) msg).send().receive());
324                         } else {
325                             peer.inboundBuffer.add(msg);
326                         }
327                         in.remove();
328                     } else {
329                         break;
330                     }
331                 } catch (Throwable cause) {
332                     in.remove(cause);
333                 }
334             }
335         } finally {
336             // The following situation may cause trouble:
337             // 1. Write (with promise X)
338             // 2. promise X is completed when in.remove() is called, and a listener on this promise calls close()
339             // 3. Then the close event will be executed for the peer before the write events, when the write events
340             // actually happened before the close event.
341             writeInProgress = false;
342         }
343 
344         finishPeerRead(peer);
345     }
346 
347     private void finishPeerRead(final LocalChannel peer) {
348         // If the peer is also writing, then we must schedule the event on the event loop to preserve read order.
349         if (peer.executor() == executor() && !peer.writeInProgress) {
350             finishPeerRead0(peer);
351         } else {
352             runFinishPeerReadTask(peer);
353         }
354     }
355 
356     private void runFinishPeerReadTask(final LocalChannel peer) {
357         // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
358         // we keep track of the task, and coordinate later that our read can't happen until the peer is done.
359         final Runnable finishPeerReadTask = () -> finishPeerRead0(peer);
360         try {
361             if (peer.writeInProgress) {
362                 peer.finishReadFuture = peer.executor().submit(finishPeerReadTask);
363             } else {
364                 peer.executor().execute(finishPeerReadTask);
365             }
366         } catch (Throwable cause) {
367             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
368             close();
369             peer.close();
370             throw cause;
371         }
372     }
373 
374     private void releaseInboundBuffers() {
375         assert executor() == null || executor().inEventLoop();
376         readInProgress = false;
377         Queue<Object> inboundBuffer = this.inboundBuffer;
378         Object msg;
379         while ((msg = inboundBuffer.poll()) != null) {
380             Resource.dispose(msg);
381         }
382     }
383 
384     private void finishPeerRead0(LocalChannel peer) {
385         Future<?> peerFinishReadFuture = peer.finishReadFuture;
386         if (peerFinishReadFuture != null) {
387             if (!peerFinishReadFuture.isDone()) {
388                 runFinishPeerReadTask(peer);
389                 return;
390             } else {
391                 // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
392                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
393             }
394         }
395         // We should only set readInProgress to false if there is any data that was read as otherwise we may miss to
396         // forward data later on.
397         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
398             peer.readInProgress = false;
399             peer.readInbound();
400         }
401     }
402 
403     @Override
404     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
405         if (state == State.CONNECTED) {
406             throw new AlreadyConnectedException();
407         }
408 
409         if (state != State.BOUND) {
410             // Not bound yet and no localAddress specified - get one.
411             if (localAddress == null) {
412                 localAddress = new LocalAddress(LocalChannel.this);
413             }
414         }
415 
416         if (localAddress != null) {
417             try {
418                 doBind(localAddress);
419             } catch (Throwable t) {
420                 closeTransport(newPromise());
421                 throw t;
422             }
423         }
424 
425         Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
426         if (!(boundChannel instanceof LocalServerChannel)) {
427             Exception cause = new ConnectException("connection refused: " + remoteAddress);
428             closeTransport(newPromise());
429             throw cause;
430         }
431 
432         LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
433         peer = serverChannel.serve(LocalChannel.this);
434         return false;
435     }
436 
437     @Override
438     protected boolean doFinishConnect(LocalAddress requestedRemoteAddress) throws Exception {
439         final LocalChannel peer = LocalChannel.this.peer;
440         if (peer == null) {
441             return false;
442         }
443         state = State.CONNECTED;
444         remoteAddress = peer.parent().localAddress();
445 
446         // As we changed our state to connected now we also need to try to flush the previous queued messages by the
447         // peer.
448         peer.writeFlushedAsync();
449         return true;
450     }
451 
452     private void writeFlushedAsync() {
453         executor().execute(this::writeFlushed);
454     }
455 
456     private void finishConnectAsync() {
457         // We always dispatch to also ensure correct ordering if the peer Channel is on the same EventLoop
458         executor().execute(() -> {
459             if (isConnectPending()) {
460                 finishConnect();
461             }
462         });
463     }
464 
465     @Override
466     public void registerTransportNow() {
467         // Store the peer in a local variable as it may be set to null if doClose() is called.
468         // See https://github.com/netty/netty/issues/2144
469         LocalChannel peer = this.peer;
470         if (parent() != null && peer != null) {
471             // Mark this Channel as active before finish the connect on the remote peer.
472             state = State.CONNECTED;
473             peer.finishConnectAsync();
474         }
475     }
476 
477     @Override
478     public void deregisterTransportNow() {
479         // Noop
480     }
481 
482     @Override
483     public void closeTransportNow() {
484         closeTransport(newPromise());
485     }
486 }