View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.channel.internal.DelegatingChannelHandlerContext;
19  import io.netty5.util.concurrent.EventExecutor;
20  import io.netty5.util.concurrent.Future;
21  
22  import java.net.SocketAddress;
23  
24  import static java.util.Objects.requireNonNull;
25  
26  /**
27   *  Combines the inbound handling of one {@link ChannelHandler} with the outbound handling of
28   *  another {@link ChannelHandler}.
29   */
30  public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends ChannelHandler>
31          extends ChannelHandlerAdapter {
32  
33      private CombinedChannelHandlerContext inboundCtx;
34      private CombinedChannelHandlerContext outboundCtx;
35      private volatile boolean handlerAdded;
36  
37      private I inboundHandler;
38      private O outboundHandler;
39  
40      /**
41       * Creates a new uninitialized instance. A class that extends this handler must invoke
42       * {@link #init(ChannelHandler, ChannelHandler)} before adding this handler into a
43       * {@link ChannelPipeline}.
44       */
45      protected CombinedChannelDuplexHandler() {
46      }
47  
48      /**
49       * Creates a new instance that combines the specified two handlers into one.
50       */
51      public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
52          init(inboundHandler, outboundHandler);
53      }
54  
55      @Override
56      public final boolean isSharable() {
57          // Can't be sharable as we keep state.
58          return false;
59      }
60  
61      /**
62       * Initialized this handler with the specified handlers.
63       *
64       * @throws IllegalStateException if this handler was not constructed via the default constructor or
65       *                               if this handler does not implement all required handler interfaces
66       * @throws IllegalArgumentException if the specified handlers cannot be combined into one due to a conflict
67       *                                  in the type hierarchy
68       */
69      protected final void init(I inboundHandler, O outboundHandler) {
70          validate(inboundHandler, outboundHandler);
71          this.inboundHandler = inboundHandler;
72          this.outboundHandler = outboundHandler;
73      }
74  
75      private void validate(I inboundHandler, O outboundHandler) {
76          if (this.inboundHandler != null) {
77              throw new IllegalStateException(
78                      "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
79                              " was constructed with non-default constructor.");
80          }
81  
82          requireNonNull(inboundHandler, "inboundHandler");
83          requireNonNull(outboundHandler, "outboundHandler");
84          if (ChannelHandlerMask.isOutbound(inboundHandler.getClass())) {
85              throw new IllegalArgumentException(
86                      "inboundHandler must not implement any outbound method to get combined.");
87          }
88          if (ChannelHandlerMask.isInbound(outboundHandler.getClass())) {
89              throw new IllegalArgumentException(
90                      "outboundHandler must not implement any inbound method to get combined.");
91          }
92      }
93  
94      protected final I inboundHandler() {
95          return inboundHandler;
96      }
97  
98      protected final O outboundHandler() {
99          return outboundHandler;
100     }
101 
102     private void checkAdded() {
103         if (!handlerAdded) {
104             throw new IllegalStateException("handler not added to pipeline yet");
105         }
106     }
107 
108     /**
109      * Removes the inbound {@link ChannelHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
110      */
111     public final void removeInboundHandler() {
112         checkAdded();
113         inboundCtx.remove();
114     }
115 
116     /**
117      * Removes the outbound {@link ChannelHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
118      */
119     public final void removeOutboundHandler() {
120         checkAdded();
121         outboundCtx.remove();
122     }
123 
124     @Override
125     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
126         if (inboundHandler == null) {
127             throw new IllegalStateException(
128                     "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
129                             " if " +  CombinedChannelDuplexHandler.class.getSimpleName() +
130                             " was constructed with the default constructor.");
131         }
132 
133         outboundCtx = new CombinedChannelHandlerContext(ctx, outboundHandler);
134         inboundCtx = new CombinedChannelHandlerContext(ctx, inboundHandler);
135 
136         // The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and
137         // removeOutboundHandler().
138         handlerAdded = true;
139 
140         try {
141             inboundHandler.handlerAdded(inboundCtx);
142         } finally {
143             outboundHandler.handlerAdded(outboundCtx);
144         }
145     }
146 
147     @Override
148     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
149         try {
150             inboundCtx.remove();
151         } finally {
152             outboundCtx.remove();
153         }
154     }
155 
156     @Override
157     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
158         assert ctx == inboundCtx.delegatingCtx();
159         if (!inboundCtx.removed) {
160             inboundHandler.channelRegistered(inboundCtx);
161         } else {
162             inboundCtx.fireChannelRegistered();
163         }
164     }
165 
166     @Override
167     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
168         assert ctx == inboundCtx.delegatingCtx();
169         if (!inboundCtx.removed) {
170             inboundHandler.channelUnregistered(inboundCtx);
171         } else {
172             inboundCtx.fireChannelUnregistered();
173         }
174     }
175 
176     @Override
177     public void channelActive(ChannelHandlerContext ctx) throws Exception {
178         assert ctx == inboundCtx.delegatingCtx();
179         if (!inboundCtx.removed) {
180             inboundHandler.channelActive(inboundCtx);
181         } else {
182             inboundCtx.fireChannelActive();
183         }
184     }
185 
186     @Override
187     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
188         assert ctx == inboundCtx.delegatingCtx();
189         if (!inboundCtx.removed) {
190             inboundHandler.channelInactive(inboundCtx);
191         } else {
192             inboundCtx.fireChannelInactive();
193         }
194     }
195 
196     @Override
197     public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) throws Exception {
198         assert ctx == inboundCtx.delegatingCtx();
199         if (!inboundCtx.removed) {
200             inboundHandler.channelShutdown(inboundCtx, direction);
201         } else {
202             inboundCtx.fireChannelInactive();
203         }
204     }
205 
206     @Override
207     public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
208         assert ctx == inboundCtx.delegatingCtx();
209         if (!inboundCtx.removed) {
210             inboundHandler.channelExceptionCaught(inboundCtx, cause);
211         } else {
212             inboundCtx.fireChannelExceptionCaught(cause);
213         }
214     }
215 
216     @Override
217     public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) throws Exception {
218             assert ctx == inboundCtx.delegatingCtx();
219         if (!inboundCtx.removed) {
220             inboundHandler.channelInboundEvent(inboundCtx, evt);
221         } else {
222             inboundCtx.fireChannelInboundEvent(evt);
223         }
224     }
225 
226     @Override
227     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
228         assert ctx == inboundCtx.delegatingCtx();
229         if (!inboundCtx.removed) {
230             inboundHandler.channelRead(inboundCtx, msg);
231         } else {
232             inboundCtx.fireChannelRead(msg);
233         }
234     }
235 
236     @Override
237     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
238         assert ctx == inboundCtx.delegatingCtx();
239         if (!inboundCtx.removed) {
240             inboundHandler.channelReadComplete(inboundCtx);
241         } else {
242             inboundCtx.fireChannelReadComplete();
243         }
244     }
245 
246     @Override
247     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
248         assert ctx == inboundCtx.delegatingCtx();
249         if (!inboundCtx.removed) {
250             inboundHandler.channelWritabilityChanged(inboundCtx);
251         } else {
252             inboundCtx.fireChannelWritabilityChanged();
253         }
254     }
255 
256     @Override
257     public Future<Void> bind(
258             ChannelHandlerContext ctx,
259             SocketAddress localAddress) {
260         assert ctx == outboundCtx.delegatingCtx();
261         if (!outboundCtx.removed) {
262             return outboundHandler.bind(outboundCtx, localAddress);
263         } else {
264             return outboundCtx.bind(localAddress);
265         }
266     }
267 
268     @Override
269     public Future<Void> connect(
270             ChannelHandlerContext ctx,
271             SocketAddress remoteAddress, SocketAddress localAddress) {
272         assert ctx == outboundCtx.delegatingCtx();
273         if (!outboundCtx.removed) {
274             return outboundHandler.connect(outboundCtx, remoteAddress, localAddress);
275         } else {
276             return outboundCtx.connect(remoteAddress, localAddress);
277         }
278     }
279 
280     @Override
281     public Future<Void> disconnect(ChannelHandlerContext ctx) {
282         assert ctx == outboundCtx.delegatingCtx();
283         if (!outboundCtx.removed) {
284             return outboundHandler.disconnect(outboundCtx);
285         } else {
286             return outboundCtx.disconnect();
287         }
288     }
289 
290     @Override
291     public Future<Void> close(ChannelHandlerContext ctx) {
292         assert ctx == outboundCtx.delegatingCtx();
293         if (!outboundCtx.removed) {
294             return outboundHandler.close(outboundCtx);
295         } else {
296             return outboundCtx.close();
297         }
298     }
299 
300     @Override
301     public Future<Void> shutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) {
302         assert ctx == outboundCtx.delegatingCtx();
303         if (!outboundCtx.removed) {
304             return outboundHandler.shutdown(outboundCtx, direction);
305         } else {
306             return outboundCtx.shutdown(direction);
307         }
308     }
309 
310     @Override
311     public Future<Void> register(ChannelHandlerContext ctx) {
312         assert ctx == outboundCtx.delegatingCtx();
313         if (!outboundCtx.removed) {
314             return outboundHandler.register(outboundCtx);
315         } else {
316             return outboundCtx.register();
317         }
318     }
319 
320     @Override
321     public Future<Void> deregister(ChannelHandlerContext ctx) {
322         assert ctx == outboundCtx.delegatingCtx();
323         if (!outboundCtx.removed) {
324             return outboundHandler.deregister(outboundCtx);
325         } else {
326             return outboundCtx.deregister();
327         }
328     }
329 
330     @Override
331     public void read(ChannelHandlerContext ctx) {
332         assert ctx == outboundCtx.delegatingCtx();
333         if (!outboundCtx.removed) {
334             outboundHandler.read(outboundCtx);
335         } else {
336             outboundCtx.read();
337         }
338     }
339 
340     @Override
341     public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
342         assert ctx == outboundCtx.delegatingCtx();
343         if (!outboundCtx.removed) {
344             return outboundHandler.write(outboundCtx, msg);
345         } else {
346             return outboundCtx.write(msg);
347         }
348     }
349 
350     @Override
351     public void flush(ChannelHandlerContext ctx) {
352         assert ctx == outboundCtx.delegatingCtx();
353         if (!outboundCtx.removed) {
354             outboundHandler.flush(outboundCtx);
355         } else {
356             outboundCtx.flush();
357         }
358     }
359 
360     @Override
361     public Future<Void> sendOutboundEvent(ChannelHandlerContext ctx, Object event) {
362         assert ctx == outboundCtx.delegatingCtx();
363         if (!outboundCtx.removed) {
364             return outboundHandler.sendOutboundEvent(outboundCtx, event);
365         } else {
366             return outboundCtx.sendOutboundEvent(event);
367         }
368     }
369 
370     @Override
371     public long pendingOutboundBytes(ChannelHandlerContext ctx) {
372         if (!outboundCtx.removed) {
373             return outboundCtx.handler().pendingOutboundBytes(outboundCtx);
374         }
375         return 0;
376     }
377 
378     private static final class CombinedChannelHandlerContext extends DelegatingChannelHandlerContext {
379 
380         private final ChannelHandler handler;
381         boolean removed;
382 
383         CombinedChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
384             super(ctx);
385             this.handler = handler;
386         }
387 
388         @Override
389         public boolean isRemoved() {
390             return delegatingCtx().isRemoved() || removed;
391         }
392 
393         @Override
394         public ChannelHandler handler() {
395             return handler;
396         }
397 
398         void remove() {
399             EventExecutor executor = executor();
400             if (executor.inEventLoop()) {
401                 remove0();
402             } else {
403                 executor.execute(this::remove0);
404             }
405         }
406 
407         private void remove0() {
408             if (!removed) {
409                 removed = true;
410                 try {
411                     handler.handlerRemoved(this);
412                 } catch (Throwable cause) {
413                     this.fireChannelExceptionCaught(new ChannelPipelineException(
414                             handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
415                 }
416             }
417         }
418     }
419 }