View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version
5    * 2.0 (the "License"); you may not use this file except in compliance with the
6    * License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations under
14   * the License.
15   */
16  package io.netty5.handler.flow;
17  
18  import io.netty5.channel.ChannelOption;
19  import io.netty5.handler.codec.ByteToMessageDecoder;
20  import io.netty5.handler.codec.MessageToByteEncoder;
21  import io.netty5.util.Resource;
22  import io.netty5.channel.ChannelHandler;
23  import io.netty5.channel.ChannelHandlerContext;
24  import io.netty5.util.internal.ObjectPool;
25  import io.netty5.util.internal.ObjectPool.Handle;
26  import io.netty5.util.internal.logging.InternalLogger;
27  import io.netty5.util.internal.logging.InternalLoggerFactory;
28  
29  import java.util.ArrayDeque;
30  import java.util.Queue;
31  
32  /**
33   * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
34   *
35   * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
36   * many events as they like for any given input. A channel's auto reading configuration doesn't usually
37   * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
38   * like to hold subsequent events while they're processing one event. It's a common problem with the
39   * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
40   * by a {@code LastHttpContent} event.
41   *
42   * <pre>{@code
43   * ChannelPipeline pipeline = ...;
44   *
45   * pipeline.addLast(new HttpServerCodec());
46   * pipeline.addLast(new FlowControlHandler());
47   *
48   * pipeline.addLast(new MyExampleHandler());
49   *
50   * class MyExampleHandler extends ChannelInboundHandlerAdapter {
51   *   @Override
52   *   public void channelRead(ChannelHandlerContext ctx, Object msg) {
53   *     if (msg instanceof HttpRequest) {
54   *       ctx.channel().setChannelOption(ChannelOption.AUTO_READ, false);
55   *
56   *       // The FlowControlHandler will hold any subsequent events that
57   *       // were emitted by HttpObjectDecoder until auto reading is turned
58   *       // back on or Channel#read() is being called.
59   *     }
60   *   }
61   * }
62   * }</pre>
63   *
64   * @see ChannelOption#AUTO_READ
65   */
66  public class FlowControlHandler implements ChannelHandler {
67      private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
68  
69      private final boolean releaseMessages;
70  
71      private RecyclableArrayDeque queue;
72  
73      private boolean shouldConsume;
74  
75      public FlowControlHandler() {
76          this(true);
77      }
78  
79      public FlowControlHandler(boolean releaseMessages) {
80          this.releaseMessages = releaseMessages;
81      }
82  
83      /**
84       * Determine if the underlying {@link Queue} is empty. This method exists for
85       * testing, debugging and inspection purposes and it is not Thread safe!
86       */
87      boolean isQueueEmpty() {
88          return queue == null || queue.isEmpty();
89      }
90  
91      /**
92       * Releases all messages and destroys the {@link Queue}.
93       */
94      private void destroy() {
95          if (queue != null) {
96  
97              if (!queue.isEmpty()) {
98                  logger.trace("Non-empty queue: {}", queue);
99  
100                 if (releaseMessages) {
101                     Object msg;
102                     while ((msg = queue.poll()) != null) {
103                         try {
104                             Resource.dispose(msg);
105                         } catch (Exception e) {
106                             logger.trace("Exception while disposing of message in flow control", e);
107                         }
108                     }
109                 }
110             }
111 
112             queue.recycle();
113             queue = null;
114         }
115     }
116 
117     @Override
118     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
119         if (!isQueueEmpty()) {
120             dequeue(ctx, queue.size());
121         }
122         destroy();
123     }
124 
125     @Override
126     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
127         destroy();
128         ctx.fireChannelInactive();
129     }
130 
131     @Override
132     public void read(ChannelHandlerContext ctx) {
133         if (dequeue(ctx, 1) == 0) {
134             // It seems no messages were consumed. We need to read() some
135             // messages from upstream and once one arrives it need to be
136             // relayed to downstream to keep the flow going.
137             shouldConsume = true;
138             ctx.read();
139         }
140     }
141 
142     @Override
143     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
144         if (queue == null) {
145             queue = RecyclableArrayDeque.newInstance();
146         }
147 
148         queue.offer(msg);
149 
150         // We just received one message. Do we need to relay it regardless
151         // of the auto reading configuration? The answer is yes if this
152         // method was called as a result of a prior read() call.
153         int minConsume = shouldConsume ? 1 : 0;
154         shouldConsume = false;
155 
156         dequeue(ctx, minConsume);
157     }
158 
159     @Override
160     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
161         if (isQueueEmpty()) {
162             ctx.fireChannelReadComplete();
163         } else {
164             // Don't relay completion events from upstream as they
165             // make no sense in this context. See dequeue() where
166             // a new set of completion events is being produced.
167         }
168     }
169 
170     /**
171      * Dequeues one or many (or none) messages depending on the channel's auto
172      * reading state and returns the number of messages that were consumed from
173      * the internal queue.
174      *
175      * The {@code minConsume} argument is used to force {@code dequeue()} into
176      * consuming that number of messages regardless of the channel's auto
177      * reading configuration.
178      *
179      * @see #read(ChannelHandlerContext)
180      * @see #channelRead(ChannelHandlerContext, Object)
181      */
182     private int dequeue(ChannelHandlerContext ctx, int minConsume) {
183         int consumed = 0;
184 
185         // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
186         // check if queue was set to null in the meantime and if so break the loop.
187         while (queue != null && (consumed < minConsume || ctx.channel().getOption(ChannelOption.AUTO_READ))) {
188             Object msg = queue.poll();
189             if (msg == null) {
190                 break;
191             }
192 
193             ++consumed;
194             ctx.fireChannelRead(msg);
195         }
196 
197         // We're firing a completion event every time one (or more)
198         // messages were consumed and the queue ended up being drained
199         // to an empty state.
200         if (queue != null && queue.isEmpty()) {
201             queue.recycle();
202             queue = null;
203 
204             if (consumed > 0) {
205                 ctx.fireChannelReadComplete();
206             }
207         }
208 
209         return consumed;
210     }
211 
212     /**
213      * A recyclable {@link ArrayDeque}.
214      */
215     private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
216 
217         private static final long serialVersionUID = 0L;
218 
219         /**
220          * A value of {@code 2} should be a good choice for most scenarios.
221          */
222         private static final int DEFAULT_NUM_ELEMENTS = 2;
223 
224         private static final ObjectPool<RecyclableArrayDeque> RECYCLER = ObjectPool.newPool(
225                 handle -> new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle));
226 
227         public static RecyclableArrayDeque newInstance() {
228             return RECYCLER.get();
229         }
230 
231         private final Handle<RecyclableArrayDeque> handle;
232 
233         private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
234             super(numElements);
235             this.handle = handle;
236         }
237 
238         public void recycle() {
239             clear();
240             handle.recycle(this);
241         }
242     }
243 }