1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version
5 * 2.0 (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
15 */
16 package io.netty.handler.flow;
17
18 import java.util.ArrayDeque;
19 import java.util.Queue;
20
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelDuplexHandler;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.MessageToByteEncoder;
27 import io.netty.util.Recycler;
28 import io.netty.util.ReferenceCountUtil;
29 import io.netty.util.internal.ObjectPool.Handle;
30 import io.netty.util.internal.logging.InternalLogger;
31 import io.netty.util.internal.logging.InternalLoggerFactory;
32
33 /**
34 * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
35 * <p>
36 * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
37 * many events as they like for any given input. A channel's auto reading configuration doesn't usually
38 * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
39 * like to hold subsequent events while they're processing one event. It's a common problem with the
40 * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
41 * by a {@code LastHttpContent} event.
42 *
43 * <pre>{@code
44 * ChannelPipeline pipeline = ...;
45 *
46 * pipeline.addLast(new HttpServerCodec());
47 * pipeline.addLast(new FlowControlHandler());
48 *
49 * pipeline.addLast(new MyExampleHandler());
50 *
51 * class MyExampleHandler extends ChannelInboundHandlerAdapter {
52 * @Override
53 * public void channelRead(ChannelHandlerContext ctx, Object msg) {
54 * if (msg instanceof HttpRequest) {
55 * ctx.channel().config().setAutoRead(false);
56 *
57 * // The FlowControlHandler will hold any subsequent events that
58 * // were emitted by HttpObjectDecoder until auto reading is turned
59 * // back on or Channel#read() is being called.
60 * }
61 * }
62 * }
63 * }</pre>
64 *
65 * @see ChannelConfig#setAutoRead(boolean)
66 */
67 public class FlowControlHandler extends ChannelDuplexHandler {
68 private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
69
70 private final boolean releaseMessages;
71
72 private RecyclableArrayDeque queue;
73
74 private ChannelConfig config;
75
76 /**
77 * Number of unsatisfied downstream {@code read()} calls. A downstream {@code read()} is considered unsatisfied
78 * if auto-read is off and if it has not yet been paired with a {@code fireChannelRead} or
79 * a cumulative {@code fireChannelReadComplete}.
80 * <p>
81 * A {@code read()} can be satisfied in three ways, whichever comes first:
82 * <ul>
83 * <li>inside the {@code read()} call itself, by {@code dequeue()}ing a message</li>
84 * <li>in a {@code channelRead()}</li>
85 * <li>in a {@code channelReadComplete()}</li>
86 * </ul>
87 * A {@code read()} can be satisfied with auto-read on.
88 * <p>
89 * When one or more {@code read()} calls are unsatisfied, a downstream {@code channelReadComplete} is fired
90 * only when either of the following happens:
91 * <ul>
92 * <li>auto-read is off and {@code unsatisfiedReads} returns to zero after {@code dequeue()}ing, or</li>
93 * <li>an upstream {@code channelReadComplete} arrives</li>
94 * </ul>
95 */
96 private int unsatisfiedReads;
97
98 public FlowControlHandler() {
99 this(true);
100 }
101
102 public FlowControlHandler(boolean releaseMessages) {
103 this.releaseMessages = releaseMessages;
104 }
105
106 /**
107 * Determine if the underlying {@link Queue} is empty. This method exists for
108 * testing, debugging and inspection purposes and it is not Thread safe!
109 */
110 boolean isQueueEmpty() {
111 return queue == null || queue.isEmpty();
112 }
113
114 /**
115 * Releases all messages and destroys the {@link Queue}.
116 */
117 private void destroy() {
118 if (queue != null) {
119
120 if (!queue.isEmpty()) {
121 logger.trace("Non-empty queue: {}", queue);
122
123 if (releaseMessages) {
124 Object msg;
125 while ((msg = queue.poll()) != null) {
126 ReferenceCountUtil.safeRelease(msg);
127 }
128 }
129 }
130
131 queue.recycle();
132 queue = null;
133 }
134 }
135
136 @Override
137 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
138 config = ctx.channel().config();
139 }
140
141 @Override
142 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
143 super.handlerRemoved(ctx);
144 if (!isQueueEmpty()) {
145 dequeueAll(ctx);
146 ctx.fireChannelReadComplete();
147 }
148 destroy();
149 }
150
151 @Override
152 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
153 destroy();
154 ctx.fireChannelInactive();
155 }
156
157 @Override
158 public void read(ChannelHandlerContext ctx) throws Exception {
159 if (config.isAutoRead()) {
160 dequeueAll(ctx);
161 ctx.read();
162 } else {
163 unsatisfiedReads++;
164
165 if (dequeueOne(ctx)) {
166 if (unsatisfiedReads == 0) {
167 ctx.fireChannelReadComplete();
168 }
169 } else {
170 // Could not satisfy the read() from the queue.
171 // We need to request data from upstream so we can satisfy the read() in channelRead() or
172 // channelReadComplete() if it is going to be an empty read.
173 ctx.read();
174 }
175 }
176 }
177
178 @Override
179 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
180 if (queue == null) {
181 queue = RecyclableArrayDeque.newInstance();
182 }
183
184 queue.offer(msg);
185
186 if (config.isAutoRead()) {
187 dequeueAll(ctx);
188 } else if (unsatisfiedReads > 0) {
189 dequeueOne(ctx);
190
191 if (unsatisfiedReads == 0) {
192 ctx.fireChannelReadComplete();
193 }
194 }
195 }
196
197 @Override
198 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
199 // Upstream closed the read cycle. Collapse every outstanding read() into a single downstream
200 // channelReadComplete; spurious upstream completions with no pending read are dropped.
201 if (config.isAutoRead() || unsatisfiedReads > 0) {
202 unsatisfiedReads = 0;
203 ctx.fireChannelReadComplete();
204 }
205 }
206
207 private boolean dequeueOne(ChannelHandlerContext ctx) {
208 return dequeue(ctx, 1) > 0;
209 }
210
211 private int dequeueAll(ChannelHandlerContext ctx) {
212 return dequeue(ctx, -1);
213 }
214
215 /**
216 * Dequeues up to {@code maxConsume} messages, fires them downstream and
217 * updates {@code unsatisfiedReads} accordingly. If {@code maxConsume} is negative,
218 * there is no upper limit on the number of messages to dequeue and fire downstream.
219 *
220 * @see #read(ChannelHandlerContext)
221 * @see #channelRead(ChannelHandlerContext, Object)
222 */
223 private int dequeue(ChannelHandlerContext ctx, int maxConsume) {
224 int consumed = 0;
225
226 // fireChannelRead(...) may call ctx.read() and so this method may be re-entered. Because of that
227 // we need to check if queue was set to null in the meantime and, if so, break out of the loop.
228 while (queue != null && (consumed < maxConsume || maxConsume < 0)) {
229 Object msg = queue.poll();
230 if (msg == null) {
231 break;
232 }
233
234 ++consumed;
235 ctx.fireChannelRead(msg);
236 }
237
238 if (queue != null && queue.isEmpty()) {
239 queue.recycle();
240 queue = null;
241 }
242
243 unsatisfiedReads = Math.max(unsatisfiedReads - consumed, 0);
244
245 return consumed;
246 }
247
248 /**
249 * A recyclable {@link ArrayDeque}.
250 */
251 private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
252
253 private static final long serialVersionUID = 0L;
254
255 /**
256 * A value of {@code 2} should be a good choice for most scenarios.
257 */
258 private static final int DEFAULT_NUM_ELEMENTS = 2;
259
260 private static final Recycler<RecyclableArrayDeque> RECYCLER =
261 new Recycler<RecyclableArrayDeque>() {
262 @Override
263 protected RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
264 return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
265 }
266 };
267
268 public static RecyclableArrayDeque newInstance() {
269 return RECYCLER.get();
270 }
271
272 private final Handle<RecyclableArrayDeque> handle;
273
274 private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
275 super(numElements);
276 this.handle = handle;
277 }
278
279 public void recycle() {
280 clear();
281 handle.recycle(this);
282 }
283 }
284 }