/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel;
17  
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.SocketAddress;
27  
28  /**
29   *  Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
30   */
31  public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
32          extends ChannelDuplexHandler {
33  
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
35  
36      private DelegatingChannelHandlerContext inboundCtx;
37      private DelegatingChannelHandlerContext outboundCtx;
38      private volatile boolean handlerAdded;
39  
40      private I inboundHandler;
41      private O outboundHandler;
42  
43      /**
44       * Creates a new uninitialized instance. A class that extends this handler must invoke
45       * {@link #init(ChannelInboundHandler, ChannelOutboundHandler)} before adding this handler into a
46       * {@link ChannelPipeline}.
47       */
48      protected CombinedChannelDuplexHandler() {
49          ensureNotSharable();
50      }
51  
52      /**
53       * Creates a new instance that combines the specified two handlers into one.
54       */
55      public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
56          ensureNotSharable();
57          init(inboundHandler, outboundHandler);
58      }
59  
60      /**
61       * Initialized this handler with the specified handlers.
62       *
63       * @throws IllegalStateException if this handler was not constructed via the default constructor or
64       *                               if this handler does not implement all required handler interfaces
65       * @throws IllegalArgumentException if the specified handlers cannot be combined into one due to a conflict
66       *                                  in the type hierarchy
67       */
68      protected final void init(I inboundHandler, O outboundHandler) {
69          validate(inboundHandler, outboundHandler);
70          this.inboundHandler = inboundHandler;
71          this.outboundHandler = outboundHandler;
72      }
73  
74      private void validate(I inboundHandler, O outboundHandler) {
75          if (this.inboundHandler != null) {
76              throw new IllegalStateException(
77                      "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
78                              " was constructed with non-default constructor.");
79          }
80  
81          ObjectUtil.checkNotNull(inboundHandler, "inboundHandler");
82          ObjectUtil.checkNotNull(outboundHandler, "outboundHandler");
83  
84          if (inboundHandler instanceof ChannelOutboundHandler) {
85              throw new IllegalArgumentException(
86                      "inboundHandler must not implement " +
87                      ChannelOutboundHandler.class.getSimpleName() + " to get combined.");
88          }
89          if (outboundHandler instanceof ChannelInboundHandler) {
90              throw new IllegalArgumentException(
91                      "outboundHandler must not implement " +
92                      ChannelInboundHandler.class.getSimpleName() + " to get combined.");
93          }
94      }
95  
96      protected final I inboundHandler() {
97          return inboundHandler;
98      }
99  
100     protected final O outboundHandler() {
101         return outboundHandler;
102     }
103 
104     private void checkAdded() {
105         if (!handlerAdded) {
106             throw new IllegalStateException("handler not added to pipeline yet");
107         }
108     }
109 
110     /**
111      * Removes the {@link ChannelInboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
112      */
113     public final void removeInboundHandler() {
114         checkAdded();
115         inboundCtx.remove();
116     }
117 
118     /**
119      * Removes the {@link ChannelOutboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
120      */
121     public final void removeOutboundHandler() {
122         checkAdded();
123         outboundCtx.remove();
124     }
125 
126     @Override
127     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
128         if (inboundHandler == null) {
129             throw new IllegalStateException(
130                     "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
131                             " if " +  CombinedChannelDuplexHandler.class.getSimpleName() +
132                             " was constructed with the default constructor.");
133         }
134 
135         outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
136         inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
137             @SuppressWarnings("deprecation")
138             @Override
139             public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
140                 if (!outboundCtx.removed) {
141                     try {
142                         // We directly delegate to the ChannelOutboundHandler as this may override exceptionCaught(...)
143                         // as well
144                         outboundHandler.exceptionCaught(outboundCtx, cause);
145                     } catch (Throwable error) {
146                         if (logger.isDebugEnabled()) {
147                             logger.debug(
148                                     "An exception " +
149                                     "was thrown by a user handler's exceptionCaught() " +
150                                     "method while handling the following exception:", cause);
151                         } else if (logger.isWarnEnabled()) {
152                             logger.warn(
153                                     "An exception '{}' [enable DEBUG level for full stacktrace] " +
154                                     "was thrown by a user handler's exceptionCaught() " +
155                                     "method while handling the following exception:", error, cause);
156                         }
157                     }
158                 } else {
159                     super.fireExceptionCaught(cause);
160                 }
161                 return this;
162             }
163         };
164 
165         // The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and
166         // removeOutboundHandler().
167         handlerAdded = true;
168 
169         try {
170             inboundHandler.handlerAdded(inboundCtx);
171         } finally {
172             outboundHandler.handlerAdded(outboundCtx);
173         }
174     }
175 
176     @Override
177     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
178         try {
179             inboundCtx.remove();
180         } finally {
181             outboundCtx.remove();
182         }
183     }
184 
185     @Override
186     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
187         assert ctx == inboundCtx.ctx;
188         if (!inboundCtx.removed) {
189             inboundHandler.channelRegistered(inboundCtx);
190         } else {
191             inboundCtx.fireChannelRegistered();
192         }
193     }
194 
195     @Override
196     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
197         assert ctx == inboundCtx.ctx;
198         if (!inboundCtx.removed) {
199             inboundHandler.channelUnregistered(inboundCtx);
200         } else {
201             inboundCtx.fireChannelUnregistered();
202         }
203     }
204 
205     @Override
206     public void channelActive(ChannelHandlerContext ctx) throws Exception {
207         assert ctx == inboundCtx.ctx;
208         if (!inboundCtx.removed) {
209             inboundHandler.channelActive(inboundCtx);
210         } else {
211             inboundCtx.fireChannelActive();
212         }
213     }
214 
215     @Override
216     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
217         assert ctx == inboundCtx.ctx;
218         if (!inboundCtx.removed) {
219             inboundHandler.channelInactive(inboundCtx);
220         } else {
221             inboundCtx.fireChannelInactive();
222         }
223     }
224 
225     @Override
226     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
227         assert ctx == inboundCtx.ctx;
228         if (!inboundCtx.removed) {
229             inboundHandler.exceptionCaught(inboundCtx, cause);
230         } else {
231             inboundCtx.fireExceptionCaught(cause);
232         }
233     }
234 
235     @Override
236     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
237         assert ctx == inboundCtx.ctx;
238         if (!inboundCtx.removed) {
239             inboundHandler.userEventTriggered(inboundCtx, evt);
240         } else {
241             inboundCtx.fireUserEventTriggered(evt);
242         }
243     }
244 
245     @Override
246     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
247         assert ctx == inboundCtx.ctx;
248         if (!inboundCtx.removed) {
249             inboundHandler.channelRead(inboundCtx, msg);
250         } else {
251             inboundCtx.fireChannelRead(msg);
252         }
253     }
254 
255     @Override
256     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
257         assert ctx == inboundCtx.ctx;
258         if (!inboundCtx.removed) {
259             inboundHandler.channelReadComplete(inboundCtx);
260         } else {
261             inboundCtx.fireChannelReadComplete();
262         }
263     }
264 
265     @Override
266     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
267         assert ctx == inboundCtx.ctx;
268         if (!inboundCtx.removed) {
269             inboundHandler.channelWritabilityChanged(inboundCtx);
270         } else {
271             inboundCtx.fireChannelWritabilityChanged();
272         }
273     }
274 
275     @Override
276     public void bind(
277             ChannelHandlerContext ctx,
278             SocketAddress localAddress, ChannelPromise promise) throws Exception {
279         assert ctx == outboundCtx.ctx;
280         if (!outboundCtx.removed) {
281             outboundHandler.bind(outboundCtx, localAddress, promise);
282         } else {
283             outboundCtx.bind(localAddress, promise);
284         }
285     }
286 
287     @Override
288     public void connect(
289             ChannelHandlerContext ctx,
290             SocketAddress remoteAddress, SocketAddress localAddress,
291             ChannelPromise promise) throws Exception {
292         assert ctx == outboundCtx.ctx;
293         if (!outboundCtx.removed) {
294             outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
295         } else {
296             outboundCtx.connect(remoteAddress, localAddress, promise);
297         }
298     }
299 
300     @Override
301     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
302         assert ctx == outboundCtx.ctx;
303         if (!outboundCtx.removed) {
304             outboundHandler.disconnect(outboundCtx, promise);
305         } else {
306             outboundCtx.disconnect(promise);
307         }
308     }
309 
310     @Override
311     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
312         assert ctx == outboundCtx.ctx;
313         if (!outboundCtx.removed) {
314             outboundHandler.close(outboundCtx, promise);
315         } else {
316             outboundCtx.close(promise);
317         }
318     }
319 
320     @Override
321     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
322         assert ctx == outboundCtx.ctx;
323         if (!outboundCtx.removed) {
324             outboundHandler.deregister(outboundCtx, promise);
325         } else {
326             outboundCtx.deregister(promise);
327         }
328     }
329 
330     @Override
331     public void read(ChannelHandlerContext ctx) throws Exception {
332         assert ctx == outboundCtx.ctx;
333         if (!outboundCtx.removed) {
334             outboundHandler.read(outboundCtx);
335         } else {
336             outboundCtx.read();
337         }
338     }
339 
340     @Override
341     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
342         assert ctx == outboundCtx.ctx;
343         if (!outboundCtx.removed) {
344             outboundHandler.write(outboundCtx, msg, promise);
345         } else {
346             outboundCtx.write(msg, promise);
347         }
348     }
349 
350     @Override
351     public void flush(ChannelHandlerContext ctx) throws Exception {
352         assert ctx == outboundCtx.ctx;
353         if (!outboundCtx.removed) {
354             outboundHandler.flush(outboundCtx);
355         } else {
356             outboundCtx.flush();
357         }
358     }
359 
360     private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
361 
362         private final ChannelHandlerContext ctx;
363         private final ChannelHandler handler;
364         boolean removed;
365 
366         DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
367             this.ctx = ctx;
368             this.handler = handler;
369         }
370 
371         @Override
372         public Channel channel() {
373             return ctx.channel();
374         }
375 
376         @Override
377         public EventExecutor executor() {
378             return ctx.executor();
379         }
380 
381         @Override
382         public String name() {
383             return ctx.name();
384         }
385 
386         @Override
387         public ChannelHandler handler() {
388             return ctx.handler();
389         }
390 
391         @Override
392         public boolean isRemoved() {
393             return removed || ctx.isRemoved();
394         }
395 
396         @Override
397         public ChannelHandlerContext fireChannelRegistered() {
398             ctx.fireChannelRegistered();
399             return this;
400         }
401 
402         @Override
403         public ChannelHandlerContext fireChannelUnregistered() {
404             ctx.fireChannelUnregistered();
405             return this;
406         }
407 
408         @Override
409         public ChannelHandlerContext fireChannelActive() {
410             ctx.fireChannelActive();
411             return this;
412         }
413 
414         @Override
415         public ChannelHandlerContext fireChannelInactive() {
416             ctx.fireChannelInactive();
417             return this;
418         }
419 
420         @Override
421         public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
422             ctx.fireExceptionCaught(cause);
423             return this;
424         }
425 
426         @Override
427         public ChannelHandlerContext fireUserEventTriggered(Object event) {
428             ctx.fireUserEventTriggered(event);
429             return this;
430         }
431 
432         @Override
433         public ChannelHandlerContext fireChannelRead(Object msg) {
434             ctx.fireChannelRead(msg);
435             return this;
436         }
437 
438         @Override
439         public ChannelHandlerContext fireChannelReadComplete() {
440             ctx.fireChannelReadComplete();
441             return this;
442         }
443 
444         @Override
445         public ChannelHandlerContext fireChannelWritabilityChanged() {
446             ctx.fireChannelWritabilityChanged();
447             return this;
448         }
449 
450         @Override
451         public ChannelFuture bind(SocketAddress localAddress) {
452             return ctx.bind(localAddress);
453         }
454 
455         @Override
456         public ChannelFuture connect(SocketAddress remoteAddress) {
457             return ctx.connect(remoteAddress);
458         }
459 
460         @Override
461         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
462             return ctx.connect(remoteAddress, localAddress);
463         }
464 
465         @Override
466         public ChannelFuture disconnect() {
467             return ctx.disconnect();
468         }
469 
470         @Override
471         public ChannelFuture close() {
472             return ctx.close();
473         }
474 
475         @Override
476         public ChannelFuture deregister() {
477             return ctx.deregister();
478         }
479 
480         @Override
481         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
482             return ctx.bind(localAddress, promise);
483         }
484 
485         @Override
486         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
487             return ctx.connect(remoteAddress, promise);
488         }
489 
490         @Override
491         public ChannelFuture connect(
492                 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
493             return ctx.connect(remoteAddress, localAddress, promise);
494         }
495 
496         @Override
497         public ChannelFuture disconnect(ChannelPromise promise) {
498             return ctx.disconnect(promise);
499         }
500 
501         @Override
502         public ChannelFuture close(ChannelPromise promise) {
503             return ctx.close(promise);
504         }
505 
506         @Override
507         public ChannelFuture deregister(ChannelPromise promise) {
508             return ctx.deregister(promise);
509         }
510 
511         @Override
512         public ChannelHandlerContext read() {
513             ctx.read();
514             return this;
515         }
516 
517         @Override
518         public ChannelFuture write(Object msg) {
519             return ctx.write(msg);
520         }
521 
522         @Override
523         public ChannelFuture write(Object msg, ChannelPromise promise) {
524             return ctx.write(msg, promise);
525         }
526 
527         @Override
528         public ChannelHandlerContext flush() {
529             ctx.flush();
530             return this;
531         }
532 
533         @Override
534         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
535             return ctx.writeAndFlush(msg, promise);
536         }
537 
538         @Override
539         public ChannelFuture writeAndFlush(Object msg) {
540             return ctx.writeAndFlush(msg);
541         }
542 
543         @Override
544         public ChannelPipeline pipeline() {
545             return ctx.pipeline();
546         }
547 
548         @Override
549         public ByteBufAllocator alloc() {
550             return ctx.alloc();
551         }
552 
553         @Override
554         public ChannelPromise newPromise() {
555             return ctx.newPromise();
556         }
557 
558         @Override
559         public ChannelProgressivePromise newProgressivePromise() {
560             return ctx.newProgressivePromise();
561         }
562 
563         @Override
564         public ChannelFuture newSucceededFuture() {
565             return ctx.newSucceededFuture();
566         }
567 
568         @Override
569         public ChannelFuture newFailedFuture(Throwable cause) {
570             return ctx.newFailedFuture(cause);
571         }
572 
573         @Override
574         public ChannelPromise voidPromise() {
575             return ctx.voidPromise();
576         }
577 
578         @Override
579         public <T> Attribute<T> attr(AttributeKey<T> key) {
580             return ctx.channel().attr(key);
581         }
582 
583         @Override
584         public <T> boolean hasAttr(AttributeKey<T> key) {
585             return ctx.channel().hasAttr(key);
586         }
587 
588         final void remove() {
589             EventExecutor executor = executor();
590             if (executor.inEventLoop()) {
591                 remove0();
592             } else {
593                 executor.execute(new Runnable() {
594                     @Override
595                     public void run() {
596                         remove0();
597                     }
598                 });
599             }
600         }
601 
602         private void remove0() {
603             if (!removed) {
604                 removed = true;
605                 try {
606                     handler.handlerRemoved(this);
607                 } catch (Throwable cause) {
608                     fireExceptionCaught(new ChannelPipelineException(
609                             handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
610                 }
611             }
612         }
613     }
614 }