View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version
5    * 2.0 (the "License"); you may not use this file except in compliance with the
6    * License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package io.netty.handler.flow;
17  
18  import java.util.ArrayDeque;
19  import java.util.Queue;
20  
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelDuplexHandler;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.MessageToByteEncoder;
27  import io.netty.util.ReferenceCountUtil;
28  import io.netty.util.internal.ObjectPool;
29  import io.netty.util.internal.ObjectPool.Handle;
30  import io.netty.util.internal.ObjectPool.ObjectCreator;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  /**
35   * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
36   *
37   * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
38   * many events as they like for any given input. A channel's auto reading configuration doesn't usually
39   * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
40   * like to hold subsequent events while they're processing one event. It's a common problem with the
41   * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
42   * by a {@code LastHttpContent} event.
43   *
44   * <pre>{@code
45   * ChannelPipeline pipeline = ...;
46   *
47   * pipeline.addLast(new HttpServerCodec());
48   * pipeline.addLast(new FlowControlHandler());
49   *
50   * pipeline.addLast(new MyExampleHandler());
51   *
52   * class MyExampleHandler extends ChannelInboundHandlerAdapter {
53   *   @Override
54   *   public void channelRead(ChannelHandlerContext ctx, Object msg) {
55   *     if (msg instanceof HttpRequest) {
56   *       ctx.channel().config().setAutoRead(false);
57   *
58   *       // The FlowControlHandler will hold any subsequent events that
59   *       // were emitted by HttpObjectDecoder until auto reading is turned
60   *       // back on or Channel#read() is being called.
61   *     }
62   *   }
63   * }
64   * }</pre>
65   *
66   * @see ChannelConfig#setAutoRead(boolean)
67   */
68  public class FlowControlHandler extends ChannelDuplexHandler {
69      private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
70  
71      private final boolean releaseMessages;
72  
73      private RecyclableArrayDeque queue;
74  
75      private ChannelConfig config;
76  
77      private boolean shouldConsume;
78  
79      public FlowControlHandler() {
80          this(true);
81      }
82  
83      public FlowControlHandler(boolean releaseMessages) {
84          this.releaseMessages = releaseMessages;
85      }
86  
87      /**
88       * Determine if the underlying {@link Queue} is empty. This method exists for
89       * testing, debugging and inspection purposes and it is not Thread safe!
90       */
91      boolean isQueueEmpty() {
92          return queue == null || queue.isEmpty();
93      }
94  
95      /**
96       * Releases all messages and destroys the {@link Queue}.
97       */
98      private void destroy() {
99          if (queue != null) {
100 
101             if (!queue.isEmpty()) {
102                 logger.trace("Non-empty queue: {}", queue);
103 
104                 if (releaseMessages) {
105                     Object msg;
106                     while ((msg = queue.poll()) != null) {
107                         ReferenceCountUtil.safeRelease(msg);
108                     }
109                 }
110             }
111 
112             queue.recycle();
113             queue = null;
114         }
115     }
116 
117     @Override
118     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
119         config = ctx.channel().config();
120     }
121 
122     @Override
123     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
124         super.handlerRemoved(ctx);
125         if (!isQueueEmpty()) {
126             dequeue(ctx, queue.size());
127         }
128         destroy();
129     }
130 
131     @Override
132     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
133         destroy();
134         ctx.fireChannelInactive();
135     }
136 
137     @Override
138     public void read(ChannelHandlerContext ctx) throws Exception {
139         if (dequeue(ctx, 1) == 0) {
140             // It seems no messages were consumed. We need to read() some
141             // messages from upstream and once one arrives it need to be
142             // relayed to downstream to keep the flow going.
143             shouldConsume = true;
144             ctx.read();
145         } else if (config.isAutoRead()) {
146             ctx.read();
147         }
148     }
149 
150     @Override
151     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
152         if (queue == null) {
153             queue = RecyclableArrayDeque.newInstance();
154         }
155 
156         queue.offer(msg);
157 
158         // We just received one message. Do we need to relay it regardless
159         // of the auto reading configuration? The answer is yes if this
160         // method was called as a result of a prior read() call.
161         int minConsume = shouldConsume ? 1 : 0;
162         shouldConsume = false;
163 
164         dequeue(ctx, minConsume);
165     }
166 
167     @Override
168     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
169         if (isQueueEmpty()) {
170             ctx.fireChannelReadComplete();
171         } else {
172             // Don't relay completion events from upstream as they
173             // make no sense in this context. See dequeue() where
174             // a new set of completion events is being produced.
175         }
176     }
177 
178     /**
179      * Dequeues one or many (or none) messages depending on the channel's auto
180      * reading state and returns the number of messages that were consumed from
181      * the internal queue.
182      *
183      * The {@code minConsume} argument is used to force {@code dequeue()} into
184      * consuming that number of messages regardless of the channel's auto
185      * reading configuration.
186      *
187      * @see #read(ChannelHandlerContext)
188      * @see #channelRead(ChannelHandlerContext, Object)
189      */
190     private int dequeue(ChannelHandlerContext ctx, int minConsume) {
191         int consumed = 0;
192 
193         // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
194         // check if queue was set to null in the meantime and if so break the loop.
195         while (queue != null && (consumed < minConsume || config.isAutoRead())) {
196             Object msg = queue.poll();
197             if (msg == null) {
198                 break;
199             }
200 
201             ++consumed;
202             ctx.fireChannelRead(msg);
203         }
204 
205         // We're firing a completion event every time one (or more)
206         // messages were consumed and the queue ended up being drained
207         // to an empty state.
208         if (queue != null && queue.isEmpty()) {
209             queue.recycle();
210             queue = null;
211 
212             if (consumed > 0) {
213                 ctx.fireChannelReadComplete();
214             }
215         }
216 
217         return consumed;
218     }
219 
220     /**
221      * A recyclable {@link ArrayDeque}.
222      */
223     private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
224 
225         private static final long serialVersionUID = 0L;
226 
227         /**
228          * A value of {@code 2} should be a good choice for most scenarios.
229          */
230         private static final int DEFAULT_NUM_ELEMENTS = 2;
231 
232         private static final ObjectPool<RecyclableArrayDeque> RECYCLER = ObjectPool.newPool(
233                 new ObjectCreator<RecyclableArrayDeque>() {
234             @Override
235             public RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
236                 return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
237             }
238         });
239 
240         public static RecyclableArrayDeque newInstance() {
241             return RECYCLER.get();
242         }
243 
244         private final Handle<RecyclableArrayDeque> handle;
245 
246         private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
247             super(numElements);
248             this.handle = handle;
249         }
250 
251         public void recycle() {
252             clear();
253             handle.recycle(this);
254         }
255     }
256 }