View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.internal.ThrowableUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.net.SocketAddress;
27  
28  /**
29   *  Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
30   */
31  public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
32          extends ChannelDuplexHandler {
33  
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
35  
36      private DelegatingChannelHandlerContext inboundCtx;
37      private DelegatingChannelHandlerContext outboundCtx;
38      private volatile boolean handlerAdded;
39  
40      private I inboundHandler;
41      private O outboundHandler;
42  
43      /**
44       * Creates a new uninitialized instance. A class that extends this handler must invoke
45       * {@link #init(ChannelInboundHandler, ChannelOutboundHandler)} before adding this handler into a
46       * {@link ChannelPipeline}.
47       */
48      protected CombinedChannelDuplexHandler() {
49          ensureNotSharable();
50      }
51  
52      /**
53       * Creates a new instance that combines the specified two handlers into one.
54       */
55      public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
56          ensureNotSharable();
57          init(inboundHandler, outboundHandler);
58      }
59  
60      /**
61       * Initialized this handler with the specified handlers.
62       *
63       * @throws IllegalStateException if this handler was not constructed via the default constructor or
64       *                               if this handler does not implement all required handler interfaces
65       * @throws IllegalArgumentException if the specified handlers cannot be combined into one due to a conflict
66       *                                  in the type hierarchy
67       */
68      protected final void init(I inboundHandler, O outboundHandler) {
69          validate(inboundHandler, outboundHandler);
70          this.inboundHandler = inboundHandler;
71          this.outboundHandler = outboundHandler;
72      }
73  
74      private void validate(I inboundHandler, O outboundHandler) {
75          if (this.inboundHandler != null) {
76              throw new IllegalStateException(
77                      "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
78                              " was constructed with non-default constructor.");
79          }
80  
81          if (inboundHandler == null) {
82              throw new NullPointerException("inboundHandler");
83          }
84          if (outboundHandler == null) {
85              throw new NullPointerException("outboundHandler");
86          }
87          if (inboundHandler instanceof ChannelOutboundHandler) {
88              throw new IllegalArgumentException(
89                      "inboundHandler must not implement " +
90                      ChannelOutboundHandler.class.getSimpleName() + " to get combined.");
91          }
92          if (outboundHandler instanceof ChannelInboundHandler) {
93              throw new IllegalArgumentException(
94                      "outboundHandler must not implement " +
95                      ChannelInboundHandler.class.getSimpleName() + " to get combined.");
96          }
97      }
98  
99      protected final I inboundHandler() {
100         return inboundHandler;
101     }
102 
103     protected final O outboundHandler() {
104         return outboundHandler;
105     }
106 
107     private void checkAdded() {
108         if (!handlerAdded) {
109             throw new IllegalStateException("handler not added to pipeline yet");
110         }
111     }
112 
113     /**
114      * Removes the {@link ChannelInboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
115      */
116     public final void removeInboundHandler() {
117         checkAdded();
118         inboundCtx.remove();
119     }
120 
121     /**
122      * Removes the {@link ChannelOutboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
123      */
124     public final void removeOutboundHandler() {
125         checkAdded();
126         outboundCtx.remove();
127     }
128 
129     @Override
130     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
131         if (inboundHandler == null) {
132             throw new IllegalStateException(
133                     "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
134                             " if " +  CombinedChannelDuplexHandler.class.getSimpleName() +
135                             " was constructed with the default constructor.");
136         }
137 
138         outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
139         inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
140             @SuppressWarnings("deprecation")
141             @Override
142             public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
143                 if (!outboundCtx.removed) {
144                     try {
145                         // We directly delegate to the ChannelOutboundHandler as this may override exceptionCaught(...)
146                         // as well
147                         outboundHandler.exceptionCaught(outboundCtx, cause);
148                     } catch (Throwable error) {
149                         if (logger.isDebugEnabled()) {
150                             logger.debug(
151                                     "An exception {}" +
152                                     "was thrown by a user handler's exceptionCaught() " +
153                                     "method while handling the following exception:",
154                                     ThrowableUtil.stackTraceToString(error), cause);
155                         } else if (logger.isWarnEnabled()) {
156                             logger.warn(
157                                     "An exception '{}' [enable DEBUG level for full stacktrace] " +
158                                     "was thrown by a user handler's exceptionCaught() " +
159                                     "method while handling the following exception:", error, cause);
160                         }
161                     }
162                 } else {
163                     super.fireExceptionCaught(cause);
164                 }
165                 return this;
166             }
167         };
168 
169         // The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and
170         // removeOutboundHandler().
171         handlerAdded = true;
172 
173         try {
174             inboundHandler.handlerAdded(inboundCtx);
175         } finally {
176             outboundHandler.handlerAdded(outboundCtx);
177         }
178     }
179 
180     @Override
181     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
182         try {
183             inboundCtx.remove();
184         } finally {
185             outboundCtx.remove();
186         }
187     }
188 
189     @Override
190     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
191         assert ctx == inboundCtx.ctx;
192         if (!inboundCtx.removed) {
193             inboundHandler.channelRegistered(inboundCtx);
194         } else {
195             inboundCtx.fireChannelRegistered();
196         }
197     }
198 
199     @Override
200     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
201         assert ctx == inboundCtx.ctx;
202         if (!inboundCtx.removed) {
203             inboundHandler.channelUnregistered(inboundCtx);
204         } else {
205             inboundCtx.fireChannelUnregistered();
206         }
207     }
208 
209     @Override
210     public void channelActive(ChannelHandlerContext ctx) throws Exception {
211         assert ctx == inboundCtx.ctx;
212         if (!inboundCtx.removed) {
213             inboundHandler.channelActive(inboundCtx);
214         } else {
215             inboundCtx.fireChannelActive();
216         }
217     }
218 
219     @Override
220     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
221         assert ctx == inboundCtx.ctx;
222         if (!inboundCtx.removed) {
223             inboundHandler.channelInactive(inboundCtx);
224         } else {
225             inboundCtx.fireChannelInactive();
226         }
227     }
228 
229     @Override
230     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
231         assert ctx == inboundCtx.ctx;
232         if (!inboundCtx.removed) {
233             inboundHandler.exceptionCaught(inboundCtx, cause);
234         } else {
235             inboundCtx.fireExceptionCaught(cause);
236         }
237     }
238 
239     @Override
240     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
241         assert ctx == inboundCtx.ctx;
242         if (!inboundCtx.removed) {
243             inboundHandler.userEventTriggered(inboundCtx, evt);
244         } else {
245             inboundCtx.fireUserEventTriggered(evt);
246         }
247     }
248 
249     @Override
250     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
251         assert ctx == inboundCtx.ctx;
252         if (!inboundCtx.removed) {
253             inboundHandler.channelRead(inboundCtx, msg);
254         } else {
255             inboundCtx.fireChannelRead(msg);
256         }
257     }
258 
259     @Override
260     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
261         assert ctx == inboundCtx.ctx;
262         if (!inboundCtx.removed) {
263             inboundHandler.channelReadComplete(inboundCtx);
264         } else {
265             inboundCtx.fireChannelReadComplete();
266         }
267     }
268 
269     @Override
270     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
271         assert ctx == inboundCtx.ctx;
272         if (!inboundCtx.removed) {
273             inboundHandler.channelWritabilityChanged(inboundCtx);
274         } else {
275             inboundCtx.fireChannelWritabilityChanged();
276         }
277     }
278 
279     @Override
280     public void bind(
281             ChannelHandlerContext ctx,
282             SocketAddress localAddress, ChannelPromise promise) throws Exception {
283         assert ctx == outboundCtx.ctx;
284         if (!outboundCtx.removed) {
285             outboundHandler.bind(outboundCtx, localAddress, promise);
286         } else {
287             outboundCtx.bind(localAddress, promise);
288         }
289     }
290 
291     @Override
292     public void connect(
293             ChannelHandlerContext ctx,
294             SocketAddress remoteAddress, SocketAddress localAddress,
295             ChannelPromise promise) throws Exception {
296         assert ctx == outboundCtx.ctx;
297         if (!outboundCtx.removed) {
298             outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
299         } else {
300             outboundCtx.connect(localAddress, promise);
301         }
302     }
303 
304     @Override
305     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
306         assert ctx == outboundCtx.ctx;
307         if (!outboundCtx.removed) {
308             outboundHandler.disconnect(outboundCtx, promise);
309         } else {
310             outboundCtx.disconnect(promise);
311         }
312     }
313 
314     @Override
315     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
316         assert ctx == outboundCtx.ctx;
317         if (!outboundCtx.removed) {
318             outboundHandler.close(outboundCtx, promise);
319         } else {
320             outboundCtx.close(promise);
321         }
322     }
323 
324     @Override
325     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
326         assert ctx == outboundCtx.ctx;
327         if (!outboundCtx.removed) {
328             outboundHandler.deregister(outboundCtx, promise);
329         } else {
330             outboundCtx.deregister(promise);
331         }
332     }
333 
334     @Override
335     public void read(ChannelHandlerContext ctx) throws Exception {
336         assert ctx == outboundCtx.ctx;
337         if (!outboundCtx.removed) {
338             outboundHandler.read(outboundCtx);
339         } else {
340             outboundCtx.read();
341         }
342     }
343 
344     @Override
345     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
346         assert ctx == outboundCtx.ctx;
347         if (!outboundCtx.removed) {
348             outboundHandler.write(outboundCtx, msg, promise);
349         } else {
350             outboundCtx.write(msg, promise);
351         }
352     }
353 
354     @Override
355     public void flush(ChannelHandlerContext ctx) throws Exception {
356         assert ctx == outboundCtx.ctx;
357         if (!outboundCtx.removed) {
358             outboundHandler.flush(outboundCtx);
359         } else {
360             outboundCtx.flush();
361         }
362     }
363 
364     private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
365 
366         private final ChannelHandlerContext ctx;
367         private final ChannelHandler handler;
368         boolean removed;
369 
370         DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
371             this.ctx = ctx;
372             this.handler = handler;
373         }
374 
375         @Override
376         public Channel channel() {
377             return ctx.channel();
378         }
379 
380         @Override
381         public EventExecutor executor() {
382             return ctx.executor();
383         }
384 
385         @Override
386         public String name() {
387             return ctx.name();
388         }
389 
390         @Override
391         public ChannelHandler handler() {
392             return ctx.handler();
393         }
394 
395         @Override
396         public boolean isRemoved() {
397             return removed || ctx.isRemoved();
398         }
399 
400         @Override
401         public ChannelHandlerContext fireChannelRegistered() {
402             ctx.fireChannelRegistered();
403             return this;
404         }
405 
406         @Override
407         public ChannelHandlerContext fireChannelUnregistered() {
408             ctx.fireChannelUnregistered();
409             return this;
410         }
411 
412         @Override
413         public ChannelHandlerContext fireChannelActive() {
414             ctx.fireChannelActive();
415             return this;
416         }
417 
418         @Override
419         public ChannelHandlerContext fireChannelInactive() {
420             ctx.fireChannelInactive();
421             return this;
422         }
423 
424         @Override
425         public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
426             ctx.fireExceptionCaught(cause);
427             return this;
428         }
429 
430         @Override
431         public ChannelHandlerContext fireUserEventTriggered(Object event) {
432             ctx.fireUserEventTriggered(event);
433             return this;
434         }
435 
436         @Override
437         public ChannelHandlerContext fireChannelRead(Object msg) {
438             ctx.fireChannelRead(msg);
439             return this;
440         }
441 
442         @Override
443         public ChannelHandlerContext fireChannelReadComplete() {
444             ctx.fireChannelReadComplete();
445             return this;
446         }
447 
448         @Override
449         public ChannelHandlerContext fireChannelWritabilityChanged() {
450             ctx.fireChannelWritabilityChanged();
451             return this;
452         }
453 
454         @Override
455         public ChannelFuture bind(SocketAddress localAddress) {
456             return ctx.bind(localAddress);
457         }
458 
459         @Override
460         public ChannelFuture connect(SocketAddress remoteAddress) {
461             return ctx.connect(remoteAddress);
462         }
463 
464         @Override
465         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
466             return ctx.connect(remoteAddress, localAddress);
467         }
468 
469         @Override
470         public ChannelFuture disconnect() {
471             return ctx.disconnect();
472         }
473 
474         @Override
475         public ChannelFuture close() {
476             return ctx.close();
477         }
478 
479         @Override
480         public ChannelFuture deregister() {
481             return ctx.deregister();
482         }
483 
484         @Override
485         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
486             return ctx.bind(localAddress, promise);
487         }
488 
489         @Override
490         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
491             return ctx.connect(remoteAddress, promise);
492         }
493 
494         @Override
495         public ChannelFuture connect(
496                 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
497             return ctx.connect(remoteAddress, localAddress, promise);
498         }
499 
500         @Override
501         public ChannelFuture disconnect(ChannelPromise promise) {
502             return ctx.disconnect(promise);
503         }
504 
505         @Override
506         public ChannelFuture close(ChannelPromise promise) {
507             return ctx.close(promise);
508         }
509 
510         @Override
511         public ChannelFuture deregister(ChannelPromise promise) {
512             return ctx.deregister(promise);
513         }
514 
515         @Override
516         public ChannelHandlerContext read() {
517             ctx.read();
518             return this;
519         }
520 
521         @Override
522         public ChannelFuture write(Object msg) {
523             return ctx.write(msg);
524         }
525 
526         @Override
527         public ChannelFuture write(Object msg, ChannelPromise promise) {
528             return ctx.write(msg, promise);
529         }
530 
531         @Override
532         public ChannelHandlerContext flush() {
533             ctx.flush();
534             return this;
535         }
536 
537         @Override
538         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
539             return ctx.writeAndFlush(msg, promise);
540         }
541 
542         @Override
543         public ChannelFuture writeAndFlush(Object msg) {
544             return ctx.writeAndFlush(msg);
545         }
546 
547         @Override
548         public ChannelPipeline pipeline() {
549             return ctx.pipeline();
550         }
551 
552         @Override
553         public ByteBufAllocator alloc() {
554             return ctx.alloc();
555         }
556 
557         @Override
558         public ChannelPromise newPromise() {
559             return ctx.newPromise();
560         }
561 
562         @Override
563         public ChannelProgressivePromise newProgressivePromise() {
564             return ctx.newProgressivePromise();
565         }
566 
567         @Override
568         public ChannelFuture newSucceededFuture() {
569             return ctx.newSucceededFuture();
570         }
571 
572         @Override
573         public ChannelFuture newFailedFuture(Throwable cause) {
574             return ctx.newFailedFuture(cause);
575         }
576 
577         @Override
578         public ChannelPromise voidPromise() {
579             return ctx.voidPromise();
580         }
581 
582         @Override
583         public <T> Attribute<T> attr(AttributeKey<T> key) {
584             return ctx.attr(key);
585         }
586 
587         final void remove() {
588             EventExecutor executor = executor();
589             if (executor.inEventLoop()) {
590                 remove0();
591             } else {
592                 executor.execute(new Runnable() {
593                     @Override
594                     public void run() {
595                         remove0();
596                     }
597                 });
598             }
599         }
600 
601         private void remove0() {
602             if (!removed) {
603                 removed = true;
604                 try {
605                     handler.handlerRemoved(this);
606                 } catch (Throwable cause) {
607                     fireExceptionCaught(new ChannelPipelineException(
608                             handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
609                 }
610             }
611         }
612     }
613 }