1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.flush;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelDuplexHandler;
20 import io.netty.channel.ChannelHandler;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelOutboundHandler;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25
26 import java.util.concurrent.Future;
27
28 /**
29 * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
30 * operations (which also includes
31 * {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
32 * {@link ChannelHandlerContext#writeAndFlush(Object)} /
33 * {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}).
34 * <p>
35 * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
36 * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
37 * as much as possible.
38 * <p>
39 * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next
40 * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
41 * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
42 * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
43 * <ul>
44 * <li>if {@code false}, flushes are passed on to the next handler directly;</li>
45 * <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
46 * high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
47 * batching multiple flushes into one.</li>
48 * </ul>
49 * If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well (whether while in a read
50 * loop, or while batching outside of a read loop).
51 * <p>
52 * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
53 * <p>
54 * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
55 * {@link ChannelPipeline} to have the best effect.
56 */
57 public class FlushConsolidationHandler extends ChannelDuplexHandler {
58 private final int explicitFlushAfterFlushes;
59 private final boolean consolidateWhenNoReadInProgress;
60 private final Runnable flushTask;
61 private int flushPendingCount;
62 private boolean readInProgress;
63 private ChannelHandlerContext ctx;
64 private Future<?> nextScheduledFlush;
65
66 /**
67 * Create new instance which explicit flush after 256 pending flush operations latest.
68 */
69 public FlushConsolidationHandler() {
70 this(256, false);
71 }
72
73 /**
74 * Create new instance which doesn't consolidate flushes when no read is in progress.
75 *
76 * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
77 */
78 public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
79 this(explicitFlushAfterFlushes, false);
80 }
81
82 /**
83 * Create new instance.
84 *
85 * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
86 * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
87 * ongoing.
88 */
89 public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
90 if (explicitFlushAfterFlushes <= 0) {
91 throw new IllegalArgumentException("explicitFlushAfterFlushes: "
92 + explicitFlushAfterFlushes + " (expected: > 0)");
93 }
94 this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
95 this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
96 this.flushTask = consolidateWhenNoReadInProgress ?
97 new Runnable() {
98 @Override
99 public void run() {
100 if (flushPendingCount > 0 && !readInProgress) {
101 flushPendingCount = 0;
102 ctx.flush();
103 nextScheduledFlush = null;
104 } // else we'll flush when the read completes
105 }
106 }
107 : null;
108 }
109
110 @Override
111 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
112 this.ctx = ctx;
113 }
114
115 @Override
116 public void flush(ChannelHandlerContext ctx) throws Exception {
117 if (readInProgress) {
118 // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
119 // we only need to flush if we reach the explicitFlushAfterFlushes limit.
120 if (++flushPendingCount == explicitFlushAfterFlushes) {
121 flushNow(ctx);
122 }
123 } else if (consolidateWhenNoReadInProgress) {
124 // Flush immediately if we reach the threshold, otherwise schedule
125 if (++flushPendingCount == explicitFlushAfterFlushes) {
126 flushNow(ctx);
127 } else {
128 scheduleFlush(ctx);
129 }
130 } else {
131 // Always flush directly
132 flushNow(ctx);
133 }
134 }
135
136 @Override
137 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
138 // This may be the last event in the read loop, so flush now!
139 resetReadAndFlushIfNeeded(ctx);
140 ctx.fireChannelReadComplete();
141 }
142
143 @Override
144 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
145 readInProgress = true;
146 ctx.fireChannelRead(msg);
147 }
148
149 @Override
150 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
151 // To ensure we not miss to flush anything, do it now.
152 resetReadAndFlushIfNeeded(ctx);
153 ctx.fireExceptionCaught(cause);
154 }
155
156 @Override
157 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
158 // Try to flush one last time if flushes are pending before disconnect the channel.
159 resetReadAndFlushIfNeeded(ctx);
160 ctx.disconnect(promise);
161 }
162
163 @Override
164 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
165 // Try to flush one last time if flushes are pending before close the channel.
166 resetReadAndFlushIfNeeded(ctx);
167 ctx.close(promise);
168 }
169
170 @Override
171 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
172 if (!ctx.channel().isWritable()) {
173 // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
174 flushIfNeeded(ctx);
175 }
176 ctx.fireChannelWritabilityChanged();
177 }
178
179 @Override
180 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
181 flushIfNeeded(ctx);
182 }
183
184 private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
185 readInProgress = false;
186 flushIfNeeded(ctx);
187 }
188
189 private void flushIfNeeded(ChannelHandlerContext ctx) {
190 if (flushPendingCount > 0) {
191 flushNow(ctx);
192 }
193 }
194
195 private void flushNow(ChannelHandlerContext ctx) {
196 cancelScheduledFlush();
197 flushPendingCount = 0;
198 ctx.flush();
199 }
200
201 private void scheduleFlush(final ChannelHandlerContext ctx) {
202 if (nextScheduledFlush == null) {
203 // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
204 nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
205 }
206 }
207
208 private void cancelScheduledFlush() {
209 if (nextScheduledFlush != null) {
210 nextScheduledFlush.cancel(false);
211 nextScheduledFlush = null;
212 }
213 }
214 }