1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty5.channel.local;
17  
18  import io.netty5.buffer.api.DefaultBufferAllocators;
19  import io.netty5.channel.ChannelOption;
20  import io.netty5.channel.ChannelShutdownDirection;
21  import io.netty5.util.ReferenceCounted;
22  import io.netty5.util.Resource;
23  import io.netty5.buffer.api.internal.ResourceSupport;
24  import io.netty5.buffer.api.internal.Statics;
25  import io.netty5.channel.AbstractChannel;
26  import io.netty5.channel.Channel;
27  import io.netty5.channel.ChannelMetadata;
28  import io.netty5.channel.ChannelOutboundBuffer;
29  import io.netty5.channel.ChannelPipeline;
30  import io.netty5.channel.EventLoop;
31  import io.netty5.channel.RecvBufferAllocator;
32  import io.netty5.util.ReferenceCountUtil;
33  import io.netty5.util.concurrent.FastThreadLocal;
34  import io.netty5.util.concurrent.Future;
35  import io.netty5.util.internal.PlatformDependent;
36  import io.netty5.util.internal.logging.InternalLogger;
37  import io.netty5.util.internal.logging.InternalLoggerFactory;
38  
39  import java.net.ConnectException;
40  import java.net.SocketAddress;
41  import java.nio.channels.AlreadyConnectedException;
42  import java.nio.channels.ClosedChannelException;
43  import java.nio.channels.NotYetConnectedException;
44  import java.util.Queue;
45  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46  
47  
48  
49  
50  public class LocalChannel extends AbstractChannel<LocalServerChannel, LocalAddress, LocalAddress>
51          implements LocalChannelUnsafe {
52      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
53      @SuppressWarnings("rawtypes")
54      private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
55              AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
56      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57      private static final int MAX_READER_STACK_DEPTH = 8;
58  
59      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
60  
61      
62      final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
63      private final Runnable readTask = () -> {
64          
65          if (!inboundBuffer.isEmpty()) {
66              readInbound();
67          }
68      };
69  
70      private volatile State state;
71      private volatile LocalChannel peer;
72      private volatile LocalAddress localAddress;
73      private volatile LocalAddress remoteAddress;
74      private volatile boolean readInProgress;
75      private volatile boolean writeInProgress;
76      private volatile Future<?> finishReadFuture;
77      private volatile boolean inputShutdown;
78      private volatile boolean outputShutdown;
79  
80      public LocalChannel(EventLoop eventLoop) {
81          this(null, eventLoop, null);
82      }
83  
84      protected LocalChannel(LocalServerChannel parent, EventLoop eventLoop, LocalChannel peer) {
85          super(parent, eventLoop, METADATA);
86          this.peer = peer;
87          if (parent != null) {
88              localAddress = parent.localAddress();
89          }
90          if (peer != null) {
91              remoteAddress = peer.localAddress();
92          }
93          setOption(ChannelOption.BUFFER_ALLOCATOR, DefaultBufferAllocators.onHeapAllocator());
94      }
95  
96      @Override
97      public boolean isOpen() {
98          return state != State.CLOSED;
99      }
100 
101     @Override
102     public boolean isActive() {
103         return state == State.CONNECTED;
104     }
105 
106     @Override
107     protected LocalAddress localAddress0() {
108         return localAddress;
109     }
110 
111     @Override
112     protected LocalAddress remoteAddress0() {
113         return remoteAddress;
114     }
115 
116     @Override
117     protected void doBind(SocketAddress localAddress) throws Exception {
118         this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
119         state = State.BOUND;
120     }
121 
122     @Override
123     protected void doShutdown(ChannelShutdownDirection direction) {
124         switch (direction) {
125             case Inbound:
126                 inputShutdown = true;
127                 break;
128             case Outbound:
129                 outputShutdown = true;
130                 break;
131             default:
132                 throw new AssertionError();
133         }
134     }
135 
136     @Override
137     public boolean isShutdown(ChannelShutdownDirection direction) {
138         if (!isActive()) {
139             return true;
140         }
141         switch (direction) {
142             case Inbound:
143                 return inputShutdown;
144             case Outbound:
145                 return outputShutdown;
146             default:
147                 throw new AssertionError();
148         }
149     }
150 
151     @Override
152     protected void doDisconnect() throws Exception {
153         doClose();
154     }
155 
156     @Override
157     protected void doClose() throws Exception {
158         final LocalChannel peer = this.peer;
159         State oldState = state;
160         try {
161             if (oldState != State.CLOSED) {
162                 
163                 if (localAddress != null) {
164                     if (parent() == null) {
165                         LocalChannelRegistry.unregister(localAddress);
166                     }
167                     localAddress = null;
168                 }
169 
170                 
171                 
172                 state = State.CLOSED;
173 
174                 
175                 if (writeInProgress && peer != null) {
176                     finishPeerRead(peer);
177                 }
178             }
179 
180             if (peer != null) {
181                 this.peer = null;
182                 
183                 
184                 
185                 EventLoop peerEventLoop = peer.executor();
186                 final boolean peerIsActive = peer.isActive();
187                 try {
188                     peerEventLoop.execute(() -> peer.tryClose(peerIsActive));
189                 } catch (Throwable cause) {
190                     logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
191                             this, peer, cause);
192                     if (peerEventLoop.inEventLoop()) {
193                         peer.releaseInboundBuffers();
194                     } else {
195                         
196                         
197                         peer.close();
198                     }
199                     throw cause;
200                 }
201             }
202         } finally {
203             
204             if (oldState != null && oldState != State.CLOSED) {
205                 
206                 
207                 
208                 
209                 releaseInboundBuffers();
210             }
211         }
212     }
213 
214     private void tryClose(boolean isActive) {
215         if (isActive) {
216             closeTransport(newPromise());
217         } else {
218             releaseInboundBuffers();
219             closeForciblyTransport();
220         }
221     }
222 
223     private void readInbound() {
224         RecvBufferAllocator.Handle handle = recvBufAllocHandle();
225         handle.reset();
226         ChannelPipeline pipeline = pipeline();
227         do {
228             Object received = inboundBuffer.poll();
229             if (received == null) {
230                 break;
231             }
232             pipeline.fireChannelRead(received);
233         } while (handle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
234 
235         pipeline.fireChannelReadComplete();
236         readIfIsAutoRead();
237     }
238 
239     private static final class ReaderStackDepth {
240         private int stackDepth;
241 
242         boolean incrementIfPossible() {
243             if (stackDepth == MAX_READER_STACK_DEPTH) {
244                 return false;
245             }
246             stackDepth++;
247             return true;
248         }
249 
250         void decrement() {
251             stackDepth--;
252         }
253     }
254 
255     private static final FastThreadLocal<ReaderStackDepth> STACK_DEPTH = new FastThreadLocal<>() {
256         @Override
257         protected ReaderStackDepth initialValue() throws Exception {
258             return new ReaderStackDepth();
259         }
260     };
261 
262     @Override
263     protected void doBeginRead() throws Exception {
264         if (readInProgress) {
265             return;
266         }
267 
268         Queue<Object> inboundBuffer = this.inboundBuffer;
269         if (inboundBuffer.isEmpty()) {
270             readInProgress = true;
271             return;
272         }
273 
274         final ReaderStackDepth readerStackDepth = STACK_DEPTH.get();
275         if (readerStackDepth.incrementIfPossible()) {
276             try {
277                 readInbound();
278             } finally {
279                 readerStackDepth.decrement();
280             }
281         } else {
282             try {
283                 executor().execute(readTask);
284             } catch (Throwable cause) {
285                 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
286                 close();
287                 peer.close();
288                 throw cause;
289             }
290         }
291     }
292 
293     @Override
294     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
295         switch (state) {
296         case OPEN:
297         case BOUND:
298             throw new NotYetConnectedException();
299         case CLOSED:
300             throw new ClosedChannelException();
301         case CONNECTED:
302             break;
303         }
304 
305         final LocalChannel peer = this.peer;
306 
307         writeInProgress = true;
308         try {
309             for (;;) {
310                 Object msg = in.current();
311                 if (msg == null) {
312                     break;
313                 }
314                 try {
315                     
316                     
317                     if (peer.state == State.CONNECTED) {
318                         if (msg instanceof ReferenceCounted) {
319                             peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
320                         } else if (msg instanceof ResourceSupport) {
321                             peer.inboundBuffer.add(Statics.acquire((ResourceSupport<?, ?>) msg));
322                         } else if (msg instanceof Resource) {
323                             peer.inboundBuffer.add(((Resource<?>) msg).send().receive());
324                         } else {
325                             peer.inboundBuffer.add(msg);
326                         }
327                         in.remove();
328                     } else {
329                         break;
330                     }
331                 } catch (Throwable cause) {
332                     in.remove(cause);
333                 }
334             }
335         } finally {
336             
337             
338             
339             
340             
341             writeInProgress = false;
342         }
343 
344         finishPeerRead(peer);
345     }
346 
347     private void finishPeerRead(final LocalChannel peer) {
348         
349         if (peer.executor() == executor() && !peer.writeInProgress) {
350             finishPeerRead0(peer);
351         } else {
352             runFinishPeerReadTask(peer);
353         }
354     }
355 
356     private void runFinishPeerReadTask(final LocalChannel peer) {
357         
358         
359         final Runnable finishPeerReadTask = () -> finishPeerRead0(peer);
360         try {
361             if (peer.writeInProgress) {
362                 peer.finishReadFuture = peer.executor().submit(finishPeerReadTask);
363             } else {
364                 peer.executor().execute(finishPeerReadTask);
365             }
366         } catch (Throwable cause) {
367             logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
368             close();
369             peer.close();
370             throw cause;
371         }
372     }
373 
374     private void releaseInboundBuffers() {
375         assert executor() == null || executor().inEventLoop();
376         readInProgress = false;
377         Queue<Object> inboundBuffer = this.inboundBuffer;
378         Object msg;
379         while ((msg = inboundBuffer.poll()) != null) {
380             Resource.dispose(msg);
381         }
382     }
383 
384     private void finishPeerRead0(LocalChannel peer) {
385         Future<?> peerFinishReadFuture = peer.finishReadFuture;
386         if (peerFinishReadFuture != null) {
387             if (!peerFinishReadFuture.isDone()) {
388                 runFinishPeerReadTask(peer);
389                 return;
390             } else {
391                 
392                 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
393             }
394         }
395         
396         
397         if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
398             peer.readInProgress = false;
399             peer.readInbound();
400         }
401     }
402 
403     @Override
404     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
405         if (state == State.CONNECTED) {
406             throw new AlreadyConnectedException();
407         }
408 
409         if (state != State.BOUND) {
410             
411             if (localAddress == null) {
412                 localAddress = new LocalAddress(LocalChannel.this);
413             }
414         }
415 
416         if (localAddress != null) {
417             try {
418                 doBind(localAddress);
419             } catch (Throwable t) {
420                 closeTransport(newPromise());
421                 throw t;
422             }
423         }
424 
425         Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
426         if (!(boundChannel instanceof LocalServerChannel)) {
427             Exception cause = new ConnectException("connection refused: " + remoteAddress);
428             closeTransport(newPromise());
429             throw cause;
430         }
431 
432         LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
433         peer = serverChannel.serve(LocalChannel.this);
434         return false;
435     }
436 
437     @Override
438     protected boolean doFinishConnect(LocalAddress requestedRemoteAddress) throws Exception {
439         final LocalChannel peer = LocalChannel.this.peer;
440         if (peer == null) {
441             return false;
442         }
443         state = State.CONNECTED;
444         remoteAddress = peer.parent().localAddress();
445 
446         
447         
448         peer.writeFlushedAsync();
449         return true;
450     }
451 
452     private void writeFlushedAsync() {
453         executor().execute(this::writeFlushed);
454     }
455 
456     private void finishConnectAsync() {
457         
458         executor().execute(() -> {
459             if (isConnectPending()) {
460                 finishConnect();
461             }
462         });
463     }
464 
465     @Override
466     public void registerTransportNow() {
467         
468         
469         LocalChannel peer = this.peer;
470         if (parent() != null && peer != null) {
471             
472             state = State.CONNECTED;
473             peer.finishConnectAsync();
474         }
475     }
476 
477     @Override
478     public void deregisterTransportNow() {
479         
480     }
481 
482     @Override
483     public void closeTransportNow() {
484         closeTransport(newPromise());
485     }
486 }