1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultChannelConfig;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.PreferHeapByteBufAllocator;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.SingleThreadEventLoop;
30 import io.netty.util.ReferenceCountUtil;
31 import io.netty.util.concurrent.Future;
32 import io.netty.util.concurrent.SingleThreadEventExecutor;
33 import io.netty.util.internal.InternalThreadLocalMap;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.net.ConnectException;
39 import java.net.SocketAddress;
40 import java.nio.channels.AlreadyConnectedException;
41 import java.nio.channels.ClosedChannelException;
42 import java.nio.channels.ConnectionPendingException;
43 import java.nio.channels.NotYetConnectedException;
44 import java.util.Queue;
45 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46
47
48
49
50 public class LocalChannel extends AbstractChannel {
/** Logger used to report failures when work cannot be handed to an event loop. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
/** Atomic updater for the {@code finishReadFuture} field; used to clear a completed future via compare-and-set. */
@SuppressWarnings({ "rawtypes" })
private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
/**
 * Metadata returned by {@code metadata()} for every instance.
 * NOTE(review): the {@code false} argument is presumably the "has disconnect" flag - confirm against ChannelMetadata.
 */
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
/**
 * Maximum per-thread nesting depth of direct {@code readInbound()} calls made from {@code doBeginRead()};
 * once reached, the read is handed to the event loop through {@code readTask} instead.
 */
private static final int MAX_READER_STACK_DEPTH = 8;

/** Lifecycle states a {@link LocalChannel} moves through. */
private enum State { OPEN, BOUND, CONNECTED, CLOSED }
59
/** Configuration of this channel; the constructors replace its allocator with a heap-preferring wrapper. */
private final ChannelConfig config = new DefaultChannelConfig(this);

/**
 * Messages written by the peer that have not been delivered to this channel's pipeline yet.
 * Filled by the peer's {@code doWrite()} and drained by {@code readInbound()} / {@code releaseInboundBuffers()}.
 */
final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
63 private final Runnable readTask = new Runnable() {
64 @Override
65 public void run() {
66
67 if (!inboundBuffer.isEmpty()) {
68 readInbound();
69 }
70 }
71 };
72
73 private final Runnable shutdownHook = new Runnable() {
74 @Override
75 public void run() {
76 unsafe().close(unsafe().voidPromise());
77 }
78 };
79
/** Lifecycle state; stays {@code null} until {@code doRegister()} or {@code doBind()} assigns one. */
private volatile State state;
/** Channel at the other end; set by the two-argument constructor or {@code connect()}, cleared by {@code doClose()}. */
private volatile LocalChannel peer;
/** Address this channel is bound to; assigned in {@code doBind()} or the two-argument constructor, cleared by {@code doClose()}. */
private volatile LocalAddress localAddress;
/** Address of the other end; assigned in the two-argument constructor or by the peer's {@code doRegister()}. */
private volatile LocalAddress remoteAddress;
/** Promise handed to {@code connect()}; completed from the peer's {@code doRegister()} or failed and cleared by {@code doClose()}. */
private volatile ChannelPromise connectPromise;
/** {@code true} when {@code doBeginRead()} found nothing to deliver and a read is therefore still outstanding. */
private volatile boolean readInProgress;
/** {@code true} while {@code doWrite()} is transferring messages to the peer. */
private volatile boolean writeInProgress;
/** Future of a finish-read task that was submitted to this channel's event loop while this channel was writing. */
private volatile Future<?> finishReadFuture;
88
89 public LocalChannel() {
90 super(null);
91 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
92 }
93
94 protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
95 super(parent);
96 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
97 this.peer = peer;
98 localAddress = parent.localAddress();
99 remoteAddress = peer.localAddress();
100 }
101
/**
 * Returns the {@link ChannelMetadata} instance shared by all {@link LocalChannel}s.
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
106
/**
 * Returns the configuration object created together with this channel.
 */
@Override
public ChannelConfig config() {
    return config;
}
111
112 @Override
113 public LocalServerChannel parent() {
114 return (LocalServerChannel) super.parent();
115 }
116
117 @Override
118 public LocalAddress localAddress() {
119 return (LocalAddress) super.localAddress();
120 }
121
122 @Override
123 public LocalAddress remoteAddress() {
124 return (LocalAddress) super.remoteAddress();
125 }
126
127 @Override
128 public boolean isOpen() {
129 return state != State.CLOSED;
130 }
131
132 @Override
133 public boolean isActive() {
134 return state == State.CONNECTED;
135 }
136
/**
 * Creates the {@code LocalUnsafe} that implements {@code connect()} for this channel.
 */
@Override
protected AbstractUnsafe newUnsafe() {
    return new LocalUnsafe();
}
141
/**
 * Accepts only {@link SingleThreadEventLoop} instances.
 *
 * @param loop the event loop to check
 * @return {@code true} if {@code loop} is a {@link SingleThreadEventLoop}
 */
@Override
protected boolean isCompatible(EventLoop loop) {
    return loop instanceof SingleThreadEventLoop;
}
146
/**
 * Returns the current value of the {@code localAddress} field ({@code null} when unbound or closed).
 */
@Override
protected SocketAddress localAddress0() {
    return localAddress;
}
151
/**
 * Returns the current value of the {@code remoteAddress} field ({@code null} until a peer assigned it).
 */
@Override
protected SocketAddress remoteAddress0() {
    return remoteAddress;
}
156
157 @Override
158 protected void doRegister() throws Exception {
159
160
161
162
163
164 if (peer != null && parent() != null) {
165
166
167 final LocalChannel peer = this.peer;
168 state = State.CONNECTED;
169
170 peer.remoteAddress = parent() == null ? null : parent().localAddress();
171 peer.state = State.CONNECTED;
172
173
174
175
176
177 peer.eventLoop().execute(new Runnable() {
178 @Override
179 public void run() {
180 ChannelPromise promise = peer.connectPromise;
181
182
183
184 if (promise != null && promise.trySuccess()) {
185 peer.pipeline().fireChannelActive();
186 }
187 }
188 });
189 }
190 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
191 }
192
193 @Override
194 protected void doBind(SocketAddress localAddress) throws Exception {
195 this.localAddress =
196 LocalChannelRegistry.register(this, this.localAddress,
197 localAddress);
198 state = State.BOUND;
199 }
200
/**
 * Disconnecting a local channel is the same as closing it; delegates to {@code doClose()}.
 *
 * @throws Exception whatever {@code doClose()} throws
 */
@Override
protected void doDisconnect() throws Exception {
    doClose();
}
205
/**
 * Closes this end of the connection: drops the local address, marks the channel {@code CLOSED},
 * fails a pending connect promise, asks the peer's event loop to close the peer, and finally
 * releases every message still queued in this channel's inbound buffer.
 *
 * @throws Exception rethrows whatever the peer's event loop threw when the close task was rejected
 */
@Override
protected void doClose() throws Exception {
    // Snapshot peer and state up front; both fields are overwritten below.
    final LocalChannel peer = this.peer;
    State oldState = state;
    try {
        if (oldState != State.CLOSED) {
            // Drop the local address; only channels without a parent remove it from the registry.
            if (localAddress != null) {
                if (parent() == null) {
                    LocalChannelRegistry.unregister(localAddress);
                }
                localAddress = null;
            }

            // Flip the state before finishPeerRead(): the peer's doWrite() checks this state and
            // fails its writes once it is no longer CONNECTED.
            state = State.CLOSED;

            // A write of ours is still in flight; make the peer read what was queued so far
            // before it gets to process the close.
            if (writeInProgress && peer != null) {
                finishPeerRead(peer);
            }

            ChannelPromise promise = connectPromise;
            if (promise != null) {
                // Fail a connect that never completed.
                // NOTE(review): tryFailure() presumably tolerates an already-completed promise - confirm.
                promise.tryFailure(new ClosedChannelException());
                connectPromise = null;
            }
        }

        if (peer != null) {
            this.peer = null;
            // Always hand the peer's close to its event loop via execute(), even if we are on that loop.
            // The peer's active flag is sampled now, before the task runs.
            EventLoop peerEventLoop = peer.eventLoop();
            final boolean peerIsActive = peer.isActive();
            try {
                peerEventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        peer.tryClose(peerIsActive);
                    }
                });
            } catch (Throwable cause) {
                logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
                        this, peer, cause);
                if (peerEventLoop.inEventLoop()) {
                    // Already on the peer's loop: safe to drain its queue directly.
                    peer.releaseInboundBuffers();
                } else {
                    // Not on the peer's loop: best effort, go through the regular close path.
                    peer.close();
                }
                PlatformDependent.throwException(cause);
            }
        }
    } finally {
        // Only release when the channel had been given a state (non-null) and was not closed before
        // this call; this drops the messages still sitting in our inbound queue.
        if (oldState != null && oldState != State.CLOSED) {
            releaseInboundBuffers();
        }
    }
}
275
276 private void tryClose(boolean isActive) {
277 if (isActive) {
278 unsafe().close(unsafe().voidPromise());
279 } else {
280 releaseInboundBuffers();
281 }
282 }
283
284 @Override
285 protected void doDeregister() throws Exception {
286
287 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
288 }
289
290 private void readInbound() {
291 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
292 handle.reset(config());
293 ChannelPipeline pipeline = pipeline();
294 do {
295 Object received = inboundBuffer.poll();
296 if (received == null) {
297 break;
298 }
299 pipeline.fireChannelRead(received);
300 } while (handle.continueReading());
301
302 pipeline.fireChannelReadComplete();
303 }
304
305 @Override
306 protected void doBeginRead() throws Exception {
307 if (readInProgress) {
308 return;
309 }
310
311 Queue<Object> inboundBuffer = this.inboundBuffer;
312 if (inboundBuffer.isEmpty()) {
313 readInProgress = true;
314 return;
315 }
316
317 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
318 final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
319 if (stackDepth < MAX_READER_STACK_DEPTH) {
320 threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
321 try {
322 readInbound();
323 } finally {
324 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
325 }
326 } else {
327 try {
328 eventLoop().execute(readTask);
329 } catch (Throwable cause) {
330 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
331 close();
332 peer.close();
333 PlatformDependent.throwException(cause);
334 }
335 }
336 }
337
338 @Override
339 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
340 switch (state) {
341 case OPEN:
342 case BOUND:
343 throw new NotYetConnectedException();
344 case CLOSED:
345 throw new ClosedChannelException();
346 case CONNECTED:
347 break;
348 }
349
350 final LocalChannel peer = this.peer;
351
352 writeInProgress = true;
353 try {
354 ClosedChannelException exception = null;
355 for (;;) {
356 Object msg = in.current();
357 if (msg == null) {
358 break;
359 }
360 try {
361
362
363 if (peer.state == State.CONNECTED) {
364 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
365 in.remove();
366 } else {
367 if (exception == null) {
368 exception = new ClosedChannelException();
369 }
370 in.remove(exception);
371 }
372 } catch (Throwable cause) {
373 in.remove(cause);
374 }
375 }
376 } finally {
377
378
379
380
381
382 writeInProgress = false;
383 }
384
385 finishPeerRead(peer);
386 }
387
388 private void finishPeerRead(final LocalChannel peer) {
389
390 if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
391 finishPeerRead0(peer);
392 } else {
393 runFinishPeerReadTask(peer);
394 }
395 }
396
/**
 * Schedules {@code finishPeerRead0(peer)} on the peer's event loop. While the peer is writing the
 * task is submitted and its future stored in {@code peer.finishReadFuture}, so later calls can see
 * that a finish-read is still outstanding; otherwise the task is simply executed.
 *
 * @param peer the channel whose pending read should be completed
 */
private void runFinishPeerReadTask(final LocalChannel peer) {
    final Runnable finishPeerReadTask = new Runnable() {
        @Override
        public void run() {
            finishPeerRead0(peer);
        }
    };
    try {
        if (peer.writeInProgress) {
            // Keep the future: finishPeerRead0() checks it and reschedules while it is not done.
            peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
        } else {
            peer.eventLoop().execute(finishPeerReadTask);
        }
    } catch (Throwable cause) {
        // The peer's event loop refused the task: close both ends and propagate the failure.
        logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
        close();
        peer.close();
        PlatformDependent.throwException(cause);
    }
}
419
420 private void releaseInboundBuffers() {
421 assert eventLoop() == null || eventLoop().inEventLoop();
422 readInProgress = false;
423 Queue<Object> inboundBuffer = this.inboundBuffer;
424 Object msg;
425 while ((msg = inboundBuffer.poll()) != null) {
426 ReferenceCountUtil.release(msg);
427 }
428 }
429
430 private void finishPeerRead0(LocalChannel peer) {
431 Future<?> peerFinishReadFuture = peer.finishReadFuture;
432 if (peerFinishReadFuture != null) {
433 if (!peerFinishReadFuture.isDone()) {
434 runFinishPeerReadTask(peer);
435 return;
436 } else {
437
438 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
439 }
440 }
441
442
443 if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
444 peer.readInProgress = false;
445 peer.readInbound();
446 }
447 }
448
449 private class LocalUnsafe extends AbstractUnsafe {
450
451 @Override
452 public void connect(final SocketAddress remoteAddress,
453 SocketAddress localAddress, final ChannelPromise promise) {
454 if (!promise.setUncancellable() || !ensureOpen(promise)) {
455 return;
456 }
457
458 if (state == State.CONNECTED) {
459 Exception cause = new AlreadyConnectedException();
460 safeSetFailure(promise, cause);
461 pipeline().fireExceptionCaught(cause);
462 return;
463 }
464
465 if (connectPromise != null) {
466 throw new ConnectionPendingException();
467 }
468
469 connectPromise = promise;
470
471 if (state != State.BOUND) {
472
473 if (localAddress == null) {
474 localAddress = new LocalAddress(LocalChannel.this);
475 }
476 }
477
478 if (localAddress != null) {
479 try {
480 doBind(localAddress);
481 } catch (Throwable t) {
482 safeSetFailure(promise, t);
483 close(voidPromise());
484 return;
485 }
486 }
487
488 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
489 if (!(boundChannel instanceof LocalServerChannel)) {
490 Exception cause = new ConnectException("connection refused: " + remoteAddress);
491 safeSetFailure(promise, cause);
492 close(voidPromise());
493 return;
494 }
495
496 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
497 peer = serverChannel.serve(LocalChannel.this);
498 }
499 }
500 }