/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractServerChannel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.DefaultChannelConfig;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.PreferHeapByteBufAllocator;
24  import io.netty.channel.RecvByteBufAllocator;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.SingleThreadEventLoop;
27  import io.netty.util.concurrent.SingleThreadEventExecutor;
28  
29  import java.net.SocketAddress;
30  import java.util.ArrayDeque;
31  import java.util.Queue;
32  
33  /**
34   * A {@link ServerChannel} for the local transport which allows in VM communication.
35   */
36  public class LocalServerChannel extends AbstractServerChannel {
37  
38      private final ChannelConfig config = new DefaultChannelConfig(this);
39      private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
40      private final Runnable shutdownHook = new Runnable() {
41          @Override
42          public void run() {
43              unsafe().close(unsafe().voidPromise());
44          }
45      };
46  
47      private volatile int state; // 0 - open, 1 - active, 2 - closed
48      private volatile LocalAddress localAddress;
49      private volatile boolean acceptInProgress;
50  
51      public LocalServerChannel() {
52          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
53      }
54  
55      @Override
56      public ChannelConfig config() {
57          return config;
58      }
59  
60      @Override
61      public LocalAddress localAddress() {
62          return (LocalAddress) super.localAddress();
63      }
64  
65      @Override
66      public LocalAddress remoteAddress() {
67          return (LocalAddress) super.remoteAddress();
68      }
69  
70      @Override
71      public boolean isOpen() {
72          return state < 2;
73      }
74  
75      @Override
76      public boolean isActive() {
77          return state == 1;
78      }
79  
80      @Override
81      protected boolean isCompatible(EventLoop loop) {
82          return loop instanceof SingleThreadEventLoop;
83      }
84  
85      @Override
86      protected SocketAddress localAddress0() {
87          return localAddress;
88      }
89  
90      @Override
91      protected void doRegister() throws Exception {
92          ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
93      }
94  
95      @Override
96      protected void doBind(SocketAddress localAddress) throws Exception {
97          this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
98          state = 1;
99      }
100 
101     @Override
102     protected void doClose() throws Exception {
103         if (state <= 1) {
104             // Update all internal state before the closeFuture is notified.
105             if (localAddress != null) {
106                 LocalChannelRegistry.unregister(localAddress);
107                 localAddress = null;
108             }
109             state = 2;
110         }
111     }
112 
113     @Override
114     protected void doDeregister() throws Exception {
115         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
116     }
117 
118     @Override
119     protected void doBeginRead() throws Exception {
120         if (acceptInProgress) {
121             return;
122         }
123 
124         Queue<Object> inboundBuffer = this.inboundBuffer;
125         if (inboundBuffer.isEmpty()) {
126             acceptInProgress = true;
127             return;
128         }
129 
130         readInbound();
131     }
132 
133     LocalChannel serve(final LocalChannel peer) {
134         final LocalChannel child = newLocalChannel(peer);
135         if (eventLoop().inEventLoop()) {
136             serve0(child);
137         } else {
138             eventLoop().execute(new Runnable() {
139                 @Override
140                 public void run() {
141                     serve0(child);
142                 }
143             });
144         }
145         return child;
146     }
147 
148     private void readInbound() {
149         RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
150         handle.reset(config());
151         ChannelPipeline pipeline = pipeline();
152         do {
153             Object m = inboundBuffer.poll();
154             if (m == null) {
155                 break;
156             }
157             pipeline.fireChannelRead(m);
158         } while (handle.continueReading());
159 
160         pipeline.fireChannelReadComplete();
161     }
162 
163     /**
164      * A factory method for {@link LocalChannel}s. Users may override it
165      * to create custom instances of {@link LocalChannel}s.
166      */
167     protected LocalChannel newLocalChannel(LocalChannel peer) {
168         return new LocalChannel(this, peer);
169     }
170 
171     private void serve0(final LocalChannel child) {
172         inboundBuffer.add(child);
173         if (acceptInProgress) {
174             acceptInProgress = false;
175 
176             readInbound();
177         }
178     }
179 }