View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractServerChannel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.DefaultChannelConfig;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.PreferHeapByteBufAllocator;
24  import io.netty.channel.RecvByteBufAllocator;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.ServerChannelRecvByteBufAllocator;
27  import io.netty.channel.SingleThreadEventLoop;
28  import io.netty.util.concurrent.SingleThreadEventExecutor;
29  
30  import java.net.SocketAddress;
31  import java.util.ArrayDeque;
32  import java.util.Queue;
33  
34  /**
35   * A {@link ServerChannel} for the local transport which allows in VM communication.
36   */
37  public class LocalServerChannel extends AbstractServerChannel {
38  
39      private final ChannelConfig config =
40              new DefaultChannelConfig(this, new ServerChannelRecvByteBufAllocator()) { };
41      private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
42      private final Runnable shutdownHook = new Runnable() {
43          @Override
44          public void run() {
45              unsafe().close(unsafe().voidPromise());
46          }
47      };
48  
49      private volatile int state; // 0 - open, 1 - active, 2 - closed
50      private volatile LocalAddress localAddress;
51      private volatile boolean acceptInProgress;
52  
53      public LocalServerChannel() {
54          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
55      }
56  
57      @Override
58      public ChannelConfig config() {
59          return config;
60      }
61  
62      @Override
63      public LocalAddress localAddress() {
64          return (LocalAddress) super.localAddress();
65      }
66  
67      @Override
68      public LocalAddress remoteAddress() {
69          return (LocalAddress) super.remoteAddress();
70      }
71  
72      @Override
73      public boolean isOpen() {
74          return state < 2;
75      }
76  
77      @Override
78      public boolean isActive() {
79          return state == 1;
80      }
81  
82      @Override
83      protected boolean isCompatible(EventLoop loop) {
84          return loop instanceof SingleThreadEventLoop;
85      }
86  
87      @Override
88      protected SocketAddress localAddress0() {
89          return localAddress;
90      }
91  
92      @Override
93      protected void doRegister() throws Exception {
94          ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
95      }
96  
97      @Override
98      protected void doBind(SocketAddress localAddress) throws Exception {
99          this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
100         state = 1;
101     }
102 
103     @Override
104     protected void doClose() throws Exception {
105         if (state <= 1) {
106             // Update all internal state before the closeFuture is notified.
107             if (localAddress != null) {
108                 LocalChannelRegistry.unregister(localAddress);
109                 localAddress = null;
110             }
111             state = 2;
112         }
113     }
114 
115     @Override
116     protected void doDeregister() throws Exception {
117         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
118     }
119 
120     @Override
121     protected void doBeginRead() throws Exception {
122         if (acceptInProgress) {
123             return;
124         }
125 
126         Queue<Object> inboundBuffer = this.inboundBuffer;
127         if (inboundBuffer.isEmpty()) {
128             acceptInProgress = true;
129             return;
130         }
131 
132         readInbound();
133     }
134 
135     LocalChannel serve(final LocalChannel peer) {
136         final LocalChannel child = newLocalChannel(peer);
137         if (eventLoop().inEventLoop()) {
138             serve0(child);
139         } else {
140             eventLoop().execute(new Runnable() {
141                 @Override
142                 public void run() {
143                     serve0(child);
144                 }
145             });
146         }
147         return child;
148     }
149 
150     private void readInbound() {
151         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
152         handle.reset(config());
153         ChannelPipeline pipeline = pipeline();
154         do {
155             Object m = inboundBuffer.poll();
156             if (m == null) {
157                 break;
158             }
159             pipeline.fireChannelRead(m);
160         } while (handle.continueReading());
161         handle.readComplete();
162         pipeline.fireChannelReadComplete();
163     }
164 
165     /**
166      * A factory method for {@link LocalChannel}s. Users may override it
167      * to create custom instances of {@link LocalChannel}s.
168      */
169     protected LocalChannel newLocalChannel(LocalChannel peer) {
170         return new LocalChannel(this, peer);
171     }
172 
173     private void serve0(final LocalChannel child) {
174         inboundBuffer.add(child);
175         if (acceptInProgress) {
176             acceptInProgress = false;
177 
178             readInbound();
179         }
180     }
181 }