1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.local;
17
18 import io.netty5.buffer.api.DefaultBufferAllocators;
19 import io.netty5.channel.AbstractServerChannel;
20 import io.netty5.channel.ChannelOption;
21 import io.netty5.channel.ChannelPipeline;
22 import io.netty5.channel.ChannelShutdownDirection;
23 import io.netty5.channel.EventLoop;
24 import io.netty5.channel.EventLoopGroup;
25 import io.netty5.channel.RecvBufferAllocator;
26 import io.netty5.channel.ServerChannel;
27 import io.netty5.channel.ServerChannelRecvBufferAllocator;
28
29 import java.net.SocketAddress;
30 import java.util.ArrayDeque;
31 import java.util.Queue;
32
33
34
35
36 public class LocalServerChannel extends AbstractServerChannel<LocalChannel, LocalAddress, LocalAddress>
37 implements LocalChannelUnsafe {
38
39 private final Queue<Object> inboundBuffer = new ArrayDeque<>();
40 private volatile int state;
41 private volatile LocalAddress localAddress;
42 private volatile boolean acceptInProgress;
43
44 public LocalServerChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
45 super(eventLoop, childEventLoopGroup, LocalChannel.class);
46 setOption(ChannelOption.BUFFER_ALLOCATOR, DefaultBufferAllocators.onHeapAllocator());
47 }
48
49 @Override
50 public boolean isOpen() {
51 return state < 2;
52 }
53
54 @Override
55 public boolean isActive() {
56 return state == 1;
57 }
58
59 @Override
60 protected LocalAddress localAddress0() {
61 return localAddress;
62 }
63
64 @Override
65 protected void doBind(SocketAddress localAddress) throws Exception {
66 this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
67 state = 1;
68 }
69
70 @Override
71 protected void doClose() throws Exception {
72 if (state <= 1) {
73
74 if (localAddress != null) {
75 LocalChannelRegistry.unregister(localAddress);
76 localAddress = null;
77 }
78 state = 2;
79 }
80 }
81
82 @Override
83 protected void doBeginRead() throws Exception {
84 if (acceptInProgress) {
85 return;
86 }
87
88 Queue<Object> inboundBuffer = this.inboundBuffer;
89 if (inboundBuffer.isEmpty()) {
90 acceptInProgress = true;
91 return;
92 }
93
94 readInbound();
95 }
96
97 LocalChannel serve(final LocalChannel peer) {
98 final LocalChannel child = newLocalChannel(peer);
99 if (executor().inEventLoop()) {
100 serve0(child);
101 } else {
102 executor().execute(() -> serve0(child));
103 }
104 return child;
105 }
106
107 private void readInbound() {
108 RecvBufferAllocator.Handle handle = recvBufAllocHandle();
109 handle.reset();
110 ChannelPipeline pipeline = pipeline();
111 do {
112 Object m = inboundBuffer.poll();
113 if (m == null) {
114 break;
115 }
116 pipeline.fireChannelRead(m);
117 } while (handle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
118
119 pipeline.fireChannelReadComplete();
120 readIfIsAutoRead();
121 }
122
123
124
125
126
127 protected LocalChannel newLocalChannel(LocalChannel peer) {
128 return new LocalChannel(this, childEventLoopGroup().next(), peer);
129 }
130
131 private void serve0(final LocalChannel child) {
132 inboundBuffer.add(child);
133 if (acceptInProgress) {
134 acceptInProgress = false;
135
136 readInbound();
137 }
138 }
139
140 @Override
141 public void registerTransportNow() {
142 }
143
144 @Override
145 public void deregisterTransportNow() {
146 }
147
148 @Override
149 public void closeTransportNow() {
150 closeTransport(newPromise());
151 }
152 }