View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version
5    * 2.0 (the "License"); you may not use this file except in compliance with the
6    * License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package io.netty.handler.flow;
17  
18  import java.util.ArrayDeque;
19  import java.util.Queue;
20  
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelDuplexHandler;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.MessageToByteEncoder;
27  import io.netty.util.Recycler;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.internal.ObjectPool.Handle;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  /**
34   * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
35   * <p>
36   * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
37   * many events as they like for any given input. A channel's auto reading configuration doesn't usually
38   * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
39   * like to hold subsequent events while they're processing one event. It's a common problem with the
40   * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
41   * by a {@code LastHttpContent} event.
42   *
43   * <pre>{@code
44   * ChannelPipeline pipeline = ...;
45   *
46   * pipeline.addLast(new HttpServerCodec());
47   * pipeline.addLast(new FlowControlHandler());
48   *
49   * pipeline.addLast(new MyExampleHandler());
50   *
51   * class MyExampleHandler extends ChannelInboundHandlerAdapter {
52   *   @Override
53   *   public void channelRead(ChannelHandlerContext ctx, Object msg) {
54   *     if (msg instanceof HttpRequest) {
55   *       ctx.channel().config().setAutoRead(false);
56   *
57   *       // The FlowControlHandler will hold any subsequent events that
58   *       // were emitted by HttpObjectDecoder until auto reading is turned
59   *       // back on or Channel#read() is being called.
60   *     }
61   *   }
62   * }
63   * }</pre>
64   *
65   * @see ChannelConfig#setAutoRead(boolean)
66   */
67  public class FlowControlHandler extends ChannelDuplexHandler {
68      private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
69  
70      private final boolean releaseMessages;
71  
72      private RecyclableArrayDeque queue;
73  
74      private ChannelConfig config;
75  
76      /**
77       * Number of unsatisfied downstream {@code read()} calls. A downstream {@code read()} is considered unsatisfied
78       * if auto-read is off and if it has not yet been paired with a {@code fireChannelRead} or
79       * a cumulative {@code fireChannelReadComplete}.
80       * <p>
81       * A {@code read()} can be satisfied in three ways, whichever comes first:
82       * <ul>
83       *     <li>inside the {@code read()} call itself, by {@code dequeue()}ing a message</li>
84       *     <li>in a {@code channelRead()}</li>
85       *     <li>in a {@code channelReadComplete()}</li>
86       * </ul>
87       * A {@code read()} can be satisfied with auto-read on.
88       * <p>
89       * When one or more {@code read()} calls are unsatisfied, a downstream {@code channelReadComplete} is fired
90       * only when either of the following happens:
91       * <ul>
92       *     <li>auto-read is off and {@code unsatisfiedReads} returns to zero after {@code dequeue()}ing, or</li>
93       *     <li>an upstream {@code channelReadComplete} arrives</li>
94       * </ul>
95       */
96      private int unsatisfiedReads;
97  
98      public FlowControlHandler() {
99          this(true);
100     }
101 
102     public FlowControlHandler(boolean releaseMessages) {
103         this.releaseMessages = releaseMessages;
104     }
105 
106     /**
107      * Determine if the underlying {@link Queue} is empty. This method exists for
108      * testing, debugging and inspection purposes and it is not Thread safe!
109      */
110     boolean isQueueEmpty() {
111         return queue == null || queue.isEmpty();
112     }
113 
114     /**
115      * Releases all messages and destroys the {@link Queue}.
116      */
117     private void destroy() {
118         if (queue != null) {
119 
120             if (!queue.isEmpty()) {
121                 logger.trace("Non-empty queue: {}", queue);
122 
123                 if (releaseMessages) {
124                     Object msg;
125                     while ((msg = queue.poll()) != null) {
126                         ReferenceCountUtil.safeRelease(msg);
127                     }
128                 }
129             }
130 
131             queue.recycle();
132             queue = null;
133         }
134     }
135 
136     @Override
137     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
138         config = ctx.channel().config();
139     }
140 
141     @Override
142     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
143         super.handlerRemoved(ctx);
144         if (!isQueueEmpty()) {
145             dequeueAll(ctx);
146             ctx.fireChannelReadComplete();
147         }
148         destroy();
149     }
150 
151     @Override
152     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
153         destroy();
154         ctx.fireChannelInactive();
155     }
156 
157     @Override
158     public void read(ChannelHandlerContext ctx) throws Exception {
159         if (config.isAutoRead()) {
160             dequeueAll(ctx);
161             ctx.read();
162         } else {
163             unsatisfiedReads++;
164 
165             if (dequeueOne(ctx)) {
166                 if (unsatisfiedReads == 0) {
167                     ctx.fireChannelReadComplete();
168                 }
169             } else {
170                 // Could not satisfy the read() from the queue.
171                 // We need to request data from upstream so we can satisfy the read() in channelRead() or
172                 // channelReadComplete() if it is going to be an empty read.
173                 ctx.read();
174             }
175         }
176     }
177 
178     @Override
179     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
180         if (queue == null) {
181             queue = RecyclableArrayDeque.newInstance();
182         }
183 
184         queue.offer(msg);
185 
186         if (config.isAutoRead()) {
187             dequeueAll(ctx);
188         } else if (unsatisfiedReads > 0) {
189             dequeueOne(ctx);
190 
191             if (unsatisfiedReads == 0) {
192                 ctx.fireChannelReadComplete();
193             }
194         }
195     }
196 
197     @Override
198     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
199         // Upstream closed the read cycle. Collapse every outstanding read() into a single downstream
200         // channelReadComplete; spurious upstream completions with no pending read are dropped.
201         if (config.isAutoRead() || unsatisfiedReads > 0) {
202             unsatisfiedReads = 0;
203             ctx.fireChannelReadComplete();
204         }
205     }
206 
207     private boolean dequeueOne(ChannelHandlerContext ctx) {
208         return dequeue(ctx, 1) > 0;
209     }
210 
211     private int dequeueAll(ChannelHandlerContext ctx) {
212         return dequeue(ctx, -1);
213     }
214 
215     /**
216      * Dequeues up to {@code maxConsume} messages, fires them downstream and
217      * updates {@code unsatisfiedReads} accordingly. If {@code maxConsume} is negative,
218      * there is no upper limit on the number of messages to dequeue and fire downstream.
219      *
220      * @see #read(ChannelHandlerContext)
221      * @see #channelRead(ChannelHandlerContext, Object)
222      */
223     private int dequeue(ChannelHandlerContext ctx, int maxConsume) {
224         int consumed = 0;
225 
226         // fireChannelRead(...) may call ctx.read() and so this method may be re-entered. Because of that
227         // we need to check if queue was set to null in the meantime and, if so, break out of the loop.
228         while (queue != null && (consumed < maxConsume || maxConsume < 0)) {
229             Object msg = queue.poll();
230             if (msg == null) {
231                 break;
232             }
233 
234             ++consumed;
235             ctx.fireChannelRead(msg);
236         }
237 
238         if (queue != null && queue.isEmpty()) {
239             queue.recycle();
240             queue = null;
241         }
242 
243         unsatisfiedReads = Math.max(unsatisfiedReads - consumed, 0);
244 
245         return consumed;
246     }
247 
248     /**
249      * A recyclable {@link ArrayDeque}.
250      */
251     private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
252 
253         private static final long serialVersionUID = 0L;
254 
255         /**
256          * A value of {@code 2} should be a good choice for most scenarios.
257          */
258         private static final int DEFAULT_NUM_ELEMENTS = 2;
259 
260         private static final Recycler<RecyclableArrayDeque> RECYCLER =
261                 new Recycler<RecyclableArrayDeque>() {
262                     @Override
263                     protected RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
264                         return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
265                     }
266                 };
267 
268         public static RecyclableArrayDeque newInstance() {
269             return RECYCLER.get();
270         }
271 
272         private final Handle<RecyclableArrayDeque> handle;
273 
274         private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
275             super(numElements);
276             this.handle = handle;
277         }
278 
279         public void recycle() {
280             clear();
281             handle.recycle(this);
282         }
283     }
284 }