View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractServerChannel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.DefaultChannelConfig;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.ServerChannel;
24  import io.netty.channel.SingleThreadEventLoop;
25  
26  import java.net.SocketAddress;
27  import java.util.ArrayDeque;
28  import java.util.Queue;
29  
30  /**
31   * A {@link ServerChannel} for the local transport which allows in VM communication.
32   */
33  public class LocalServerChannel extends AbstractServerChannel {
34  
35      private final ChannelConfig config = new DefaultChannelConfig(this);
36      private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
37      private final Runnable shutdownHook = new Runnable() {
38          @Override
39          public void run() {
40              unsafe().close(unsafe().voidPromise());
41          }
42      };
43  
44      private volatile int state; // 0 - open, 1 - active, 2 - closed
45      private volatile LocalAddress localAddress;
46      private volatile boolean acceptInProgress;
47  
48      @Override
49      public ChannelConfig config() {
50          return config;
51      }
52  
53      @Override
54      public LocalAddress localAddress() {
55          return (LocalAddress) super.localAddress();
56      }
57  
58      @Override
59      public LocalAddress remoteAddress() {
60          return (LocalAddress) super.remoteAddress();
61      }
62  
63      @Override
64      public boolean isOpen() {
65          return state < 2;
66      }
67  
68      @Override
69      public boolean isActive() {
70          return state == 1;
71      }
72  
73      @Override
74      protected boolean isCompatible(EventLoop loop) {
75          return loop instanceof SingleThreadEventLoop;
76      }
77  
78      @Override
79      protected SocketAddress localAddress0() {
80          return localAddress;
81      }
82  
83      @Override
84      protected void doRegister() throws Exception {
85          ((SingleThreadEventLoop) eventLoop().unwrap()).addShutdownHook(shutdownHook);
86      }
87  
88      @Override
89      protected void doBind(SocketAddress localAddress) throws Exception {
90          this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
91          state = 1;
92      }
93  
94      @Override
95      protected void doClose() throws Exception {
96          if (state <= 1) {
97              // Update all internal state before the closeFuture is notified.
98              if (localAddress != null) {
99                  LocalChannelRegistry.unregister(localAddress);
100                 localAddress = null;
101             }
102             state = 2;
103         }
104     }
105 
106     @Override
107     protected void doDeregister() throws Exception {
108         ((SingleThreadEventLoop) eventLoop().unwrap()).removeShutdownHook(shutdownHook);
109     }
110 
111     @Override
112     protected void doBeginRead() throws Exception {
113         if (acceptInProgress) {
114             return;
115         }
116 
117         Queue<Object> inboundBuffer = this.inboundBuffer;
118         if (inboundBuffer.isEmpty()) {
119             acceptInProgress = true;
120             return;
121         }
122 
123         ChannelPipeline pipeline = pipeline();
124         for (;;) {
125             Object m = inboundBuffer.poll();
126             if (m == null) {
127                 break;
128             }
129             pipeline.fireChannelRead(m);
130         }
131         pipeline.fireChannelReadComplete();
132     }
133 
134     LocalChannel serve(final LocalChannel peer) {
135         final LocalChannel child = new LocalChannel(this, peer);
136         if (eventLoop().inEventLoop()) {
137             serve0(child);
138         } else {
139             eventLoop().execute(new Runnable() {
140                 @Override
141                 public void run() {
142                     serve0(child);
143                 }
144             });
145         }
146         return child;
147     }
148 
149     private void serve0(final LocalChannel child) {
150         inboundBuffer.add(child);
151         if (acceptInProgress) {
152             acceptInProgress = false;
153             ChannelPipeline pipeline = pipeline();
154             for (;;) {
155                 Object m = inboundBuffer.poll();
156                 if (m == null) {
157                     break;
158                 }
159                 pipeline.fireChannelRead(m);
160             }
161             pipeline.fireChannelReadComplete();
162         }
163     }
164 }