1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.local;
17
18 import io.netty5.buffer.api.DefaultBufferAllocators;
19 import io.netty5.channel.ChannelOption;
20 import io.netty5.channel.ChannelShutdownDirection;
21 import io.netty5.util.ReferenceCounted;
22 import io.netty5.util.Resource;
23 import io.netty5.buffer.api.internal.ResourceSupport;
24 import io.netty5.buffer.api.internal.Statics;
25 import io.netty5.channel.AbstractChannel;
26 import io.netty5.channel.Channel;
27 import io.netty5.channel.ChannelMetadata;
28 import io.netty5.channel.ChannelOutboundBuffer;
29 import io.netty5.channel.ChannelPipeline;
30 import io.netty5.channel.EventLoop;
31 import io.netty5.channel.RecvBufferAllocator;
32 import io.netty5.util.ReferenceCountUtil;
33 import io.netty5.util.concurrent.FastThreadLocal;
34 import io.netty5.util.concurrent.Future;
35 import io.netty5.util.internal.PlatformDependent;
36 import io.netty5.util.internal.logging.InternalLogger;
37 import io.netty5.util.internal.logging.InternalLoggerFactory;
38
39 import java.net.ConnectException;
40 import java.net.SocketAddress;
41 import java.nio.channels.AlreadyConnectedException;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.NotYetConnectedException;
44 import java.util.Queue;
45 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46
47
48
49
50 public class LocalChannel extends AbstractChannel<LocalServerChannel, LocalAddress, LocalAddress>
51 implements LocalChannelUnsafe {
52 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
53 @SuppressWarnings("rawtypes")
54 private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
55 AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
56 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57 private static final int MAX_READER_STACK_DEPTH = 8;
58
59 private enum State { OPEN, BOUND, CONNECTED, CLOSED }
60
61
62 final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
63 private final Runnable readTask = () -> {
64
65 if (!inboundBuffer.isEmpty()) {
66 readInbound();
67 }
68 };
69
70 private volatile State state;
71 private volatile LocalChannel peer;
72 private volatile LocalAddress localAddress;
73 private volatile LocalAddress remoteAddress;
74 private volatile boolean readInProgress;
75 private volatile boolean writeInProgress;
76 private volatile Future<?> finishReadFuture;
77 private volatile boolean inputShutdown;
78 private volatile boolean outputShutdown;
79
80 public LocalChannel(EventLoop eventLoop) {
81 this(null, eventLoop, null);
82 }
83
84 protected LocalChannel(LocalServerChannel parent, EventLoop eventLoop, LocalChannel peer) {
85 super(parent, eventLoop, METADATA);
86 this.peer = peer;
87 if (parent != null) {
88 localAddress = parent.localAddress();
89 }
90 if (peer != null) {
91 remoteAddress = peer.localAddress();
92 }
93 setOption(ChannelOption.BUFFER_ALLOCATOR, DefaultBufferAllocators.onHeapAllocator());
94 }
95
96 @Override
97 public boolean isOpen() {
98 return state != State.CLOSED;
99 }
100
101 @Override
102 public boolean isActive() {
103 return state == State.CONNECTED;
104 }
105
106 @Override
107 protected LocalAddress localAddress0() {
108 return localAddress;
109 }
110
111 @Override
112 protected LocalAddress remoteAddress0() {
113 return remoteAddress;
114 }
115
116 @Override
117 protected void doBind(SocketAddress localAddress) throws Exception {
118 this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
119 state = State.BOUND;
120 }
121
122 @Override
123 protected void doShutdown(ChannelShutdownDirection direction) {
124 switch (direction) {
125 case Inbound:
126 inputShutdown = true;
127 break;
128 case Outbound:
129 outputShutdown = true;
130 break;
131 default:
132 throw new AssertionError();
133 }
134 }
135
136 @Override
137 public boolean isShutdown(ChannelShutdownDirection direction) {
138 if (!isActive()) {
139 return true;
140 }
141 switch (direction) {
142 case Inbound:
143 return inputShutdown;
144 case Outbound:
145 return outputShutdown;
146 default:
147 throw new AssertionError();
148 }
149 }
150
151 @Override
152 protected void doDisconnect() throws Exception {
153 doClose();
154 }
155
156 @Override
157 protected void doClose() throws Exception {
158 final LocalChannel peer = this.peer;
159 State oldState = state;
160 try {
161 if (oldState != State.CLOSED) {
162
163 if (localAddress != null) {
164 if (parent() == null) {
165 LocalChannelRegistry.unregister(localAddress);
166 }
167 localAddress = null;
168 }
169
170
171
172 state = State.CLOSED;
173
174
175 if (writeInProgress && peer != null) {
176 finishPeerRead(peer);
177 }
178 }
179
180 if (peer != null) {
181 this.peer = null;
182
183
184
185 EventLoop peerEventLoop = peer.executor();
186 final boolean peerIsActive = peer.isActive();
187 try {
188 peerEventLoop.execute(() -> peer.tryClose(peerIsActive));
189 } catch (Throwable cause) {
190 logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
191 this, peer, cause);
192 if (peerEventLoop.inEventLoop()) {
193 peer.releaseInboundBuffers();
194 } else {
195
196
197 peer.close();
198 }
199 throw cause;
200 }
201 }
202 } finally {
203
204 if (oldState != null && oldState != State.CLOSED) {
205
206
207
208
209 releaseInboundBuffers();
210 }
211 }
212 }
213
214 private void tryClose(boolean isActive) {
215 if (isActive) {
216 closeTransport(newPromise());
217 } else {
218 releaseInboundBuffers();
219 closeForciblyTransport();
220 }
221 }
222
223 private void readInbound() {
224 RecvBufferAllocator.Handle handle = recvBufAllocHandle();
225 handle.reset();
226 ChannelPipeline pipeline = pipeline();
227 do {
228 Object received = inboundBuffer.poll();
229 if (received == null) {
230 break;
231 }
232 pipeline.fireChannelRead(received);
233 } while (handle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
234
235 pipeline.fireChannelReadComplete();
236 readIfIsAutoRead();
237 }
238
239 private static final class ReaderStackDepth {
240 private int stackDepth;
241
242 boolean incrementIfPossible() {
243 if (stackDepth == MAX_READER_STACK_DEPTH) {
244 return false;
245 }
246 stackDepth++;
247 return true;
248 }
249
250 void decrement() {
251 stackDepth--;
252 }
253 }
254
255 private static final FastThreadLocal<ReaderStackDepth> STACK_DEPTH = new FastThreadLocal<>() {
256 @Override
257 protected ReaderStackDepth initialValue() throws Exception {
258 return new ReaderStackDepth();
259 }
260 };
261
262 @Override
263 protected void doBeginRead() throws Exception {
264 if (readInProgress) {
265 return;
266 }
267
268 Queue<Object> inboundBuffer = this.inboundBuffer;
269 if (inboundBuffer.isEmpty()) {
270 readInProgress = true;
271 return;
272 }
273
274 final ReaderStackDepth readerStackDepth = STACK_DEPTH.get();
275 if (readerStackDepth.incrementIfPossible()) {
276 try {
277 readInbound();
278 } finally {
279 readerStackDepth.decrement();
280 }
281 } else {
282 try {
283 executor().execute(readTask);
284 } catch (Throwable cause) {
285 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
286 close();
287 peer.close();
288 throw cause;
289 }
290 }
291 }
292
293 @Override
294 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
295 switch (state) {
296 case OPEN:
297 case BOUND:
298 throw new NotYetConnectedException();
299 case CLOSED:
300 throw new ClosedChannelException();
301 case CONNECTED:
302 break;
303 }
304
305 final LocalChannel peer = this.peer;
306
307 writeInProgress = true;
308 try {
309 for (;;) {
310 Object msg = in.current();
311 if (msg == null) {
312 break;
313 }
314 try {
315
316
317 if (peer.state == State.CONNECTED) {
318 if (msg instanceof ReferenceCounted) {
319 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
320 } else if (msg instanceof ResourceSupport) {
321 peer.inboundBuffer.add(Statics.acquire((ResourceSupport<?, ?>) msg));
322 } else if (msg instanceof Resource) {
323 peer.inboundBuffer.add(((Resource<?>) msg).send().receive());
324 } else {
325 peer.inboundBuffer.add(msg);
326 }
327 in.remove();
328 } else {
329 break;
330 }
331 } catch (Throwable cause) {
332 in.remove(cause);
333 }
334 }
335 } finally {
336
337
338
339
340
341 writeInProgress = false;
342 }
343
344 finishPeerRead(peer);
345 }
346
347 private void finishPeerRead(final LocalChannel peer) {
348
349 if (peer.executor() == executor() && !peer.writeInProgress) {
350 finishPeerRead0(peer);
351 } else {
352 runFinishPeerReadTask(peer);
353 }
354 }
355
356 private void runFinishPeerReadTask(final LocalChannel peer) {
357
358
359 final Runnable finishPeerReadTask = () -> finishPeerRead0(peer);
360 try {
361 if (peer.writeInProgress) {
362 peer.finishReadFuture = peer.executor().submit(finishPeerReadTask);
363 } else {
364 peer.executor().execute(finishPeerReadTask);
365 }
366 } catch (Throwable cause) {
367 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
368 close();
369 peer.close();
370 throw cause;
371 }
372 }
373
374 private void releaseInboundBuffers() {
375 assert executor() == null || executor().inEventLoop();
376 readInProgress = false;
377 Queue<Object> inboundBuffer = this.inboundBuffer;
378 Object msg;
379 while ((msg = inboundBuffer.poll()) != null) {
380 Resource.dispose(msg);
381 }
382 }
383
384 private void finishPeerRead0(LocalChannel peer) {
385 Future<?> peerFinishReadFuture = peer.finishReadFuture;
386 if (peerFinishReadFuture != null) {
387 if (!peerFinishReadFuture.isDone()) {
388 runFinishPeerReadTask(peer);
389 return;
390 } else {
391
392 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
393 }
394 }
395
396
397 if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
398 peer.readInProgress = false;
399 peer.readInbound();
400 }
401 }
402
403 @Override
404 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
405 if (state == State.CONNECTED) {
406 throw new AlreadyConnectedException();
407 }
408
409 if (state != State.BOUND) {
410
411 if (localAddress == null) {
412 localAddress = new LocalAddress(LocalChannel.this);
413 }
414 }
415
416 if (localAddress != null) {
417 try {
418 doBind(localAddress);
419 } catch (Throwable t) {
420 closeTransport(newPromise());
421 throw t;
422 }
423 }
424
425 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
426 if (!(boundChannel instanceof LocalServerChannel)) {
427 Exception cause = new ConnectException("connection refused: " + remoteAddress);
428 closeTransport(newPromise());
429 throw cause;
430 }
431
432 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
433 peer = serverChannel.serve(LocalChannel.this);
434 return false;
435 }
436
437 @Override
438 protected boolean doFinishConnect(LocalAddress requestedRemoteAddress) throws Exception {
439 final LocalChannel peer = LocalChannel.this.peer;
440 if (peer == null) {
441 return false;
442 }
443 state = State.CONNECTED;
444 remoteAddress = peer.parent().localAddress();
445
446
447
448 peer.writeFlushedAsync();
449 return true;
450 }
451
452 private void writeFlushedAsync() {
453 executor().execute(this::writeFlushed);
454 }
455
456 private void finishConnectAsync() {
457
458 executor().execute(() -> {
459 if (isConnectPending()) {
460 finishConnect();
461 }
462 });
463 }
464
465 @Override
466 public void registerTransportNow() {
467
468
469 LocalChannel peer = this.peer;
470 if (parent() != null && peer != null) {
471
472 state = State.CONNECTED;
473 peer.finishConnectAsync();
474 }
475 }
476
477 @Override
478 public void deregisterTransportNow() {
479
480 }
481
482 @Override
483 public void closeTransportNow() {
484 closeTransport(newPromise());
485 }
486 }