View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version
5    * 2.0 (the "License"); you may not use this file except in compliance with the
6    * License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package io.netty.handler.flow;
17  
18  import java.util.ArrayDeque;
19  import java.util.Queue;
20  
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelDuplexHandler;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.MessageToByteEncoder;
27  import io.netty.util.Recycler;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.internal.ObjectPool.Handle;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  /**
34   * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
35   *
36   * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
37   * many events as they like for any given input. A channel's auto reading configuration doesn't usually
38   * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
39   * like to hold subsequent events while they're processing one event. It's a common problem with the
40   * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
41   * by a {@code LastHttpContent} event.
42   *
43   * <pre>{@code
44   * ChannelPipeline pipeline = ...;
45   *
46   * pipeline.addLast(new HttpServerCodec());
47   * pipeline.addLast(new FlowControlHandler());
48   *
49   * pipeline.addLast(new MyExampleHandler());
50   *
51   * class MyExampleHandler extends ChannelInboundHandlerAdapter {
52   *   @Override
53   *   public void channelRead(ChannelHandlerContext ctx, Object msg) {
54   *     if (msg instanceof HttpRequest) {
55   *       ctx.channel().config().setAutoRead(false);
56   *
57   *       // The FlowControlHandler will hold any subsequent events that
58   *       // were emitted by HttpObjectDecoder until auto reading is turned
59   *       // back on or Channel#read() is being called.
60   *     }
61   *   }
62   * }
63   * }</pre>
64   *
65   * @see ChannelConfig#setAutoRead(boolean)
66   */
67  public class FlowControlHandler extends ChannelDuplexHandler {
68      private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
69  
70      private final boolean releaseMessages;
71  
72      private RecyclableArrayDeque queue;
73  
74      private ChannelConfig config;
75  
76      private boolean shouldConsume;
77  
78      public FlowControlHandler() {
79          this(true);
80      }
81  
82      public FlowControlHandler(boolean releaseMessages) {
83          this.releaseMessages = releaseMessages;
84      }
85  
86      /**
87       * Determine if the underlying {@link Queue} is empty. This method exists for
88       * testing, debugging and inspection purposes and it is not Thread safe!
89       */
90      boolean isQueueEmpty() {
91          return queue == null || queue.isEmpty();
92      }
93  
94      /**
95       * Releases all messages and destroys the {@link Queue}.
96       */
97      private void destroy() {
98          if (queue != null) {
99  
100             if (!queue.isEmpty()) {
101                 logger.trace("Non-empty queue: {}", queue);
102 
103                 if (releaseMessages) {
104                     Object msg;
105                     while ((msg = queue.poll()) != null) {
106                         ReferenceCountUtil.safeRelease(msg);
107                     }
108                 }
109             }
110 
111             queue.recycle();
112             queue = null;
113         }
114     }
115 
116     @Override
117     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
118         config = ctx.channel().config();
119     }
120 
121     @Override
122     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
123         super.handlerRemoved(ctx);
124         if (!isQueueEmpty()) {
125             dequeue(ctx, queue.size());
126         }
127         destroy();
128     }
129 
130     @Override
131     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
132         destroy();
133         ctx.fireChannelInactive();
134     }
135 
136     @Override
137     public void read(ChannelHandlerContext ctx) throws Exception {
138         if (dequeue(ctx, 1) == 0) {
139             // It seems no messages were consumed. We need to read() some
140             // messages from upstream and once one arrives it need to be
141             // relayed to downstream to keep the flow going.
142             shouldConsume = true;
143             ctx.read();
144         } else if (config.isAutoRead()) {
145             ctx.read();
146         }
147     }
148 
149     @Override
150     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
151         if (queue == null) {
152             queue = RecyclableArrayDeque.newInstance();
153         }
154 
155         queue.offer(msg);
156 
157         // We just received one message. Do we need to relay it regardless
158         // of the auto reading configuration? The answer is yes if this
159         // method was called as a result of a prior read() call.
160         int minConsume = shouldConsume ? 1 : 0;
161         shouldConsume = false;
162 
163         dequeue(ctx, minConsume);
164     }
165 
166     @Override
167     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
168         if (isQueueEmpty()) {
169             ctx.fireChannelReadComplete();
170         } else {
171             // Don't relay completion events from upstream as they
172             // make no sense in this context. See dequeue() where
173             // a new set of completion events is being produced.
174         }
175     }
176 
177     /**
178      * Dequeues one or many (or none) messages depending on the channel's auto
179      * reading state and returns the number of messages that were consumed from
180      * the internal queue.
181      *
182      * The {@code minConsume} argument is used to force {@code dequeue()} into
183      * consuming that number of messages regardless of the channel's auto
184      * reading configuration.
185      *
186      * @see #read(ChannelHandlerContext)
187      * @see #channelRead(ChannelHandlerContext, Object)
188      */
189     private int dequeue(ChannelHandlerContext ctx, int minConsume) {
190         int consumed = 0;
191 
192         // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
193         // check if queue was set to null in the meantime and if so break the loop.
194         while (queue != null && (consumed < minConsume || config.isAutoRead())) {
195             Object msg = queue.poll();
196             if (msg == null) {
197                 break;
198             }
199 
200             ++consumed;
201             ctx.fireChannelRead(msg);
202         }
203 
204         // We're firing a completion event every time one (or more)
205         // messages were consumed and the queue ended up being drained
206         // to an empty state.
207         if (queue != null && queue.isEmpty()) {
208             queue.recycle();
209             queue = null;
210 
211             if (consumed > 0) {
212                 ctx.fireChannelReadComplete();
213             }
214         }
215 
216         return consumed;
217     }
218 
219     /**
220      * A recyclable {@link ArrayDeque}.
221      */
222     private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
223 
224         private static final long serialVersionUID = 0L;
225 
226         /**
227          * A value of {@code 2} should be a good choice for most scenarios.
228          */
229         private static final int DEFAULT_NUM_ELEMENTS = 2;
230 
231         private static final Recycler<RecyclableArrayDeque> RECYCLER =
232                 new Recycler<RecyclableArrayDeque>() {
233                     @Override
234                     protected RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
235                         return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
236                     }
237                 };
238 
239         public static RecyclableArrayDeque newInstance() {
240             return RECYCLER.get();
241         }
242 
243         private final Handle<RecyclableArrayDeque> handle;
244 
245         private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
246             super(numElements);
247             this.handle = handle;
248         }
249 
250         public void recycle() {
251             clear();
252             handle.recycle(this);
253         }
254     }
255 }