View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractServerChannel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.DefaultChannelConfig;
23  import io.netty.channel.EventLoop;
24  import io.netty.channel.IoEvent;
25  import io.netty.channel.IoEventLoop;
26  import io.netty.channel.IoEventLoopGroup;
27  import io.netty.channel.IoRegistration;
28  import io.netty.channel.PreferHeapByteBufAllocator;
29  import io.netty.channel.RecvByteBufAllocator;
30  import io.netty.channel.ServerChannel;
31  import io.netty.channel.ServerChannelRecvByteBufAllocator;
32  import io.netty.channel.SingleThreadEventLoop;
33  import io.netty.util.concurrent.SingleThreadEventExecutor;
34  
35  import java.net.SocketAddress;
36  import java.util.ArrayDeque;
37  import java.util.Queue;
38  
39  /**
40   * A {@link ServerChannel} for the local transport which allows in VM communication.
41   */
42  public class LocalServerChannel extends AbstractServerChannel {
43  
44      private final ChannelConfig config =
45              new DefaultChannelConfig(this, new ServerChannelRecvByteBufAllocator()) { };
46      private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
47  
48      private IoRegistration registration;
49  
50      private volatile int state; // 0 - open, 1 - active, 2 - closed
51      private volatile LocalAddress localAddress;
52      private volatile boolean acceptInProgress;
53  
54      public LocalServerChannel() {
55          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
56      }
57  
58      @Override
59      public ChannelConfig config() {
60          return config;
61      }
62  
63      @Override
64      public LocalAddress localAddress() {
65          return (LocalAddress) super.localAddress();
66      }
67  
68      @Override
69      public LocalAddress remoteAddress() {
70          return (LocalAddress) super.remoteAddress();
71      }
72  
73      @Override
74      public boolean isOpen() {
75          return state < 2;
76      }
77  
78      @Override
79      public boolean isActive() {
80          return state == 1;
81      }
82  
83      @Override
84      protected boolean isCompatible(EventLoop loop) {
85          return loop instanceof SingleThreadEventLoop ||
86                  (loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(LocalServerUnsafe.class));
87      }
88  
89      @Override
90      protected SocketAddress localAddress0() {
91          return localAddress;
92      }
93  
94      @Override
95      protected void doRegister(ChannelPromise promise) {
96          EventLoop loop = eventLoop();
97          if (loop instanceof IoEventLoop) {
98              assert registration == null;
99              ((IoEventLoop) loop).register((LocalServerUnsafe) unsafe()).addListener(f -> {
100                 if (f.isSuccess()) {
101                     registration = (IoRegistration) f.getNow();
102                     promise.setSuccess();
103                 } else {
104                     promise.setFailure(f.cause());
105                 }
106             });
107         } else {
108             try {
109                 ((LocalServerUnsafe) unsafe()).registered();
110             } catch (Throwable cause) {
111                 promise.setFailure(cause);
112                 return;
113             }
114             promise.setSuccess();
115         }
116     }
117 
118     @Override
119     protected void doBind(SocketAddress localAddress) throws Exception {
120         this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
121         state = 1;
122     }
123 
124     @Override
125     protected void doClose() throws Exception {
126         if (state <= 1) {
127             // Update all internal state before the closeFuture is notified.
128             if (localAddress != null) {
129                 LocalChannelRegistry.unregister(localAddress);
130                 localAddress = null;
131             }
132             state = 2;
133         }
134     }
135 
136     @Override
137     protected void doDeregister() throws Exception {
138         EventLoop loop = eventLoop();
139         if (loop instanceof IoEventLoop) {
140             IoRegistration registration = this.registration;
141             if (registration != null) {
142                 this.registration = null;
143                 registration.cancel();
144             }
145         } else {
146             ((LocalServerUnsafe) unsafe()).unregistered();
147         }
148     }
149 
150     @Override
151     protected void doBeginRead() throws Exception {
152         if (acceptInProgress) {
153             return;
154         }
155 
156         Queue<Object> inboundBuffer = this.inboundBuffer;
157         if (inboundBuffer.isEmpty()) {
158             acceptInProgress = true;
159             return;
160         }
161 
162         readInbound();
163     }
164 
165     LocalChannel serve(final LocalChannel peer) {
166         final LocalChannel child = newLocalChannel(peer);
167         if (eventLoop().inEventLoop()) {
168             serve0(child);
169         } else {
170             eventLoop().execute(new Runnable() {
171                 @Override
172                 public void run() {
173                     serve0(child);
174                 }
175             });
176         }
177         return child;
178     }
179 
180     private void readInbound() {
181         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
182         handle.reset(config());
183         ChannelPipeline pipeline = pipeline();
184         do {
185             Object m = inboundBuffer.poll();
186             if (m == null) {
187                 break;
188             }
189             pipeline.fireChannelRead(m);
190         } while (handle.continueReading());
191         handle.readComplete();
192         pipeline.fireChannelReadComplete();
193     }
194 
195     /**
196      * A factory method for {@link LocalChannel}s. Users may override it
197      * to create custom instances of {@link LocalChannel}s.
198      */
199     protected LocalChannel newLocalChannel(LocalChannel peer) {
200         return new LocalChannel(this, peer);
201     }
202 
203     private void serve0(final LocalChannel child) {
204         inboundBuffer.add(child);
205         if (acceptInProgress) {
206             acceptInProgress = false;
207 
208             readInbound();
209         }
210     }
211 
212     @Override
213     protected AbstractUnsafe newUnsafe() {
214         return new LocalServerUnsafe();
215     }
216 
217     private class LocalServerUnsafe extends AbstractUnsafe implements LocalIoHandle {
218         private final Runnable shutdownHook = this::closeNow;
219 
220         @Override
221         public void close() {
222             close(voidPromise());
223         }
224 
225         @Override
226         public void handle(IoRegistration registration, IoEvent event) {
227             // NOOP
228         }
229 
230         @Override
231         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
232             safeSetFailure(promise, new UnsupportedOperationException());
233         }
234 
235         @Override
236         public void registered() {
237             EventLoop loop = eventLoop();
238             if (!(loop instanceof IoEventLoop) && loop instanceof SingleThreadEventExecutor) {
239                 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
240             }
241         }
242 
243         @Override
244         public void unregistered() {
245             EventLoop loop = eventLoop();
246             if (!(loop instanceof IoEventLoop) && loop instanceof SingleThreadEventExecutor) {
247                 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
248             }
249         }
250 
251         @Override
252         public void closeNow() {
253             close(voidPromise());
254         }
255     }
256 }