View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractServerChannel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.DefaultChannelConfig;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.PreferHeapByteBufAllocator;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.SingleThreadEventLoop;
26  import io.netty.util.concurrent.SingleThreadEventExecutor;
27  
28  import java.net.SocketAddress;
29  import java.util.ArrayDeque;
30  import java.util.Queue;
31  
32  /**
33   * A {@link ServerChannel} for the local transport which allows in VM communication.
34   */
35  public class LocalServerChannel extends AbstractServerChannel {
36  
37      private final ChannelConfig config = new DefaultChannelConfig(this);
38      private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
39      private final Runnable shutdownHook = new Runnable() {
40          @Override
41          public void run() {
42              unsafe().close(unsafe().voidPromise());
43          }
44      };
45  
46      private volatile int state; // 0 - open, 1 - active, 2 - closed
47      private volatile LocalAddress localAddress;
48      private volatile boolean acceptInProgress;
49  
50      public LocalServerChannel() {
51          config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
52      }
53  
54      @Override
55      public ChannelConfig config() {
56          return config;
57      }
58  
59      @Override
60      public LocalAddress localAddress() {
61          return (LocalAddress) super.localAddress();
62      }
63  
64      @Override
65      public LocalAddress remoteAddress() {
66          return (LocalAddress) super.remoteAddress();
67      }
68  
69      @Override
70      public boolean isOpen() {
71          return state < 2;
72      }
73  
74      @Override
75      public boolean isActive() {
76          return state == 1;
77      }
78  
79      @Override
80      protected boolean isCompatible(EventLoop loop) {
81          return loop instanceof SingleThreadEventLoop;
82      }
83  
84      @Override
85      protected SocketAddress localAddress0() {
86          return localAddress;
87      }
88  
89      @Override
90      protected void doRegister() throws Exception {
91          ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
92      }
93  
94      @Override
95      protected void doBind(SocketAddress localAddress) throws Exception {
96          this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
97          state = 1;
98      }
99  
100     @Override
101     protected void doClose() throws Exception {
102         if (state <= 1) {
103             // Update all internal state before the closeFuture is notified.
104             if (localAddress != null) {
105                 LocalChannelRegistry.unregister(localAddress);
106                 localAddress = null;
107             }
108             state = 2;
109         }
110     }
111 
112     @Override
113     protected void doDeregister() throws Exception {
114         ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
115     }
116 
117     @Override
118     protected void doBeginRead() throws Exception {
119         if (acceptInProgress) {
120             return;
121         }
122 
123         Queue<Object> inboundBuffer = this.inboundBuffer;
124         if (inboundBuffer.isEmpty()) {
125             acceptInProgress = true;
126             return;
127         }
128 
129         ChannelPipeline pipeline = pipeline();
130         for (;;) {
131             Object m = inboundBuffer.poll();
132             if (m == null) {
133                 break;
134             }
135             pipeline.fireChannelRead(m);
136         }
137         pipeline.fireChannelReadComplete();
138     }
139 
140     LocalChannel serve(final LocalChannel peer) {
141         final LocalChannel child = newLocalChannel(peer);
142         if (eventLoop().inEventLoop()) {
143             serve0(child);
144         } else {
145             eventLoop().execute(new Runnable() {
146               @Override
147               public void run() {
148                 serve0(child);
149               }
150             });
151         }
152         return child;
153     }
154 
155     /**
156      * A factory method for {@link LocalChannel}s. Users may override it
157      * to create custom instances of {@link LocalChannel}s.
158      */
159     protected LocalChannel newLocalChannel(LocalChannel peer) {
160         return new LocalChannel(this, peer);
161     }
162 
163     private void serve0(final LocalChannel child) {
164         inboundBuffer.add(child);
165         if (acceptInProgress) {
166             acceptInProgress = false;
167             ChannelPipeline pipeline = pipeline();
168             for (;;) {
169                 Object m = inboundBuffer.poll();
170                 if (m == null) {
171                     break;
172                 }
173                 pipeline.fireChannelRead(m);
174             }
175             pipeline.fireChannelReadComplete();
176         }
177     }
178 }