1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.net.SocketAddress;
27
28
29
30
31 public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
32 extends ChannelDuplexHandler {
33
34 private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
35
36 private DelegatingChannelHandlerContext inboundCtx;
37 private DelegatingChannelHandlerContext outboundCtx;
38 private volatile boolean handlerAdded;
39
40 private I inboundHandler;
41 private O outboundHandler;
42
43
44
45
46
47
48 protected CombinedChannelDuplexHandler() {
49 ensureNotSharable();
50 }
51
52
53
54
55 public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
56 ensureNotSharable();
57 init(inboundHandler, outboundHandler);
58 }
59
60
61
62
63
64
65
66
67
68 protected final void init(I inboundHandler, O outboundHandler) {
69 validate(inboundHandler, outboundHandler);
70 this.inboundHandler = inboundHandler;
71 this.outboundHandler = outboundHandler;
72 }
73
74 private void validate(I inboundHandler, O outboundHandler) {
75 if (this.inboundHandler != null) {
76 throw new IllegalStateException(
77 "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
78 " was constructed with non-default constructor.");
79 }
80
81 ObjectUtil.checkNotNull(inboundHandler, "inboundHandler");
82 ObjectUtil.checkNotNull(outboundHandler, "outboundHandler");
83
84 if (inboundHandler instanceof ChannelOutboundHandler) {
85 throw new IllegalArgumentException(
86 "inboundHandler must not implement " +
87 ChannelOutboundHandler.class.getSimpleName() + " to get combined.");
88 }
89 if (outboundHandler instanceof ChannelInboundHandler) {
90 throw new IllegalArgumentException(
91 "outboundHandler must not implement " +
92 ChannelInboundHandler.class.getSimpleName() + " to get combined.");
93 }
94 }
95
96 protected final I inboundHandler() {
97 return inboundHandler;
98 }
99
100 protected final O outboundHandler() {
101 return outboundHandler;
102 }
103
104 private void checkAdded() {
105 if (!handlerAdded) {
106 throw new IllegalStateException("handler not added to pipeline yet");
107 }
108 }
109
110
111
112
113 public final void removeInboundHandler() {
114 checkAdded();
115 inboundCtx.remove();
116 }
117
118
119
120
121 public final void removeOutboundHandler() {
122 checkAdded();
123 outboundCtx.remove();
124 }
125
126 @Override
127 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
128 if (inboundHandler == null) {
129 throw new IllegalStateException(
130 "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
131 " if " + CombinedChannelDuplexHandler.class.getSimpleName() +
132 " was constructed with the default constructor.");
133 }
134
135 outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
136 inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
137 @SuppressWarnings("deprecation")
138 @Override
139 public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
140 if (!outboundCtx.removed) {
141 try {
142
143
144 outboundHandler.exceptionCaught(outboundCtx, cause);
145 } catch (Throwable error) {
146 if (logger.isDebugEnabled()) {
147 logger.debug(
148 "An exception " +
149 "was thrown by a user handler's exceptionCaught() " +
150 "method while handling the following exception:", cause);
151 } else if (logger.isWarnEnabled()) {
152 logger.warn(
153 "An exception '{}' [enable DEBUG level for full stacktrace] " +
154 "was thrown by a user handler's exceptionCaught() " +
155 "method while handling the following exception:", error, cause);
156 }
157 }
158 } else {
159 super.fireExceptionCaught(cause);
160 }
161 return this;
162 }
163 };
164
165
166
167 handlerAdded = true;
168
169 try {
170 inboundHandler.handlerAdded(inboundCtx);
171 } finally {
172 outboundHandler.handlerAdded(outboundCtx);
173 }
174 }
175
176 @Override
177 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
178 try {
179 inboundCtx.remove();
180 } finally {
181 outboundCtx.remove();
182 }
183 }
184
185 @Override
186 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
187 assert ctx == inboundCtx.ctx;
188 if (!inboundCtx.removed) {
189 inboundHandler.channelRegistered(inboundCtx);
190 } else {
191 inboundCtx.fireChannelRegistered();
192 }
193 }
194
195 @Override
196 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
197 assert ctx == inboundCtx.ctx;
198 if (!inboundCtx.removed) {
199 inboundHandler.channelUnregistered(inboundCtx);
200 } else {
201 inboundCtx.fireChannelUnregistered();
202 }
203 }
204
205 @Override
206 public void channelActive(ChannelHandlerContext ctx) throws Exception {
207 assert ctx == inboundCtx.ctx;
208 if (!inboundCtx.removed) {
209 inboundHandler.channelActive(inboundCtx);
210 } else {
211 inboundCtx.fireChannelActive();
212 }
213 }
214
215 @Override
216 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
217 assert ctx == inboundCtx.ctx;
218 if (!inboundCtx.removed) {
219 inboundHandler.channelInactive(inboundCtx);
220 } else {
221 inboundCtx.fireChannelInactive();
222 }
223 }
224
225 @Override
226 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
227 assert ctx == inboundCtx.ctx;
228 if (!inboundCtx.removed) {
229 inboundHandler.exceptionCaught(inboundCtx, cause);
230 } else {
231 inboundCtx.fireExceptionCaught(cause);
232 }
233 }
234
235 @Override
236 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
237 assert ctx == inboundCtx.ctx;
238 if (!inboundCtx.removed) {
239 inboundHandler.userEventTriggered(inboundCtx, evt);
240 } else {
241 inboundCtx.fireUserEventTriggered(evt);
242 }
243 }
244
245 @Override
246 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
247 assert ctx == inboundCtx.ctx;
248 if (!inboundCtx.removed) {
249 inboundHandler.channelRead(inboundCtx, msg);
250 } else {
251 inboundCtx.fireChannelRead(msg);
252 }
253 }
254
255 @Override
256 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
257 assert ctx == inboundCtx.ctx;
258 if (!inboundCtx.removed) {
259 inboundHandler.channelReadComplete(inboundCtx);
260 } else {
261 inboundCtx.fireChannelReadComplete();
262 }
263 }
264
265 @Override
266 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
267 assert ctx == inboundCtx.ctx;
268 if (!inboundCtx.removed) {
269 inboundHandler.channelWritabilityChanged(inboundCtx);
270 } else {
271 inboundCtx.fireChannelWritabilityChanged();
272 }
273 }
274
275 @Override
276 public void bind(
277 ChannelHandlerContext ctx,
278 SocketAddress localAddress, ChannelPromise promise) throws Exception {
279 assert ctx == outboundCtx.ctx;
280 if (!outboundCtx.removed) {
281 outboundHandler.bind(outboundCtx, localAddress, promise);
282 } else {
283 outboundCtx.bind(localAddress, promise);
284 }
285 }
286
287 @Override
288 public void connect(
289 ChannelHandlerContext ctx,
290 SocketAddress remoteAddress, SocketAddress localAddress,
291 ChannelPromise promise) throws Exception {
292 assert ctx == outboundCtx.ctx;
293 if (!outboundCtx.removed) {
294 outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
295 } else {
296 outboundCtx.connect(remoteAddress, localAddress, promise);
297 }
298 }
299
300 @Override
301 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
302 assert ctx == outboundCtx.ctx;
303 if (!outboundCtx.removed) {
304 outboundHandler.disconnect(outboundCtx, promise);
305 } else {
306 outboundCtx.disconnect(promise);
307 }
308 }
309
310 @Override
311 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
312 assert ctx == outboundCtx.ctx;
313 if (!outboundCtx.removed) {
314 outboundHandler.close(outboundCtx, promise);
315 } else {
316 outboundCtx.close(promise);
317 }
318 }
319
320 @Override
321 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
322 assert ctx == outboundCtx.ctx;
323 if (!outboundCtx.removed) {
324 outboundHandler.deregister(outboundCtx, promise);
325 } else {
326 outboundCtx.deregister(promise);
327 }
328 }
329
330 @Override
331 public void read(ChannelHandlerContext ctx) throws Exception {
332 assert ctx == outboundCtx.ctx;
333 if (!outboundCtx.removed) {
334 outboundHandler.read(outboundCtx);
335 } else {
336 outboundCtx.read();
337 }
338 }
339
340 @Override
341 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
342 assert ctx == outboundCtx.ctx;
343 if (!outboundCtx.removed) {
344 outboundHandler.write(outboundCtx, msg, promise);
345 } else {
346 outboundCtx.write(msg, promise);
347 }
348 }
349
350 @Override
351 public void flush(ChannelHandlerContext ctx) throws Exception {
352 assert ctx == outboundCtx.ctx;
353 if (!outboundCtx.removed) {
354 outboundHandler.flush(outboundCtx);
355 } else {
356 outboundCtx.flush();
357 }
358 }
359
360 private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
361
362 private final ChannelHandlerContext ctx;
363 private final ChannelHandler handler;
364 boolean removed;
365
366 DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
367 this.ctx = ctx;
368 this.handler = handler;
369 }
370
371 @Override
372 public Channel channel() {
373 return ctx.channel();
374 }
375
376 @Override
377 public EventExecutor executor() {
378 return ctx.executor();
379 }
380
381 @Override
382 public String name() {
383 return ctx.name();
384 }
385
386 @Override
387 public ChannelHandler handler() {
388 return ctx.handler();
389 }
390
391 @Override
392 public boolean isRemoved() {
393 return removed || ctx.isRemoved();
394 }
395
396 @Override
397 public ChannelHandlerContext fireChannelRegistered() {
398 ctx.fireChannelRegistered();
399 return this;
400 }
401
402 @Override
403 public ChannelHandlerContext fireChannelUnregistered() {
404 ctx.fireChannelUnregistered();
405 return this;
406 }
407
408 @Override
409 public ChannelHandlerContext fireChannelActive() {
410 ctx.fireChannelActive();
411 return this;
412 }
413
414 @Override
415 public ChannelHandlerContext fireChannelInactive() {
416 ctx.fireChannelInactive();
417 return this;
418 }
419
420 @Override
421 public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
422 ctx.fireExceptionCaught(cause);
423 return this;
424 }
425
426 @Override
427 public ChannelHandlerContext fireUserEventTriggered(Object event) {
428 ctx.fireUserEventTriggered(event);
429 return this;
430 }
431
432 @Override
433 public ChannelHandlerContext fireChannelRead(Object msg) {
434 ctx.fireChannelRead(msg);
435 return this;
436 }
437
438 @Override
439 public ChannelHandlerContext fireChannelReadComplete() {
440 ctx.fireChannelReadComplete();
441 return this;
442 }
443
444 @Override
445 public ChannelHandlerContext fireChannelWritabilityChanged() {
446 ctx.fireChannelWritabilityChanged();
447 return this;
448 }
449
450 @Override
451 public ChannelFuture bind(SocketAddress localAddress) {
452 return ctx.bind(localAddress);
453 }
454
455 @Override
456 public ChannelFuture connect(SocketAddress remoteAddress) {
457 return ctx.connect(remoteAddress);
458 }
459
460 @Override
461 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
462 return ctx.connect(remoteAddress, localAddress);
463 }
464
465 @Override
466 public ChannelFuture disconnect() {
467 return ctx.disconnect();
468 }
469
470 @Override
471 public ChannelFuture close() {
472 return ctx.close();
473 }
474
475 @Override
476 public ChannelFuture deregister() {
477 return ctx.deregister();
478 }
479
480 @Override
481 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
482 return ctx.bind(localAddress, promise);
483 }
484
485 @Override
486 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
487 return ctx.connect(remoteAddress, promise);
488 }
489
490 @Override
491 public ChannelFuture connect(
492 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
493 return ctx.connect(remoteAddress, localAddress, promise);
494 }
495
496 @Override
497 public ChannelFuture disconnect(ChannelPromise promise) {
498 return ctx.disconnect(promise);
499 }
500
501 @Override
502 public ChannelFuture close(ChannelPromise promise) {
503 return ctx.close(promise);
504 }
505
506 @Override
507 public ChannelFuture deregister(ChannelPromise promise) {
508 return ctx.deregister(promise);
509 }
510
511 @Override
512 public ChannelHandlerContext read() {
513 ctx.read();
514 return this;
515 }
516
517 @Override
518 public ChannelFuture write(Object msg) {
519 return ctx.write(msg);
520 }
521
522 @Override
523 public ChannelFuture write(Object msg, ChannelPromise promise) {
524 return ctx.write(msg, promise);
525 }
526
527 @Override
528 public ChannelHandlerContext flush() {
529 ctx.flush();
530 return this;
531 }
532
533 @Override
534 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
535 return ctx.writeAndFlush(msg, promise);
536 }
537
538 @Override
539 public ChannelFuture writeAndFlush(Object msg) {
540 return ctx.writeAndFlush(msg);
541 }
542
543 @Override
544 public ChannelPipeline pipeline() {
545 return ctx.pipeline();
546 }
547
548 @Override
549 public ByteBufAllocator alloc() {
550 return ctx.alloc();
551 }
552
553 @Override
554 public ChannelPromise newPromise() {
555 return ctx.newPromise();
556 }
557
558 @Override
559 public ChannelProgressivePromise newProgressivePromise() {
560 return ctx.newProgressivePromise();
561 }
562
563 @Override
564 public ChannelFuture newSucceededFuture() {
565 return ctx.newSucceededFuture();
566 }
567
568 @Override
569 public ChannelFuture newFailedFuture(Throwable cause) {
570 return ctx.newFailedFuture(cause);
571 }
572
573 @Override
574 public ChannelPromise voidPromise() {
575 return ctx.voidPromise();
576 }
577
578 @Override
579 public <T> Attribute<T> attr(AttributeKey<T> key) {
580 return ctx.channel().attr(key);
581 }
582
583 @Override
584 public <T> boolean hasAttr(AttributeKey<T> key) {
585 return ctx.channel().hasAttr(key);
586 }
587
588 final void remove() {
589 EventExecutor executor = executor();
590 if (executor.inEventLoop()) {
591 remove0();
592 } else {
593 executor.execute(new Runnable() {
594 @Override
595 public void run() {
596 remove0();
597 }
598 });
599 }
600 }
601
602 private void remove0() {
603 if (!removed) {
604 removed = true;
605 try {
606 handler.handlerRemoved(this);
607 } catch (Throwable cause) {
608 fireExceptionCaught(new ChannelPipelineException(
609 handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
610 }
611 }
612 }
613 }
614 }