View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.local;
17  
18  import io.netty5.buffer.api.DefaultBufferAllocators;
19  import io.netty5.channel.AbstractServerChannel;
20  import io.netty5.channel.ChannelOption;
21  import io.netty5.channel.ChannelPipeline;
22  import io.netty5.channel.ChannelShutdownDirection;
23  import io.netty5.channel.EventLoop;
24  import io.netty5.channel.EventLoopGroup;
25  import io.netty5.channel.RecvBufferAllocator;
26  import io.netty5.channel.ServerChannel;
27  import io.netty5.channel.ServerChannelRecvBufferAllocator;
28  
29  import java.net.SocketAddress;
30  import java.util.ArrayDeque;
31  import java.util.Queue;
32  
33  /**
34   * A {@link ServerChannel} for the local transport which allows in VM communication.
35   */
36  public class LocalServerChannel extends AbstractServerChannel<LocalChannel, LocalAddress, LocalAddress>
37          implements LocalChannelUnsafe {
38  
39      private final Queue<Object> inboundBuffer = new ArrayDeque<>();
40      private volatile int state; // 0 - open, 1 - active, 2 - closed
41      private volatile LocalAddress localAddress;
42      private volatile boolean acceptInProgress;
43  
44      public LocalServerChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
45          super(eventLoop, childEventLoopGroup, LocalChannel.class);
46          setOption(ChannelOption.BUFFER_ALLOCATOR, DefaultBufferAllocators.onHeapAllocator());
47      }
48  
49      @Override
50      public boolean isOpen() {
51          return state < 2;
52      }
53  
54      @Override
55      public boolean isActive() {
56          return state == 1;
57      }
58  
59      @Override
60      protected LocalAddress localAddress0() {
61          return localAddress;
62      }
63  
64      @Override
65      protected void doBind(SocketAddress localAddress) throws Exception {
66          this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
67          state = 1;
68      }
69  
70      @Override
71      protected void doClose() throws Exception {
72          if (state <= 1) {
73              // Update all internal state before the closeFuture is notified.
74              if (localAddress != null) {
75                  LocalChannelRegistry.unregister(localAddress);
76                  localAddress = null;
77              }
78              state = 2;
79          }
80      }
81  
82      @Override
83      protected void doBeginRead() throws Exception {
84          if (acceptInProgress) {
85              return;
86          }
87  
88          Queue<Object> inboundBuffer = this.inboundBuffer;
89          if (inboundBuffer.isEmpty()) {
90              acceptInProgress = true;
91              return;
92          }
93  
94          readInbound();
95      }
96  
97      LocalChannel serve(final LocalChannel peer) {
98          final LocalChannel child = newLocalChannel(peer);
99          if (executor().inEventLoop()) {
100             serve0(child);
101         } else {
102             executor().execute(() -> serve0(child));
103         }
104         return child;
105     }
106 
107     private void readInbound() {
108         RecvBufferAllocator.Handle handle = recvBufAllocHandle();
109         handle.reset();
110         ChannelPipeline pipeline = pipeline();
111         do {
112             Object m = inboundBuffer.poll();
113             if (m == null) {
114                 break;
115             }
116             pipeline.fireChannelRead(m);
117         } while (handle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
118 
119         pipeline.fireChannelReadComplete();
120         readIfIsAutoRead();
121     }
122 
123     /**
124      * A factory method for {@link LocalChannel}s. Users may override it
125      * to create custom instances of {@link LocalChannel}s.
126      */
127     protected LocalChannel newLocalChannel(LocalChannel peer) {
128         return new LocalChannel(this, childEventLoopGroup().next(), peer);
129     }
130 
131     private void serve0(final LocalChannel child) {
132         inboundBuffer.add(child);
133         if (acceptInProgress) {
134             acceptInProgress = false;
135 
136             readInbound();
137         }
138     }
139 
140     @Override
141     public void registerTransportNow() {
142     }
143 
144     @Override
145     public void deregisterTransportNow() {
146     }
147 
148     @Override
149     public void closeTransportNow() {
150         closeTransport(newPromise());
151     }
152 }