1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AbstractChannel;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultChannelConfig;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.PreferHeapByteBufAllocator;
29 import io.netty.channel.RecvByteBufAllocator;
30 import io.netty.channel.SingleThreadEventLoop;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.concurrent.Future;
33 import io.netty.util.concurrent.SingleThreadEventExecutor;
34 import io.netty.util.internal.InternalThreadLocalMap;
35 import io.netty.util.internal.PlatformDependent;
36 import io.netty.util.internal.logging.InternalLogger;
37 import io.netty.util.internal.logging.InternalLoggerFactory;
38
39 import java.net.ConnectException;
40 import java.net.SocketAddress;
41 import java.nio.channels.AlreadyConnectedException;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.ConnectionPendingException;
44 import java.nio.channels.NotYetConnectedException;
45 import java.util.Queue;
46 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
47
48
49
50
51 public class LocalChannel extends AbstractChannel {
52 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
53 @SuppressWarnings({ "rawtypes" })
54 private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
55 AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
56 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57 private static final int MAX_READER_STACK_DEPTH = 8;
58
59 private enum State { OPEN, BOUND, CONNECTED, CLOSED }
60
61 private final ChannelConfig config = new DefaultChannelConfig(this);
62
63 final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
64 private final Runnable readTask = new Runnable() {
65 @Override
66 public void run() {
67
68 if (!inboundBuffer.isEmpty()) {
69 readInbound();
70 }
71 }
72 };
73
74 private final Runnable shutdownHook = new Runnable() {
75 @Override
76 public void run() {
77 unsafe().close(unsafe().voidPromise());
78 }
79 };
80
81 private final Runnable finishReadTask = new Runnable() {
82 @Override
83 public void run() {
84 finishPeerRead0(LocalChannel.this);
85 }
86 };
87
88 private volatile State state;
89 private volatile LocalChannel peer;
90 private volatile LocalAddress localAddress;
91 private volatile LocalAddress remoteAddress;
92 private volatile ChannelPromise connectPromise;
93 private volatile boolean readInProgress;
94 private volatile boolean writeInProgress;
95 private volatile Future<?> finishReadFuture;
96
97 public LocalChannel() {
98 super(null);
99 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
100 }
101
102 protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
103 super(parent);
104 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
105 this.peer = peer;
106 localAddress = parent.localAddress();
107 remoteAddress = peer.localAddress();
108 }
109
110 @Override
111 public ChannelMetadata metadata() {
112 return METADATA;
113 }
114
115 @Override
116 public ChannelConfig config() {
117 return config;
118 }
119
120 @Override
121 public LocalServerChannel parent() {
122 return (LocalServerChannel) super.parent();
123 }
124
125 @Override
126 public LocalAddress localAddress() {
127 return (LocalAddress) super.localAddress();
128 }
129
130 @Override
131 public LocalAddress remoteAddress() {
132 return (LocalAddress) super.remoteAddress();
133 }
134
135 @Override
136 public boolean isOpen() {
137 return state != State.CLOSED;
138 }
139
140 @Override
141 public boolean isActive() {
142 return state == State.CONNECTED;
143 }
144
145 @Override
146 protected AbstractUnsafe newUnsafe() {
147 return new LocalUnsafe();
148 }
149
150 @Override
151 protected boolean isCompatible(EventLoop loop) {
152 return loop instanceof SingleThreadEventLoop;
153 }
154
155 @Override
156 protected SocketAddress localAddress0() {
157 return localAddress;
158 }
159
160 @Override
161 protected SocketAddress remoteAddress0() {
162 return remoteAddress;
163 }
164
165 @Override
166 protected void doRegister() throws Exception {
167
168
169
170
171
172 if (peer != null && parent() != null) {
173
174
175 final LocalChannel peer = this.peer;
176 state = State.CONNECTED;
177
178 peer.remoteAddress = parent() == null ? null : parent().localAddress();
179 peer.state = State.CONNECTED;
180
181
182
183
184
185 peer.eventLoop().execute(new Runnable() {
186 @Override
187 public void run() {
188 ChannelPromise promise = peer.connectPromise;
189
190
191
192 if (promise != null && promise.trySuccess()) {
193 peer.pipeline().fireChannelActive();
194 }
195 }
196 });
197 }
198 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
199 }
200
201 @Override
202 protected void doBind(SocketAddress localAddress) throws Exception {
203 this.localAddress =
204 LocalChannelRegistry.register(this, this.localAddress,
205 localAddress);
206 state = State.BOUND;
207 }
208
209 @Override
210 protected void doDisconnect() throws Exception {
211 doClose();
212 }
213
214 @Override
215 protected void doClose() throws Exception {
216 final LocalChannel peer = this.peer;
217 State oldState = state;
218 try {
219 if (oldState != State.CLOSED) {
220
221 if (localAddress != null) {
222 if (parent() == null) {
223 LocalChannelRegistry.unregister(localAddress);
224 }
225 localAddress = null;
226 }
227
228
229
230 state = State.CLOSED;
231
232
233 if (writeInProgress && peer != null) {
234 finishPeerRead(peer);
235 }
236
237 ChannelPromise promise = connectPromise;
238 if (promise != null) {
239
240 promise.tryFailure(new ClosedChannelException());
241 connectPromise = null;
242 }
243 }
244
245 if (peer != null) {
246 this.peer = null;
247
248
249
250 EventLoop peerEventLoop = peer.eventLoop();
251 final boolean peerIsActive = peer.isActive();
252 try {
253 peerEventLoop.execute(new Runnable() {
254 @Override
255 public void run() {
256 peer.tryClose(peerIsActive);
257 }
258 });
259 } catch (Throwable cause) {
260 logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
261 this, peer, cause);
262 if (peerEventLoop.inEventLoop()) {
263 peer.releaseInboundBuffers();
264 } else {
265
266
267 peer.close();
268 }
269 PlatformDependent.throwException(cause);
270 }
271 }
272 } finally {
273
274 if (oldState != null && oldState != State.CLOSED) {
275
276
277
278
279 releaseInboundBuffers();
280 }
281 }
282 }
283
284 private void tryClose(boolean isActive) {
285 if (isActive) {
286 unsafe().close(unsafe().voidPromise());
287 } else {
288 releaseInboundBuffers();
289 }
290 }
291
292 @Override
293 protected void doDeregister() throws Exception {
294
295 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
296 }
297
298 private void readInbound() {
299 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
300 handle.reset(config());
301 ChannelPipeline pipeline = pipeline();
302 do {
303 Object received = inboundBuffer.poll();
304 if (received == null) {
305 break;
306 }
307 if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
308 ByteBuf msg = (ByteBuf) received;
309 ByteBuf output = handle.allocate(alloc());
310 if (msg.readableBytes() < output.writableBytes()) {
311
312 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
313 msg.release();
314 while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
315 ((ByteBuf) received).readableBytes() < output.writableBytes()) {
316 inboundBuffer.poll();
317 msg = (ByteBuf) received;
318 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
319 msg.release();
320 }
321 handle.lastBytesRead(output.readableBytes());
322 received = output;
323 } else {
324
325 handle.lastBytesRead(output.capacity());
326 output.release();
327 }
328 }
329 handle.incMessagesRead(1);
330 pipeline.fireChannelRead(received);
331 } while (handle.continueReading());
332 handle.readComplete();
333 pipeline.fireChannelReadComplete();
334 }
335
336 @Override
337 protected void doBeginRead() throws Exception {
338 if (readInProgress) {
339 return;
340 }
341
342 Queue<Object> inboundBuffer = this.inboundBuffer;
343 if (inboundBuffer.isEmpty()) {
344 readInProgress = true;
345 return;
346 }
347
348 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
349 final int stackDepth = threadLocals.localChannelReaderStackDepth();
350 if (stackDepth < MAX_READER_STACK_DEPTH) {
351 threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
352 try {
353 readInbound();
354 } finally {
355 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
356 }
357 } else {
358 try {
359 eventLoop().execute(readTask);
360 } catch (Throwable cause) {
361 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
362 close();
363 peer.close();
364 PlatformDependent.throwException(cause);
365 }
366 }
367 }
368
369 @Override
370 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
371 switch (state) {
372 case OPEN:
373 case BOUND:
374 throw new NotYetConnectedException();
375 case CLOSED:
376 throw new ClosedChannelException();
377 case CONNECTED:
378 break;
379 }
380
381 final LocalChannel peer = this.peer;
382
383 writeInProgress = true;
384 try {
385 ClosedChannelException exception = null;
386 for (;;) {
387 Object msg = in.current();
388 if (msg == null) {
389 break;
390 }
391 try {
392
393
394 if (peer.state == State.CONNECTED) {
395 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
396 in.remove();
397 } else {
398 if (exception == null) {
399 exception = new ClosedChannelException();
400 }
401 in.remove(exception);
402 }
403 } catch (Throwable cause) {
404 in.remove(cause);
405 }
406 }
407 } finally {
408
409
410
411
412
413 writeInProgress = false;
414 }
415
416 finishPeerRead(peer);
417 }
418
419 private void finishPeerRead(final LocalChannel peer) {
420
421 if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
422 finishPeerRead0(peer);
423 } else {
424 runFinishPeerReadTask(peer);
425 }
426 }
427
428 private void runFinishTask0() {
429
430
431 if (writeInProgress) {
432 finishReadFuture = eventLoop().submit(finishReadTask);
433 } else {
434 eventLoop().execute(finishReadTask);
435 }
436 }
437
438 private void runFinishPeerReadTask(final LocalChannel peer) {
439 try {
440 peer.runFinishTask0();
441 } catch (Throwable cause) {
442 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
443 close();
444 peer.close();
445 PlatformDependent.throwException(cause);
446 }
447 }
448
449 private void releaseInboundBuffers() {
450 assert eventLoop() == null || eventLoop().inEventLoop();
451 readInProgress = false;
452 Queue<Object> inboundBuffer = this.inboundBuffer;
453 Object msg;
454 while ((msg = inboundBuffer.poll()) != null) {
455 ReferenceCountUtil.release(msg);
456 }
457 }
458
459 private void finishPeerRead0(LocalChannel peer) {
460 Future<?> peerFinishReadFuture = peer.finishReadFuture;
461 if (peerFinishReadFuture != null) {
462 if (!peerFinishReadFuture.isDone()) {
463 runFinishPeerReadTask(peer);
464 return;
465 } else {
466
467 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
468 }
469 }
470
471
472 if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
473 peer.readInProgress = false;
474 peer.readInbound();
475 }
476 }
477
478 private class LocalUnsafe extends AbstractUnsafe {
479
480 @Override
481 public void connect(final SocketAddress remoteAddress,
482 SocketAddress localAddress, final ChannelPromise promise) {
483 if (!promise.setUncancellable() || !ensureOpen(promise)) {
484 return;
485 }
486
487 if (state == State.CONNECTED) {
488 Exception cause = new AlreadyConnectedException();
489 safeSetFailure(promise, cause);
490 return;
491 }
492
493 if (connectPromise != null) {
494 throw new ConnectionPendingException();
495 }
496
497 connectPromise = promise;
498
499 if (state != State.BOUND) {
500
501 if (localAddress == null) {
502 localAddress = new LocalAddress(LocalChannel.this);
503 }
504 }
505
506 if (localAddress != null) {
507 try {
508 doBind(localAddress);
509 } catch (Throwable t) {
510 safeSetFailure(promise, t);
511 close(voidPromise());
512 return;
513 }
514 }
515
516 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
517 if (!(boundChannel instanceof LocalServerChannel)) {
518 Exception cause = new ConnectException("connection refused: " + remoteAddress);
519 safeSetFailure(promise, cause);
520 close(voidPromise());
521 return;
522 }
523
524 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
525 peer = serverChannel.serve(LocalChannel.this);
526 }
527 }
528 }