1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel;
17
18 import io.netty5.channel.internal.DelegatingChannelHandlerContext;
19 import io.netty5.util.concurrent.EventExecutor;
20 import io.netty5.util.concurrent.Future;
21
22 import java.net.SocketAddress;
23
24 import static java.util.Objects.requireNonNull;
25
26
27
28
29
30 public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends ChannelHandler>
31 extends ChannelHandlerAdapter {
32
33 private CombinedChannelHandlerContext inboundCtx;
34 private CombinedChannelHandlerContext outboundCtx;
35 private volatile boolean handlerAdded;
36
37 private I inboundHandler;
38 private O outboundHandler;
39
40
41
42
43
44
45 protected CombinedChannelDuplexHandler() {
46 }
47
48
49
50
51 public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
52 init(inboundHandler, outboundHandler);
53 }
54
55 @Override
56 public final boolean isSharable() {
57
58 return false;
59 }
60
61
62
63
64
65
66
67
68
69 protected final void init(I inboundHandler, O outboundHandler) {
70 validate(inboundHandler, outboundHandler);
71 this.inboundHandler = inboundHandler;
72 this.outboundHandler = outboundHandler;
73 }
74
75 private void validate(I inboundHandler, O outboundHandler) {
76 if (this.inboundHandler != null) {
77 throw new IllegalStateException(
78 "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
79 " was constructed with non-default constructor.");
80 }
81
82 requireNonNull(inboundHandler, "inboundHandler");
83 requireNonNull(outboundHandler, "outboundHandler");
84 if (ChannelHandlerMask.isOutbound(inboundHandler.getClass())) {
85 throw new IllegalArgumentException(
86 "inboundHandler must not implement any outbound method to get combined.");
87 }
88 if (ChannelHandlerMask.isInbound(outboundHandler.getClass())) {
89 throw new IllegalArgumentException(
90 "outboundHandler must not implement any inbound method to get combined.");
91 }
92 }
93
94 protected final I inboundHandler() {
95 return inboundHandler;
96 }
97
98 protected final O outboundHandler() {
99 return outboundHandler;
100 }
101
102 private void checkAdded() {
103 if (!handlerAdded) {
104 throw new IllegalStateException("handler not added to pipeline yet");
105 }
106 }
107
108
109
110
111 public final void removeInboundHandler() {
112 checkAdded();
113 inboundCtx.remove();
114 }
115
116
117
118
119 public final void removeOutboundHandler() {
120 checkAdded();
121 outboundCtx.remove();
122 }
123
124 @Override
125 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
126 if (inboundHandler == null) {
127 throw new IllegalStateException(
128 "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
129 " if " + CombinedChannelDuplexHandler.class.getSimpleName() +
130 " was constructed with the default constructor.");
131 }
132
133 outboundCtx = new CombinedChannelHandlerContext(ctx, outboundHandler);
134 inboundCtx = new CombinedChannelHandlerContext(ctx, inboundHandler);
135
136
137
138 handlerAdded = true;
139
140 try {
141 inboundHandler.handlerAdded(inboundCtx);
142 } finally {
143 outboundHandler.handlerAdded(outboundCtx);
144 }
145 }
146
147 @Override
148 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
149 try {
150 inboundCtx.remove();
151 } finally {
152 outboundCtx.remove();
153 }
154 }
155
156 @Override
157 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
158 assert ctx == inboundCtx.delegatingCtx();
159 if (!inboundCtx.removed) {
160 inboundHandler.channelRegistered(inboundCtx);
161 } else {
162 inboundCtx.fireChannelRegistered();
163 }
164 }
165
166 @Override
167 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
168 assert ctx == inboundCtx.delegatingCtx();
169 if (!inboundCtx.removed) {
170 inboundHandler.channelUnregistered(inboundCtx);
171 } else {
172 inboundCtx.fireChannelUnregistered();
173 }
174 }
175
176 @Override
177 public void channelActive(ChannelHandlerContext ctx) throws Exception {
178 assert ctx == inboundCtx.delegatingCtx();
179 if (!inboundCtx.removed) {
180 inboundHandler.channelActive(inboundCtx);
181 } else {
182 inboundCtx.fireChannelActive();
183 }
184 }
185
186 @Override
187 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
188 assert ctx == inboundCtx.delegatingCtx();
189 if (!inboundCtx.removed) {
190 inboundHandler.channelInactive(inboundCtx);
191 } else {
192 inboundCtx.fireChannelInactive();
193 }
194 }
195
196 @Override
197 public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) throws Exception {
198 assert ctx == inboundCtx.delegatingCtx();
199 if (!inboundCtx.removed) {
200 inboundHandler.channelShutdown(inboundCtx, direction);
201 } else {
202 inboundCtx.fireChannelInactive();
203 }
204 }
205
206 @Override
207 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
208 assert ctx == inboundCtx.delegatingCtx();
209 if (!inboundCtx.removed) {
210 inboundHandler.channelExceptionCaught(inboundCtx, cause);
211 } else {
212 inboundCtx.fireChannelExceptionCaught(cause);
213 }
214 }
215
216 @Override
217 public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) throws Exception {
218 assert ctx == inboundCtx.delegatingCtx();
219 if (!inboundCtx.removed) {
220 inboundHandler.channelInboundEvent(inboundCtx, evt);
221 } else {
222 inboundCtx.fireChannelInboundEvent(evt);
223 }
224 }
225
226 @Override
227 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
228 assert ctx == inboundCtx.delegatingCtx();
229 if (!inboundCtx.removed) {
230 inboundHandler.channelRead(inboundCtx, msg);
231 } else {
232 inboundCtx.fireChannelRead(msg);
233 }
234 }
235
236 @Override
237 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
238 assert ctx == inboundCtx.delegatingCtx();
239 if (!inboundCtx.removed) {
240 inboundHandler.channelReadComplete(inboundCtx);
241 } else {
242 inboundCtx.fireChannelReadComplete();
243 }
244 }
245
246 @Override
247 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
248 assert ctx == inboundCtx.delegatingCtx();
249 if (!inboundCtx.removed) {
250 inboundHandler.channelWritabilityChanged(inboundCtx);
251 } else {
252 inboundCtx.fireChannelWritabilityChanged();
253 }
254 }
255
256 @Override
257 public Future<Void> bind(
258 ChannelHandlerContext ctx,
259 SocketAddress localAddress) {
260 assert ctx == outboundCtx.delegatingCtx();
261 if (!outboundCtx.removed) {
262 return outboundHandler.bind(outboundCtx, localAddress);
263 } else {
264 return outboundCtx.bind(localAddress);
265 }
266 }
267
268 @Override
269 public Future<Void> connect(
270 ChannelHandlerContext ctx,
271 SocketAddress remoteAddress, SocketAddress localAddress) {
272 assert ctx == outboundCtx.delegatingCtx();
273 if (!outboundCtx.removed) {
274 return outboundHandler.connect(outboundCtx, remoteAddress, localAddress);
275 } else {
276 return outboundCtx.connect(remoteAddress, localAddress);
277 }
278 }
279
280 @Override
281 public Future<Void> disconnect(ChannelHandlerContext ctx) {
282 assert ctx == outboundCtx.delegatingCtx();
283 if (!outboundCtx.removed) {
284 return outboundHandler.disconnect(outboundCtx);
285 } else {
286 return outboundCtx.disconnect();
287 }
288 }
289
290 @Override
291 public Future<Void> close(ChannelHandlerContext ctx) {
292 assert ctx == outboundCtx.delegatingCtx();
293 if (!outboundCtx.removed) {
294 return outboundHandler.close(outboundCtx);
295 } else {
296 return outboundCtx.close();
297 }
298 }
299
300 @Override
301 public Future<Void> shutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) {
302 assert ctx == outboundCtx.delegatingCtx();
303 if (!outboundCtx.removed) {
304 return outboundHandler.shutdown(outboundCtx, direction);
305 } else {
306 return outboundCtx.shutdown(direction);
307 }
308 }
309
310 @Override
311 public Future<Void> register(ChannelHandlerContext ctx) {
312 assert ctx == outboundCtx.delegatingCtx();
313 if (!outboundCtx.removed) {
314 return outboundHandler.register(outboundCtx);
315 } else {
316 return outboundCtx.register();
317 }
318 }
319
320 @Override
321 public Future<Void> deregister(ChannelHandlerContext ctx) {
322 assert ctx == outboundCtx.delegatingCtx();
323 if (!outboundCtx.removed) {
324 return outboundHandler.deregister(outboundCtx);
325 } else {
326 return outboundCtx.deregister();
327 }
328 }
329
330 @Override
331 public void read(ChannelHandlerContext ctx) {
332 assert ctx == outboundCtx.delegatingCtx();
333 if (!outboundCtx.removed) {
334 outboundHandler.read(outboundCtx);
335 } else {
336 outboundCtx.read();
337 }
338 }
339
340 @Override
341 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
342 assert ctx == outboundCtx.delegatingCtx();
343 if (!outboundCtx.removed) {
344 return outboundHandler.write(outboundCtx, msg);
345 } else {
346 return outboundCtx.write(msg);
347 }
348 }
349
350 @Override
351 public void flush(ChannelHandlerContext ctx) {
352 assert ctx == outboundCtx.delegatingCtx();
353 if (!outboundCtx.removed) {
354 outboundHandler.flush(outboundCtx);
355 } else {
356 outboundCtx.flush();
357 }
358 }
359
360 @Override
361 public Future<Void> sendOutboundEvent(ChannelHandlerContext ctx, Object event) {
362 assert ctx == outboundCtx.delegatingCtx();
363 if (!outboundCtx.removed) {
364 return outboundHandler.sendOutboundEvent(outboundCtx, event);
365 } else {
366 return outboundCtx.sendOutboundEvent(event);
367 }
368 }
369
370 @Override
371 public long pendingOutboundBytes(ChannelHandlerContext ctx) {
372 if (!outboundCtx.removed) {
373 return outboundCtx.handler().pendingOutboundBytes(outboundCtx);
374 }
375 return 0;
376 }
377
378 private static final class CombinedChannelHandlerContext extends DelegatingChannelHandlerContext {
379
380 private final ChannelHandler handler;
381 boolean removed;
382
383 CombinedChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
384 super(ctx);
385 this.handler = handler;
386 }
387
388 @Override
389 public boolean isRemoved() {
390 return delegatingCtx().isRemoved() || removed;
391 }
392
393 @Override
394 public ChannelHandler handler() {
395 return handler;
396 }
397
398 void remove() {
399 EventExecutor executor = executor();
400 if (executor.inEventLoop()) {
401 remove0();
402 } else {
403 executor.execute(this::remove0);
404 }
405 }
406
407 private void remove0() {
408 if (!removed) {
409 removed = true;
410 try {
411 handler.handlerRemoved(this);
412 } catch (Throwable cause) {
413 this.fireChannelExceptionCaught(new ChannelPipelineException(
414 handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
415 }
416 }
417 }
418 }
419 }