1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.AbstractServerChannel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.DefaultChannelConfig;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.ServerChannel;
24 import io.netty.channel.SingleThreadEventLoop;
25 import io.netty.util.concurrent.SingleThreadEventExecutor;
26
27 import java.net.SocketAddress;
28 import java.util.ArrayDeque;
29 import java.util.Queue;
30
31
32
33
34 public class LocalServerChannel extends AbstractServerChannel {
35
36 private final ChannelConfig config = new DefaultChannelConfig(this);
37 private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
38 private final Runnable shutdownHook = new Runnable() {
39 @Override
40 public void run() {
41 unsafe().close(unsafe().voidPromise());
42 }
43 };
44
45 private volatile int state;
46 private volatile LocalAddress localAddress;
47 private volatile boolean acceptInProgress;
48
49 @Override
50 public ChannelConfig config() {
51 return config;
52 }
53
54 @Override
55 public LocalAddress localAddress() {
56 return (LocalAddress) super.localAddress();
57 }
58
59 @Override
60 public LocalAddress remoteAddress() {
61 return (LocalAddress) super.remoteAddress();
62 }
63
64 @Override
65 public boolean isOpen() {
66 return state < 2;
67 }
68
69 @Override
70 public boolean isActive() {
71 return state == 1;
72 }
73
74 @Override
75 protected boolean isCompatible(EventLoop loop) {
76 return loop instanceof SingleThreadEventLoop;
77 }
78
79 @Override
80 protected SocketAddress localAddress0() {
81 return localAddress;
82 }
83
84 @Override
85 protected void doRegister() throws Exception {
86 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
87 }
88
89 @Override
90 protected void doBind(SocketAddress localAddress) throws Exception {
91 this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
92 state = 1;
93 }
94
95 @Override
96 protected void doClose() throws Exception {
97 if (state <= 1) {
98
99 if (localAddress != null) {
100 LocalChannelRegistry.unregister(localAddress);
101 localAddress = null;
102 }
103 state = 2;
104 }
105 }
106
107 @Override
108 protected void doDeregister() throws Exception {
109 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
110 }
111
112 @Override
113 protected void doBeginRead() throws Exception {
114 if (acceptInProgress) {
115 return;
116 }
117
118 Queue<Object> inboundBuffer = this.inboundBuffer;
119 if (inboundBuffer.isEmpty()) {
120 acceptInProgress = true;
121 return;
122 }
123
124 ChannelPipeline pipeline = pipeline();
125 for (;;) {
126 Object m = inboundBuffer.poll();
127 if (m == null) {
128 break;
129 }
130 pipeline.fireChannelRead(m);
131 }
132 pipeline.fireChannelReadComplete();
133 }
134
135 LocalChannel serve(final LocalChannel peer) {
136 final LocalChannel child = newLocalChannel(peer);
137 if (eventLoop().inEventLoop()) {
138 serve0(child);
139 } else {
140 eventLoop().execute(new Runnable() {
141 @Override
142 public void run() {
143 serve0(child);
144 }
145 });
146 }
147 return child;
148 }
149
150
151
152
153
154 protected LocalChannel newLocalChannel(LocalChannel peer) {
155 return new LocalChannel(this, peer);
156 }
157
158 private void serve0(final LocalChannel child) {
159 inboundBuffer.add(child);
160 if (acceptInProgress) {
161 acceptInProgress = false;
162 ChannelPipeline pipeline = pipeline();
163 for (;;) {
164 Object m = inboundBuffer.poll();
165 if (m == null) {
166 break;
167 }
168 pipeline.fireChannelRead(m);
169 }
170 pipeline.fireChannelReadComplete();
171 }
172 }
173 }