View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelException;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultChannelConfig;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.SingleThreadEventLoop;
29  import io.netty.util.ReferenceCountUtil;
30  import io.netty.util.internal.InternalThreadLocalMap;
31  
32  import java.net.SocketAddress;
33  import java.nio.channels.AlreadyConnectedException;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.ConnectionPendingException;
36  import java.nio.channels.NotYetConnectedException;
37  import java.util.ArrayDeque;
38  import java.util.Collections;
39  import java.util.Queue;
40  
41  /**
42   * A {@link Channel} for the local transport.
43   */
44  public class LocalChannel extends AbstractChannel {
45  
46      private enum State { OPEN, BOUND, CONNECTED, CLOSED }
47  
48      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
49  
50      private static final int MAX_READER_STACK_DEPTH = 8;
51  
52      private final ChannelConfig config = new DefaultChannelConfig(this);
53      private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
54      private final Runnable readTask = new Runnable() {
55          @Override
56          public void run() {
57              ChannelPipeline pipeline = pipeline();
58              for (;;) {
59                  Object m = inboundBuffer.poll();
60                  if (m == null) {
61                      break;
62                  }
63                  pipeline.fireChannelRead(m);
64              }
65              pipeline.fireChannelReadComplete();
66          }
67      };
68  
69      private final Runnable shutdownHook = new Runnable() {
70          @Override
71          public void run() {
72              unsafe().close(unsafe().voidPromise());
73          }
74      };
75  
76      private volatile State state;
77      private volatile LocalChannel peer;
78      private volatile LocalAddress localAddress;
79      private volatile LocalAddress remoteAddress;
80      private volatile ChannelPromise connectPromise;
81      private volatile boolean readInProgress;
82      private volatile boolean registerInProgress;
83  
84      public LocalChannel() {
85          super(null);
86      }
87  
88      LocalChannel(LocalServerChannel parent, LocalChannel peer) {
89          super(parent);
90          this.peer = peer;
91          localAddress = parent.localAddress();
92          remoteAddress = peer.localAddress();
93      }
94  
95      @Override
96      public ChannelMetadata metadata() {
97          return METADATA;
98      }
99  
100     @Override
101     public ChannelConfig config() {
102         return config;
103     }
104 
105     @Override
106     public LocalServerChannel parent() {
107         return (LocalServerChannel) super.parent();
108     }
109 
110     @Override
111     public LocalAddress localAddress() {
112         return (LocalAddress) super.localAddress();
113     }
114 
115     @Override
116     public LocalAddress remoteAddress() {
117         return (LocalAddress) super.remoteAddress();
118     }
119 
120     @Override
121     public boolean isOpen() {
122         return state != State.CLOSED;
123     }
124 
125     @Override
126     public boolean isActive() {
127         return state == State.CONNECTED;
128     }
129 
130     @Override
131     protected AbstractUnsafe newUnsafe() {
132         return new LocalUnsafe();
133     }
134 
135     @Override
136     protected boolean isCompatible(EventLoop loop) {
137         return loop instanceof SingleThreadEventLoop;
138     }
139 
140     @Override
141     protected SocketAddress localAddress0() {
142         return localAddress;
143     }
144 
145     @Override
146     protected SocketAddress remoteAddress0() {
147         return remoteAddress;
148     }
149 
150     @Override
151     protected void doRegister() throws Exception {
152         // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
153         // This is needed as a peer may not be null also if a LocalChannel was connected before and
154         // deregistered / registered later again.
155         //
156         // See https://github.com/netty/netty/issues/2400
157         if (peer != null && parent() != null) {
158             // Store the peer in a local variable as it may be set to null if doClose() is called.
159             // Because of this we also set registerInProgress to true as we check for this in doClose() and make sure
160             // we delay the fireChannelInactive() to be fired after the fireChannelActive() and so keep the correct
161             // order of events.
162             //
163             // See https://github.com/netty/netty/issues/2144
164             final LocalChannel peer = this.peer;
165             registerInProgress = true;
166             state = State.CONNECTED;
167 
168             peer.remoteAddress = parent() == null ? null : parent().localAddress();
169             peer.state = State.CONNECTED;
170 
171             // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
172             // This ensures that if both channels are on the same event loop, the peer's channelActive
173             // event is triggered *after* this channel's channelRegistered event, so that this channel's
174             // pipeline is fully initialized by ChannelInitializer before any channelRead events.
175             peer.eventLoop().execute(new Runnable() {
176                 @Override
177                 public void run() {
178                     registerInProgress = false;
179                     peer.pipeline().fireChannelActive();
180                     peer.connectPromise.setSuccess();
181                 }
182             });
183         }
184         ((SingleThreadEventLoop) eventLoop().unwrap()).addShutdownHook(shutdownHook);
185     }
186 
187     @Override
188     protected void doBind(SocketAddress localAddress) throws Exception {
189         this.localAddress =
190                 LocalChannelRegistry.register(this, this.localAddress,
191                         localAddress);
192         state = State.BOUND;
193     }
194 
195     @Override
196     protected void doDisconnect() throws Exception {
197         doClose();
198     }
199 
200     @Override
201     protected void doClose() throws Exception {
202         if (state != State.CLOSED) {
203             // Update all internal state before the closeFuture is notified.
204             if (localAddress != null) {
205                 if (parent() == null) {
206                     LocalChannelRegistry.unregister(localAddress);
207                 }
208                 localAddress = null;
209             }
210             state = State.CLOSED;
211         }
212 
213         final LocalChannel peer = this.peer;
214         if (peer != null && peer.isActive()) {
215             // Need to execute the close in the correct EventLoop
216             // See https://github.com/netty/netty/issues/1777
217             EventLoop eventLoop = peer.eventLoop();
218 
219             // Also check if the registration was not done yet. In this case we submit the close to the EventLoop
220             // to make sure it is run after the registration completes.
221             //
222             // See https://github.com/netty/netty/issues/2144
223             if (eventLoop.inEventLoop() && !registerInProgress) {
224                 peer.unsafe().close(unsafe().voidPromise());
225             } else {
226                 peer.eventLoop().execute(new Runnable() {
227                     @Override
228                     public void run() {
229                         peer.unsafe().close(unsafe().voidPromise());
230                     }
231                 });
232             }
233             this.peer = null;
234         }
235     }
236 
237     @Override
238     protected void doDeregister() throws Exception {
239         // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
240         ((SingleThreadEventLoop) eventLoop().unwrap()).removeShutdownHook(shutdownHook);
241     }
242 
243     @Override
244     protected void doBeginRead() throws Exception {
245         if (readInProgress) {
246             return;
247         }
248 
249         ChannelPipeline pipeline = pipeline();
250         Queue<Object> inboundBuffer = this.inboundBuffer;
251         if (inboundBuffer.isEmpty()) {
252             readInProgress = true;
253             return;
254         }
255 
256         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
257         final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
258         if (stackDepth < MAX_READER_STACK_DEPTH) {
259             threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
260             try {
261                 for (;;) {
262                     Object received = inboundBuffer.poll();
263                     if (received == null) {
264                         break;
265                     }
266                     pipeline.fireChannelRead(received);
267                 }
268                 pipeline.fireChannelReadComplete();
269             } finally {
270                 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
271             }
272         } else {
273             eventLoop().execute(readTask);
274         }
275     }
276 
277     @Override
278     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
279         switch (state) {
280         case OPEN:
281         case BOUND:
282             throw new NotYetConnectedException();
283         case CLOSED:
284             throw new ClosedChannelException();
285         }
286 
287         final LocalChannel peer = this.peer;
288         final ChannelPipeline peerPipeline = peer.pipeline();
289         final EventLoop peerLoop = peer.eventLoop();
290 
291         if (peerLoop == eventLoop()) {
292             for (;;) {
293                 Object msg = in.current();
294                 if (msg == null) {
295                     break;
296                 }
297                 peer.inboundBuffer.add(msg);
298                 ReferenceCountUtil.retain(msg);
299                 in.remove();
300             }
301             finishPeerRead(peer, peerPipeline);
302         } else {
303             // Use a copy because the original msgs will be recycled by AbstractChannel.
304             final Object[] msgsCopy = new Object[in.size()];
305             for (int i = 0; i < msgsCopy.length; i ++) {
306                 msgsCopy[i] = ReferenceCountUtil.retain(in.current());
307                 in.remove();
308             }
309 
310             peerLoop.execute(new Runnable() {
311                 @Override
312                 public void run() {
313                     Collections.addAll(peer.inboundBuffer, msgsCopy);
314                     finishPeerRead(peer, peerPipeline);
315                 }
316             });
317         }
318     }
319 
320     private static void finishPeerRead(LocalChannel peer, ChannelPipeline peerPipeline) {
321         if (peer.readInProgress) {
322             peer.readInProgress = false;
323             for (;;) {
324                 Object received = peer.inboundBuffer.poll();
325                 if (received == null) {
326                     break;
327                 }
328                 peerPipeline.fireChannelRead(received);
329             }
330             peerPipeline.fireChannelReadComplete();
331         }
332     }
333 
334     private class LocalUnsafe extends AbstractUnsafe {
335 
336         @Override
337         public void connect(final SocketAddress remoteAddress,
338                 SocketAddress localAddress, final ChannelPromise promise) {
339             if (!promise.setUncancellable() || !ensureOpen(promise)) {
340                 return;
341             }
342 
343             if (state == State.CONNECTED) {
344                 Exception cause = new AlreadyConnectedException();
345                 safeSetFailure(promise, cause);
346                 pipeline().fireExceptionCaught(cause);
347                 return;
348             }
349 
350             if (connectPromise != null) {
351                 throw new ConnectionPendingException();
352             }
353 
354             connectPromise = promise;
355 
356             if (state != State.BOUND) {
357                 // Not bound yet and no localAddress specified - get one.
358                 if (localAddress == null) {
359                     localAddress = new LocalAddress(LocalChannel.this);
360                 }
361             }
362 
363             if (localAddress != null) {
364                 try {
365                     doBind(localAddress);
366                 } catch (Throwable t) {
367                     safeSetFailure(promise, t);
368                     close(voidPromise());
369                     return;
370                 }
371             }
372 
373             Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
374             if (!(boundChannel instanceof LocalServerChannel)) {
375                 Exception cause = new ChannelException("connection refused");
376                 safeSetFailure(promise, cause);
377                 close(voidPromise());
378                 return;
379             }
380 
381             LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
382             peer = serverChannel.serve(LocalChannel.this);
383         }
384     }
385 }