View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.ChannelHandler.Skip;
20  import io.netty.util.Attribute;
21  import io.netty.util.AttributeKey;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.ResourceLeakHint;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.concurrent.PausableEventExecutor;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.StringUtil;
29  import java.net.SocketAddress;
30  import java.util.WeakHashMap;
31  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32  
33  /**
34   * Abstract base class for {@link ChannelHandlerContext} implementations.
35   */
36  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
37  
38      // This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method
39      // is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached.
40      // The following constants signify which bit of 'skipFlags' corresponds to which handler method:
41  
42      static final int MASK_HANDLER_ADDED = 1;
43      static final int MASK_HANDLER_REMOVED = 1 << 1;
44  
45      private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
46      private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
47      private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
48      private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
49      private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
50      private static final int MASK_CHANNEL_READ = 1 << 7;
51      private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
52      private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
53      private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;
54  
55      private static final int MASK_BIND = 1 << 11;
56      private static final int MASK_CONNECT = 1 << 12;
57      private static final int MASK_DISCONNECT = 1 << 13;
58      private static final int MASK_CLOSE = 1 << 14;
59      private static final int MASK_DEREGISTER = 1 << 15;
60      private static final int MASK_READ = 1 << 16;
61      private static final int MASK_WRITE = 1 << 17;
62      private static final int MASK_FLUSH = 1 << 18;
63  
64      private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
65              MASK_CHANNEL_REGISTERED |
66              MASK_CHANNEL_UNREGISTERED |
67              MASK_CHANNEL_ACTIVE |
68              MASK_CHANNEL_INACTIVE |
69              MASK_CHANNEL_READ |
70              MASK_CHANNEL_READ_COMPLETE |
71              MASK_CHANNEL_WRITABILITY_CHANGED |
72              MASK_USER_EVENT_TRIGGERED;
73  
74      private static final int MASKGROUP_OUTBOUND = MASK_BIND |
75              MASK_CONNECT |
76              MASK_DISCONNECT |
77              MASK_CLOSE |
78              MASK_DEREGISTER |
79              MASK_READ |
80              MASK_WRITE |
81              MASK_FLUSH;
82  
83      /**
84       * Cache the result of the costly generation of {@link #skipFlags} in a thread-local {@link WeakHashMap}.
85       */
86      private static final FastThreadLocal<WeakHashMap<Class<?>, Integer>> skipFlagsCache =
87              new FastThreadLocal<WeakHashMap<Class<?>, Integer>>() {
88                  @Override
89                  protected WeakHashMap<Class<?>, Integer> initialValue() throws Exception {
90                      return new WeakHashMap<Class<?>, Integer>();
91                  }
92              };
93  
94      private static final AtomicReferenceFieldUpdater<AbstractChannelHandlerContext, PausableChannelEventExecutor>
95              WRAPPED_EVENTEXECUTOR_UPDATER;
96  
97      static {
98          AtomicReferenceFieldUpdater<AbstractChannelHandlerContext, PausableChannelEventExecutor> updater =
99                 PlatformDependent.newAtomicReferenceFieldUpdater(
100                        AbstractChannelHandlerContext.class, "wrappedEventLoop");
101         if (updater == null) {
102             updater = AtomicReferenceFieldUpdater.newUpdater(
103                     AbstractChannelHandlerContext.class, PausableChannelEventExecutor.class, "wrappedEventLoop");
104         }
105         WRAPPED_EVENTEXECUTOR_UPDATER = updater;
106     }
107 
108     /**
109      * Returns an integer bitset that tells which handler methods were annotated with {@link Skip}.
110      * It gets the value from {@link #skipFlagsCache} if an handler of the same type were queried before.
111      * Otherwise, it delegates to {@link #skipFlags0(Class)} to get it.
112      */
113     static int skipFlags(ChannelHandler handler) {
114         WeakHashMap<Class<?>, Integer> cache = skipFlagsCache.get();
115         Class<? extends ChannelHandler> handlerType = handler.getClass();
116         int flagsVal;
117         Integer flags = cache.get(handlerType);
118         if (flags != null) {
119             flagsVal = flags;
120         } else {
121             flagsVal = skipFlags0(handlerType);
122             cache.put(handlerType, Integer.valueOf(flagsVal));
123         }
124 
125         return flagsVal;
126     }
127 
128     /**
129      * Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API.
130      */
131     static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
132         int flags = 0;
133         try {
134             if (isSkippable(handlerType, "handlerAdded")) {
135                 flags |= MASK_HANDLER_ADDED;
136             }
137             if (isSkippable(handlerType, "handlerRemoved")) {
138                 flags |= MASK_HANDLER_REMOVED;
139             }
140             if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
141                 flags |= MASK_EXCEPTION_CAUGHT;
142             }
143             if (isSkippable(handlerType, "channelRegistered")) {
144                 flags |= MASK_CHANNEL_REGISTERED;
145             }
146             if (isSkippable(handlerType, "channelUnregistered")) {
147                 flags |= MASK_CHANNEL_UNREGISTERED;
148             }
149             if (isSkippable(handlerType, "channelActive")) {
150                 flags |= MASK_CHANNEL_ACTIVE;
151             }
152             if (isSkippable(handlerType, "channelInactive")) {
153                 flags |= MASK_CHANNEL_INACTIVE;
154             }
155             if (isSkippable(handlerType, "channelRead", Object.class)) {
156                 flags |= MASK_CHANNEL_READ;
157             }
158             if (isSkippable(handlerType, "channelReadComplete")) {
159                 flags |= MASK_CHANNEL_READ_COMPLETE;
160             }
161             if (isSkippable(handlerType, "channelWritabilityChanged")) {
162                 flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
163             }
164             if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
165                 flags |= MASK_USER_EVENT_TRIGGERED;
166             }
167             if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
168                 flags |= MASK_BIND;
169             }
170             if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
171                 flags |= MASK_CONNECT;
172             }
173             if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
174                 flags |= MASK_DISCONNECT;
175             }
176             if (isSkippable(handlerType, "close", ChannelPromise.class)) {
177                 flags |= MASK_CLOSE;
178             }
179             if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
180                 flags |= MASK_DEREGISTER;
181             }
182             if (isSkippable(handlerType, "read")) {
183                 flags |= MASK_READ;
184             }
185             if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
186                 flags |= MASK_WRITE;
187             }
188             if (isSkippable(handlerType, "flush")) {
189                 flags |= MASK_FLUSH;
190             }
191         } catch (Exception e) {
192             // Should never reach here.
193             PlatformDependent.throwException(e);
194         }
195 
196         return flags;
197     }
198 
199     @SuppressWarnings("rawtypes")
200     private static boolean isSkippable(
201             Class<?> handlerType, String methodName, Class<?>... paramTypes) throws Exception {
202 
203         Class[] newParamTypes = new Class[paramTypes.length + 1];
204         newParamTypes[0] = ChannelHandlerContext.class;
205         System.arraycopy(paramTypes, 0, newParamTypes, 1, paramTypes.length);
206 
207         return handlerType.getMethod(methodName, newParamTypes).isAnnotationPresent(Skip.class);
208     }
209 
210     volatile AbstractChannelHandlerContext next;
211     volatile AbstractChannelHandlerContext prev;
212 
213     private final AbstractChannel channel;
214     private final DefaultChannelPipeline pipeline;
215     private final String name;
216 
217     /**
218      * Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of
219      * this context's handler is invoked.
220      * Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
221      *
222      * See {@link #fireChannelReadComplete()} to understand how this flag is used.
223      */
224     boolean invokedThisChannelRead;
225 
226     /**
227      * Set when a user calls {@link #fireChannelRead(Object)} on this context.
228      * Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
229      *
230      * See {@link #fireChannelReadComplete()} to understand how this flag is used.
231      */
232     private volatile boolean invokedNextChannelRead;
233 
234     /**
235      * Set when a user calls {@link #read()} on this context.
236      * Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
237      *
238      * See {@link #fireChannelReadComplete()} to understand how this flag is used.
239      */
240     private volatile boolean invokedPrevRead;
241 
242     /**
243      * {@code true} if and only if this context has been removed from the pipeline.
244      */
245     private boolean removed;
246 
247     final int skipFlags;
248 
249     // Will be set to null if no child executor should be used, otherwise it will be set to the
250     // child executor.
251     final ChannelHandlerInvoker invoker;
252     private ChannelFuture succeededFuture;
253 
254     // Lazily instantiated tasks used to trigger events to a handler with different executor.
255     // These needs to be volatile as otherwise an other Thread may see an half initialized instance.
256     // See the JMM for more details
257     volatile Runnable invokeChannelReadCompleteTask;
258     volatile Runnable invokeReadTask;
259     volatile Runnable invokeFlushTask;
260     volatile Runnable invokeChannelWritableStateChangedTask;
261 
262     /**
263      * Wrapped {@link EventLoop} and {@link ChannelHandlerInvoker} to support {@link Channel#deregister()}.
264      */
265     @SuppressWarnings("UnusedDeclaration")
266     private volatile PausableChannelEventExecutor wrappedEventLoop;
267 
268     AbstractChannelHandlerContext(
269             DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, int skipFlags) {
270 
271         if (name == null) {
272             throw new NullPointerException("name");
273         }
274 
275         channel = pipeline.channel;
276         this.pipeline = pipeline;
277         this.name = name;
278         this.invoker = invoker;
279         this.skipFlags = skipFlags;
280     }
281 
282     @Override
283     public final Channel channel() {
284         return channel;
285     }
286 
287     @Override
288     public ChannelPipeline pipeline() {
289         return pipeline;
290     }
291 
292     @Override
293     public ByteBufAllocator alloc() {
294         return channel().config().getAllocator();
295     }
296 
297     @Override
298     public final EventExecutor executor() {
299         if (invoker == null) {
300             return channel().eventLoop();
301         } else {
302             return wrappedEventLoop();
303         }
304     }
305 
306     @Override
307     public final ChannelHandlerInvoker invoker() {
308         if (invoker == null) {
309             return channel().eventLoop().asInvoker();
310         } else {
311             return wrappedEventLoop();
312         }
313     }
314 
315     private PausableChannelEventExecutor wrappedEventLoop() {
316         PausableChannelEventExecutor wrapped = wrappedEventLoop;
317         if (wrapped == null) {
318             wrapped = new PausableChannelEventExecutor0();
319             if (!WRAPPED_EVENTEXECUTOR_UPDATER.compareAndSet(this, null, wrapped)) {
320                 // Set in the meantime so we need to issue another volatile read
321                 return wrappedEventLoop;
322             }
323         }
324         return wrapped;
325     }
326 
327     @Override
328     public String name() {
329         return name;
330     }
331 
332     @Override
333     public <T> Attribute<T> attr(AttributeKey<T> key) {
334         return channel.attr(key);
335     }
336 
337     @Override
338     public <T> boolean hasAttr(AttributeKey<T> key) {
339         return channel.hasAttr(key);
340     }
341 
342     @Override
343     public ChannelHandlerContext fireChannelRegistered() {
344         AbstractChannelHandlerContext next = findContextInbound();
345         next.invoker().invokeChannelRegistered(next);
346         return this;
347     }
348 
349     @Override
350     public ChannelHandlerContext fireChannelUnregistered() {
351         AbstractChannelHandlerContext next = findContextInbound();
352         next.invoker().invokeChannelUnregistered(next);
353         return this;
354     }
355 
356     @Override
357     public ChannelHandlerContext fireChannelActive() {
358         AbstractChannelHandlerContext next = findContextInbound();
359         next.invoker().invokeChannelActive(next);
360         return this;
361     }
362 
363     @Override
364     public ChannelHandlerContext fireChannelInactive() {
365         AbstractChannelHandlerContext next = findContextInbound();
366         next.invoker().invokeChannelInactive(next);
367         return this;
368     }
369 
370     @Override
371     public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
372         AbstractChannelHandlerContext next = findContextInbound();
373         next.invoker().invokeExceptionCaught(next, cause);
374         return this;
375     }
376 
377     @Override
378     public ChannelHandlerContext fireUserEventTriggered(Object event) {
379         AbstractChannelHandlerContext next = findContextInbound();
380         next.invoker().invokeUserEventTriggered(next, event);
381         return this;
382     }
383 
384     @Override
385     public ChannelHandlerContext fireChannelRead(Object msg) {
386         AbstractChannelHandlerContext next = findContextInbound();
387         ReferenceCountUtil.touch(msg, next);
388         invokedNextChannelRead = true;
389         next.invoker().invokeChannelRead(next, msg);
390         return this;
391     }
392 
393     @Override
394     public ChannelHandlerContext fireChannelReadComplete() {
395         /**
396          * If the handler of this context did not produce any messages via {@link #fireChannelRead(Object)},
397          * there's no reason to trigger {@code channelReadComplete()} even if the handler called this method.
398          *
399          * This is pretty common for the handlers that transform multiple messages into one message,
400          * such as byte-to-message decoder and message aggregators.
401          *
402          * Only one exception is when nobody invoked the channelRead() method of this context's handler.
403          * It means the handler has been added later dynamically.
404          */
405         if (invokedNextChannelRead ||  // The handler of this context produced a message, or
406             !invokedThisChannelRead) { // it is not required to produce a message to trigger the event.
407 
408             invokedNextChannelRead = false;
409             invokedPrevRead = false;
410 
411             AbstractChannelHandlerContext next = findContextInbound();
412             next.invoker().invokeChannelReadComplete(next);
413             return this;
414         }
415 
416         /**
417          * At this point, we are sure the handler of this context did not produce anything and
418          * we suppressed the {@code channelReadComplete()} event.
419          *
420          * If the next handler invoked {@link #read()} to read something but nothing was produced
421          * by the handler of this context, someone has to issue another {@link #read()} operation
422          * until the handler of this context produces something.
423          *
424          * Why? Because otherwise the next handler will not receive {@code channelRead()} nor
425          * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
426          */
427         if (invokedPrevRead && !channel().config().isAutoRead()) {
428             /**
429              * The next (or upstream) handler invoked {@link #read()}, but it didn't get any
430              * {@code channelRead()} event. We should read once more, so that the handler of the current
431              * context have a chance to produce something.
432              */
433             read();
434         } else {
435             invokedPrevRead = false;
436         }
437 
438         return this;
439     }
440 
441     @Override
442     public ChannelHandlerContext fireChannelWritabilityChanged() {
443         AbstractChannelHandlerContext next = findContextInbound();
444         next.invoker().invokeChannelWritabilityChanged(next);
445         return this;
446     }
447 
448     @Override
449     public ChannelFuture bind(SocketAddress localAddress) {
450         return bind(localAddress, newPromise());
451     }
452 
453     @Override
454     public ChannelFuture connect(SocketAddress remoteAddress) {
455         return connect(remoteAddress, newPromise());
456     }
457 
458     @Override
459     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
460         return connect(remoteAddress, localAddress, newPromise());
461     }
462 
463     @Override
464     public ChannelFuture disconnect() {
465         return disconnect(newPromise());
466     }
467 
468     @Override
469     public ChannelFuture close() {
470         return close(newPromise());
471     }
472 
473     @Override
474     public ChannelFuture deregister() {
475         return deregister(newPromise());
476     }
477 
478     @Override
479     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
480         AbstractChannelHandlerContext next = findContextOutbound();
481         next.invoker().invokeBind(next, localAddress, promise);
482         return promise;
483     }
484 
485     @Override
486     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
487         return connect(remoteAddress, null, promise);
488     }
489 
490     @Override
491     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
492         AbstractChannelHandlerContext next = findContextOutbound();
493         next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
494         return promise;
495     }
496 
497     @Override
498     public ChannelFuture disconnect(ChannelPromise promise) {
499         if (!channel().metadata().hasDisconnect()) {
500             return close(promise);
501         }
502 
503         AbstractChannelHandlerContext next = findContextOutbound();
504         next.invoker().invokeDisconnect(next, promise);
505         return promise;
506     }
507 
508     @Override
509     public ChannelFuture close(ChannelPromise promise) {
510         AbstractChannelHandlerContext next = findContextOutbound();
511         next.invoker().invokeClose(next, promise);
512         return promise;
513     }
514 
515     @Override
516     public ChannelFuture deregister(ChannelPromise promise) {
517         AbstractChannelHandlerContext next = findContextOutbound();
518         next.invoker().invokeDeregister(next, promise);
519         return promise;
520     }
521 
522     @Override
523     public ChannelHandlerContext read() {
524         AbstractChannelHandlerContext next = findContextOutbound();
525         invokedPrevRead = true;
526         next.invoker().invokeRead(next);
527         return this;
528     }
529 
530     @Override
531     public ChannelFuture write(Object msg) {
532         return write(msg, newPromise());
533     }
534 
535     @Override
536     public ChannelFuture write(Object msg, ChannelPromise promise) {
537         AbstractChannelHandlerContext next = findContextOutbound();
538         ReferenceCountUtil.touch(msg, next);
539         next.invoker().invokeWrite(next, msg, promise);
540         return promise;
541     }
542 
543     @Override
544     public ChannelHandlerContext flush() {
545         AbstractChannelHandlerContext next = findContextOutbound();
546         next.invoker().invokeFlush(next);
547         return this;
548     }
549 
550     @Override
551     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
552         AbstractChannelHandlerContext next;
553         next = findContextOutbound();
554         ReferenceCountUtil.touch(msg, next);
555         next.invoker().invokeWrite(next, msg, promise);
556         next = findContextOutbound();
557         next.invoker().invokeFlush(next);
558         return promise;
559     }
560 
561     @Override
562     public ChannelFuture writeAndFlush(Object msg) {
563         return writeAndFlush(msg, newPromise());
564     }
565 
566     @Override
567     public ChannelPromise newPromise() {
568         return new DefaultChannelPromise(channel(), executor());
569     }
570 
571     @Override
572     public ChannelProgressivePromise newProgressivePromise() {
573         return new DefaultChannelProgressivePromise(channel(), executor());
574     }
575 
576     @Override
577     public ChannelFuture newSucceededFuture() {
578         ChannelFuture succeededFuture = this.succeededFuture;
579         if (succeededFuture == null) {
580             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
581         }
582         return succeededFuture;
583     }
584 
585     @Override
586     public ChannelFuture newFailedFuture(Throwable cause) {
587         return new FailedChannelFuture(channel(), executor(), cause);
588     }
589 
590     private AbstractChannelHandlerContext findContextInbound() {
591         AbstractChannelHandlerContext ctx = this;
592         do {
593             ctx = ctx.next;
594         } while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
595         return ctx;
596     }
597 
598     private AbstractChannelHandlerContext findContextOutbound() {
599         AbstractChannelHandlerContext ctx = this;
600         do {
601             ctx = ctx.prev;
602         } while ((ctx.skipFlags & MASKGROUP_OUTBOUND) == MASKGROUP_OUTBOUND);
603         return ctx;
604     }
605 
606     @Override
607     public ChannelPromise voidPromise() {
608         return channel.voidPromise();
609     }
610 
611     void setRemoved() {
612         removed = true;
613     }
614 
615     @Override
616     public boolean isRemoved() {
617         return removed;
618     }
619 
620     @Override
621     public String toHintString() {
622         return '\'' + name + "' will handle the message from this point.";
623     }
624 
625     @Override
626     public String toString() {
627         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')';
628     }
629 
630     private final class PausableChannelEventExecutor0 extends PausableChannelEventExecutor {
631 
632         @Override
633         public void rejectNewTasks() {
634             /**
635              * This cast is correct because {@link #channel()} always returns an {@link AbstractChannel} and
636              * {@link AbstractChannel#eventLoop()} always returns a {@link PausableChannelEventExecutor}.
637              */
638             ((PausableEventExecutor) channel().eventLoop()).rejectNewTasks();
639         }
640 
641         @Override
642         public void acceptNewTasks() {
643             ((PausableEventExecutor) channel().eventLoop()).acceptNewTasks();
644         }
645 
646         @Override
647         public boolean isAcceptingNewTasks() {
648             return ((PausableEventExecutor) channel().eventLoop()).isAcceptingNewTasks();
649         }
650 
651         @Override
652         public Channel channel() {
653             return AbstractChannelHandlerContext.this.channel();
654         }
655 
656         @Override
657         public EventExecutor unwrap() {
658             return unwrapInvoker().executor();
659         }
660 
661         @Override
662         public ChannelHandlerInvoker unwrapInvoker() {
663             /**
664              * {@link #invoker} can not be {@code null}, because {@link PausableChannelEventExecutor0} will only be
665              * instantiated if {@link #invoker} is not {@code null}.
666              */
667             return invoker;
668         }
669     }
670 }