View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractServerChannel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.DefaultChannelConfig;
23  import io.netty.channel.EventLoop;
24  import io.netty.channel.IoEvent;
25  import io.netty.channel.IoEventLoop;
26  import io.netty.channel.IoEventLoopGroup;
27  import io.netty.channel.IoRegistration;
28  import io.netty.channel.PreferHeapByteBufAllocator;
29  import io.netty.channel.RecvByteBufAllocator;
30  import io.netty.channel.ServerChannel;
31  import io.netty.channel.ServerChannelRecvByteBufAllocator;
32  import io.netty.channel.SingleThreadEventLoop;
33  import io.netty.util.concurrent.SingleThreadEventExecutor;
34  
35  import java.net.SocketAddress;
36  import java.util.ArrayDeque;
37  import java.util.Queue;
38  
39  /**
40   * A {@link ServerChannel} for the local transport which allows in VM communication.
41   */
42  public class LocalServerChannel extends AbstractServerChannel {
43  
44      private final ChannelConfig config =
45              new DefaultChannelConfig(this, new ServerChannelRecvByteBufAllocator()) { };
46      private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
47      private final Runnable shutdownHook = new Runnable() {
48          @Override
49          public void run() {
50              unsafe().close(unsafe().voidPromise());
51          }
52      };
53  
54      private IoRegistration registration;
55  
56      private volatile int state; // 0 - open, 1 - active, 2 - closed
57      private volatile LocalAddress localAddress;
58      private volatile boolean acceptInProgress;
59  
60      public LocalServerChannel() {
61          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
62      }
63  
64      @Override
65      public ChannelConfig config() {
66          return config;
67      }
68  
69      @Override
70      public LocalAddress localAddress() {
71          return (LocalAddress) super.localAddress();
72      }
73  
74      @Override
75      public LocalAddress remoteAddress() {
76          return (LocalAddress) super.remoteAddress();
77      }
78  
79      @Override
80      public boolean isOpen() {
81          return state < 2;
82      }
83  
84      @Override
85      public boolean isActive() {
86          return state == 1;
87      }
88  
89      @Override
90      protected boolean isCompatible(EventLoop loop) {
91          return loop instanceof SingleThreadEventLoop ||
92                  (loop instanceof IoEventLoopGroup && ((IoEventLoopGroup) loop).isCompatible(LocalServerUnsafe.class));
93      }
94  
95      @Override
96      protected SocketAddress localAddress0() {
97          return localAddress;
98      }
99  
100     @Override
101     protected void doRegister(ChannelPromise promise) {
102         EventLoop loop = eventLoop();
103         if (loop instanceof IoEventLoop) {
104             assert registration == null;
105             ((IoEventLoop) loop).register((LocalServerUnsafe) unsafe()).addListener(f -> {
106                 if (f.isSuccess()) {
107                     registration = (IoRegistration) f.getNow();
108                     promise.setSuccess();
109                 } else {
110                     promise.setFailure(f.cause());
111                 }
112             });
113         } else {
114             try {
115                 ((LocalServerUnsafe) unsafe()).registerNow();
116             } catch (Throwable cause) {
117                 promise.setFailure(cause);
118                 return;
119             }
120             promise.setSuccess();
121         }
122     }
123 
124     @Override
125     protected void doBind(SocketAddress localAddress) throws Exception {
126         this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
127         state = 1;
128     }
129 
130     @Override
131     protected void doClose() throws Exception {
132         if (state <= 1) {
133             // Update all internal state before the closeFuture is notified.
134             if (localAddress != null) {
135                 LocalChannelRegistry.unregister(localAddress);
136                 localAddress = null;
137             }
138             state = 2;
139         }
140     }
141 
142     @Override
143     protected void doDeregister() throws Exception {
144         EventLoop loop = eventLoop();
145         if (loop instanceof IoEventLoop) {
146             IoRegistration registration = this.registration;
147             if (registration != null) {
148                 this.registration = null;
149                 registration.cancel();
150             }
151         } else {
152             ((LocalServerUnsafe) unsafe()).deregisterNow();
153         }
154     }
155 
156     @Override
157     protected void doBeginRead() throws Exception {
158         if (acceptInProgress) {
159             return;
160         }
161 
162         Queue<Object> inboundBuffer = this.inboundBuffer;
163         if (inboundBuffer.isEmpty()) {
164             acceptInProgress = true;
165             return;
166         }
167 
168         readInbound();
169     }
170 
171     LocalChannel serve(final LocalChannel peer) {
172         final LocalChannel child = newLocalChannel(peer);
173         if (eventLoop().inEventLoop()) {
174             serve0(child);
175         } else {
176             eventLoop().execute(new Runnable() {
177                 @Override
178                 public void run() {
179                     serve0(child);
180                 }
181             });
182         }
183         return child;
184     }
185 
186     private void readInbound() {
187         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
188         handle.reset(config());
189         ChannelPipeline pipeline = pipeline();
190         do {
191             Object m = inboundBuffer.poll();
192             if (m == null) {
193                 break;
194             }
195             pipeline.fireChannelRead(m);
196         } while (handle.continueReading());
197         handle.readComplete();
198         pipeline.fireChannelReadComplete();
199     }
200 
201     /**
202      * A factory method for {@link LocalChannel}s. Users may override it
203      * to create custom instances of {@link LocalChannel}s.
204      */
205     protected LocalChannel newLocalChannel(LocalChannel peer) {
206         return new LocalChannel(this, peer);
207     }
208 
209     private void serve0(final LocalChannel child) {
210         inboundBuffer.add(child);
211         if (acceptInProgress) {
212             acceptInProgress = false;
213 
214             readInbound();
215         }
216     }
217 
218     @Override
219     protected AbstractUnsafe newUnsafe() {
220         return new LocalServerUnsafe();
221     }
222 
223     private class LocalServerUnsafe extends AbstractUnsafe implements LocalIoHandle {
224         @Override
225         public void close() {
226             close(voidPromise());
227         }
228 
229         @Override
230         public void handle(IoRegistration registration, IoEvent event) {
231             // NOOP
232         }
233 
234         @Override
235         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
236             safeSetFailure(promise, new UnsupportedOperationException());
237         }
238 
239         @Override
240         public void registerNow() {
241             ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
242         }
243 
244         @Override
245         public void deregisterNow() {
246             ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
247         }
248 
249         @Override
250         public void closeNow() {
251             close(voidPromise());
252         }
253     }
254 }