1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.internal.ThrowableUtil;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.net.SocketAddress;
27
28
29
30
31 public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
32 extends ChannelDuplexHandler {
33
34 private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
35
36 private DelegatingChannelHandlerContext inboundCtx;
37 private DelegatingChannelHandlerContext outboundCtx;
38 private volatile boolean handlerAdded;
39
40 private I inboundHandler;
41 private O outboundHandler;
42
43
44
45
46
47
48 protected CombinedChannelDuplexHandler() {
49 ensureNotSharable();
50 }
51
52
53
54
55 public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
56 ensureNotSharable();
57 init(inboundHandler, outboundHandler);
58 }
59
60
61
62
63
64
65
66
67
68 protected final void init(I inboundHandler, O outboundHandler) {
69 validate(inboundHandler, outboundHandler);
70 this.inboundHandler = inboundHandler;
71 this.outboundHandler = outboundHandler;
72 }
73
74 private void validate(I inboundHandler, O outboundHandler) {
75 if (this.inboundHandler != null) {
76 throw new IllegalStateException(
77 "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
78 " was constructed with non-default constructor.");
79 }
80
81 if (inboundHandler == null) {
82 throw new NullPointerException("inboundHandler");
83 }
84 if (outboundHandler == null) {
85 throw new NullPointerException("outboundHandler");
86 }
87 if (inboundHandler instanceof ChannelOutboundHandler) {
88 throw new IllegalArgumentException(
89 "inboundHandler must not implement " +
90 ChannelOutboundHandler.class.getSimpleName() + " to get combined.");
91 }
92 if (outboundHandler instanceof ChannelInboundHandler) {
93 throw new IllegalArgumentException(
94 "outboundHandler must not implement " +
95 ChannelInboundHandler.class.getSimpleName() + " to get combined.");
96 }
97 }
98
99 protected final I inboundHandler() {
100 return inboundHandler;
101 }
102
103 protected final O outboundHandler() {
104 return outboundHandler;
105 }
106
107 private void checkAdded() {
108 if (!handlerAdded) {
109 throw new IllegalStateException("handler not added to pipeline yet");
110 }
111 }
112
113
114
115
116 public final void removeInboundHandler() {
117 checkAdded();
118 inboundCtx.remove();
119 }
120
121
122
123
124 public final void removeOutboundHandler() {
125 checkAdded();
126 outboundCtx.remove();
127 }
128
129 @Override
130 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
131 if (inboundHandler == null) {
132 throw new IllegalStateException(
133 "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
134 " if " + CombinedChannelDuplexHandler.class.getSimpleName() +
135 " was constructed with the default constructor.");
136 }
137
138 outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
139 inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
140 @SuppressWarnings("deprecation")
141 @Override
142 public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
143 if (!outboundCtx.removed) {
144 try {
145
146
147 outboundHandler.exceptionCaught(outboundCtx, cause);
148 } catch (Throwable error) {
149 if (logger.isDebugEnabled()) {
150 logger.debug(
151 "An exception {}" +
152 "was thrown by a user handler's exceptionCaught() " +
153 "method while handling the following exception:",
154 ThrowableUtil.stackTraceToString(error), cause);
155 } else if (logger.isWarnEnabled()) {
156 logger.warn(
157 "An exception '{}' [enable DEBUG level for full stacktrace] " +
158 "was thrown by a user handler's exceptionCaught() " +
159 "method while handling the following exception:", error, cause);
160 }
161 }
162 } else {
163 super.fireExceptionCaught(cause);
164 }
165 return this;
166 }
167 };
168
169
170
171 handlerAdded = true;
172
173 try {
174 inboundHandler.handlerAdded(inboundCtx);
175 } finally {
176 outboundHandler.handlerAdded(outboundCtx);
177 }
178 }
179
180 @Override
181 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
182 try {
183 inboundCtx.remove();
184 } finally {
185 outboundCtx.remove();
186 }
187 }
188
189 @Override
190 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
191 assert ctx == inboundCtx.ctx;
192 if (!inboundCtx.removed) {
193 inboundHandler.channelRegistered(inboundCtx);
194 } else {
195 inboundCtx.fireChannelRegistered();
196 }
197 }
198
199 @Override
200 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
201 assert ctx == inboundCtx.ctx;
202 if (!inboundCtx.removed) {
203 inboundHandler.channelUnregistered(inboundCtx);
204 } else {
205 inboundCtx.fireChannelUnregistered();
206 }
207 }
208
209 @Override
210 public void channelActive(ChannelHandlerContext ctx) throws Exception {
211 assert ctx == inboundCtx.ctx;
212 if (!inboundCtx.removed) {
213 inboundHandler.channelActive(inboundCtx);
214 } else {
215 inboundCtx.fireChannelActive();
216 }
217 }
218
219 @Override
220 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
221 assert ctx == inboundCtx.ctx;
222 if (!inboundCtx.removed) {
223 inboundHandler.channelInactive(inboundCtx);
224 } else {
225 inboundCtx.fireChannelInactive();
226 }
227 }
228
229 @Override
230 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
231 assert ctx == inboundCtx.ctx;
232 if (!inboundCtx.removed) {
233 inboundHandler.exceptionCaught(inboundCtx, cause);
234 } else {
235 inboundCtx.fireExceptionCaught(cause);
236 }
237 }
238
239 @Override
240 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
241 assert ctx == inboundCtx.ctx;
242 if (!inboundCtx.removed) {
243 inboundHandler.userEventTriggered(inboundCtx, evt);
244 } else {
245 inboundCtx.fireUserEventTriggered(evt);
246 }
247 }
248
249 @Override
250 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
251 assert ctx == inboundCtx.ctx;
252 if (!inboundCtx.removed) {
253 inboundHandler.channelRead(inboundCtx, msg);
254 } else {
255 inboundCtx.fireChannelRead(msg);
256 }
257 }
258
259 @Override
260 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
261 assert ctx == inboundCtx.ctx;
262 if (!inboundCtx.removed) {
263 inboundHandler.channelReadComplete(inboundCtx);
264 } else {
265 inboundCtx.fireChannelReadComplete();
266 }
267 }
268
269 @Override
270 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
271 assert ctx == inboundCtx.ctx;
272 if (!inboundCtx.removed) {
273 inboundHandler.channelWritabilityChanged(inboundCtx);
274 } else {
275 inboundCtx.fireChannelWritabilityChanged();
276 }
277 }
278
279 @Override
280 public void bind(
281 ChannelHandlerContext ctx,
282 SocketAddress localAddress, ChannelPromise promise) throws Exception {
283 assert ctx == outboundCtx.ctx;
284 if (!outboundCtx.removed) {
285 outboundHandler.bind(outboundCtx, localAddress, promise);
286 } else {
287 outboundCtx.bind(localAddress, promise);
288 }
289 }
290
291 @Override
292 public void connect(
293 ChannelHandlerContext ctx,
294 SocketAddress remoteAddress, SocketAddress localAddress,
295 ChannelPromise promise) throws Exception {
296 assert ctx == outboundCtx.ctx;
297 if (!outboundCtx.removed) {
298 outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
299 } else {
300 outboundCtx.connect(localAddress, promise);
301 }
302 }
303
304 @Override
305 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
306 assert ctx == outboundCtx.ctx;
307 if (!outboundCtx.removed) {
308 outboundHandler.disconnect(outboundCtx, promise);
309 } else {
310 outboundCtx.disconnect(promise);
311 }
312 }
313
314 @Override
315 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
316 assert ctx == outboundCtx.ctx;
317 if (!outboundCtx.removed) {
318 outboundHandler.close(outboundCtx, promise);
319 } else {
320 outboundCtx.close(promise);
321 }
322 }
323
324 @Override
325 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
326 assert ctx == outboundCtx.ctx;
327 if (!outboundCtx.removed) {
328 outboundHandler.deregister(outboundCtx, promise);
329 } else {
330 outboundCtx.deregister(promise);
331 }
332 }
333
334 @Override
335 public void read(ChannelHandlerContext ctx) throws Exception {
336 assert ctx == outboundCtx.ctx;
337 if (!outboundCtx.removed) {
338 outboundHandler.read(outboundCtx);
339 } else {
340 outboundCtx.read();
341 }
342 }
343
344 @Override
345 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
346 assert ctx == outboundCtx.ctx;
347 if (!outboundCtx.removed) {
348 outboundHandler.write(outboundCtx, msg, promise);
349 } else {
350 outboundCtx.write(msg, promise);
351 }
352 }
353
354 @Override
355 public void flush(ChannelHandlerContext ctx) throws Exception {
356 assert ctx == outboundCtx.ctx;
357 if (!outboundCtx.removed) {
358 outboundHandler.flush(outboundCtx);
359 } else {
360 outboundCtx.flush();
361 }
362 }
363
364 private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
365
366 private final ChannelHandlerContext ctx;
367 private final ChannelHandler handler;
368 boolean removed;
369
370 DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
371 this.ctx = ctx;
372 this.handler = handler;
373 }
374
375 @Override
376 public Channel channel() {
377 return ctx.channel();
378 }
379
380 @Override
381 public EventExecutor executor() {
382 return ctx.executor();
383 }
384
385 @Override
386 public String name() {
387 return ctx.name();
388 }
389
390 @Override
391 public ChannelHandler handler() {
392 return ctx.handler();
393 }
394
395 @Override
396 public boolean isRemoved() {
397 return removed || ctx.isRemoved();
398 }
399
400 @Override
401 public ChannelHandlerContext fireChannelRegistered() {
402 ctx.fireChannelRegistered();
403 return this;
404 }
405
406 @Override
407 public ChannelHandlerContext fireChannelUnregistered() {
408 ctx.fireChannelUnregistered();
409 return this;
410 }
411
412 @Override
413 public ChannelHandlerContext fireChannelActive() {
414 ctx.fireChannelActive();
415 return this;
416 }
417
418 @Override
419 public ChannelHandlerContext fireChannelInactive() {
420 ctx.fireChannelInactive();
421 return this;
422 }
423
424 @Override
425 public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
426 ctx.fireExceptionCaught(cause);
427 return this;
428 }
429
430 @Override
431 public ChannelHandlerContext fireUserEventTriggered(Object event) {
432 ctx.fireUserEventTriggered(event);
433 return this;
434 }
435
436 @Override
437 public ChannelHandlerContext fireChannelRead(Object msg) {
438 ctx.fireChannelRead(msg);
439 return this;
440 }
441
442 @Override
443 public ChannelHandlerContext fireChannelReadComplete() {
444 ctx.fireChannelReadComplete();
445 return this;
446 }
447
448 @Override
449 public ChannelHandlerContext fireChannelWritabilityChanged() {
450 ctx.fireChannelWritabilityChanged();
451 return this;
452 }
453
454 @Override
455 public ChannelFuture bind(SocketAddress localAddress) {
456 return ctx.bind(localAddress);
457 }
458
459 @Override
460 public ChannelFuture connect(SocketAddress remoteAddress) {
461 return ctx.connect(remoteAddress);
462 }
463
464 @Override
465 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
466 return ctx.connect(remoteAddress, localAddress);
467 }
468
469 @Override
470 public ChannelFuture disconnect() {
471 return ctx.disconnect();
472 }
473
474 @Override
475 public ChannelFuture close() {
476 return ctx.close();
477 }
478
479 @Override
480 public ChannelFuture deregister() {
481 return ctx.deregister();
482 }
483
484 @Override
485 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
486 return ctx.bind(localAddress, promise);
487 }
488
489 @Override
490 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
491 return ctx.connect(remoteAddress, promise);
492 }
493
494 @Override
495 public ChannelFuture connect(
496 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
497 return ctx.connect(remoteAddress, localAddress, promise);
498 }
499
500 @Override
501 public ChannelFuture disconnect(ChannelPromise promise) {
502 return ctx.disconnect(promise);
503 }
504
505 @Override
506 public ChannelFuture close(ChannelPromise promise) {
507 return ctx.close(promise);
508 }
509
510 @Override
511 public ChannelFuture deregister(ChannelPromise promise) {
512 return ctx.deregister(promise);
513 }
514
515 @Override
516 public ChannelHandlerContext read() {
517 ctx.read();
518 return this;
519 }
520
521 @Override
522 public ChannelFuture write(Object msg) {
523 return ctx.write(msg);
524 }
525
526 @Override
527 public ChannelFuture write(Object msg, ChannelPromise promise) {
528 return ctx.write(msg, promise);
529 }
530
531 @Override
532 public ChannelHandlerContext flush() {
533 ctx.flush();
534 return this;
535 }
536
537 @Override
538 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
539 return ctx.writeAndFlush(msg, promise);
540 }
541
542 @Override
543 public ChannelFuture writeAndFlush(Object msg) {
544 return ctx.writeAndFlush(msg);
545 }
546
547 @Override
548 public ChannelPipeline pipeline() {
549 return ctx.pipeline();
550 }
551
552 @Override
553 public ByteBufAllocator alloc() {
554 return ctx.alloc();
555 }
556
557 @Override
558 public ChannelPromise newPromise() {
559 return ctx.newPromise();
560 }
561
562 @Override
563 public ChannelProgressivePromise newProgressivePromise() {
564 return ctx.newProgressivePromise();
565 }
566
567 @Override
568 public ChannelFuture newSucceededFuture() {
569 return ctx.newSucceededFuture();
570 }
571
572 @Override
573 public ChannelFuture newFailedFuture(Throwable cause) {
574 return ctx.newFailedFuture(cause);
575 }
576
577 @Override
578 public ChannelPromise voidPromise() {
579 return ctx.voidPromise();
580 }
581
582 @Override
583 public <T> Attribute<T> attr(AttributeKey<T> key) {
584 return ctx.attr(key);
585 }
586
587 final void remove() {
588 EventExecutor executor = executor();
589 if (executor.inEventLoop()) {
590 remove0();
591 } else {
592 executor.execute(new Runnable() {
593 @Override
594 public void run() {
595 remove0();
596 }
597 });
598 }
599 }
600
601 private void remove0() {
602 if (!removed) {
603 removed = true;
604 try {
605 handler.handlerRemoved(this);
606 } catch (Throwable cause) {
607 fireExceptionCaught(new ChannelPipelineException(
608 handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
609 }
610 }
611 }
612 }
613 }