1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.handler.flush;
17
18 import io.netty5.channel.Channel;
19 import io.netty5.channel.ChannelHandler;
20 import io.netty5.channel.ChannelHandlerContext;
21 import io.netty5.channel.ChannelOutboundInvoker;
22 import io.netty5.channel.ChannelPipeline;
23 import io.netty5.util.concurrent.Future;
24 import io.netty5.util.internal.ObjectUtil;
25
26 /**
27 * {@link ChannelHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
28 * operations (which also includes
29 * {@link Channel#writeAndFlush(Object)} and
30 * {@link ChannelOutboundInvoker#writeAndFlush(Object)}.
31 * <p>
32 * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
33 * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
34 * as much as possible.
35 * <p>
36 * If a read loop is currently ongoing, {@link ChannelHandler#flush(ChannelHandlerContext)} will not be passed on to
37 * the next {@link ChannelHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
38 * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
39 * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
40 * <ul>
41 * <li>if {@code false}, flushes are passed on to the next handler directly;</li>
42 * <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
43 * high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
44 * batching multiple flushes into one.</li>
45 * </ul>
46 * If {@code explicitFlushAfterFlushes} is reached the flush will be forwarded as well (whether while in a read loop, or
47 * while batching outside of a read loop).
48 * <p>
49 * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
50 * <p>
51 * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
52 * {@link ChannelPipeline} to have the best effect.
53 */
54 public class FlushConsolidationHandler implements ChannelHandler {
55 private final int explicitFlushAfterFlushes;
56 private final boolean consolidateWhenNoReadInProgress;
57 private final Runnable flushTask;
58 private int flushPendingCount;
59 private boolean readInProgress;
60 private ChannelHandlerContext ctx;
61 private Future<?> nextScheduledFlush;
62
63 /**
64 * The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a
65 * read loop, or while batching outside of a read loop).
66 */
67 public static final int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256;
68
69 /**
70 * Create new instance which explicit flush after {@value DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES} pending flush
71 * operations at the latest.
72 */
73 public FlushConsolidationHandler() {
74 this(DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, false);
75 }
76
77 /**
78 * Create new instance which doesn't consolidate flushes when no read is in progress.
79 *
80 * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
81 */
82 public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
83 this(explicitFlushAfterFlushes, false);
84 }
85
86 /**
87 * Create new instance.
88 *
89 * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
90 * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
91 * ongoing.
92 */
93 public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
94 this.explicitFlushAfterFlushes =
95 ObjectUtil.checkPositive(explicitFlushAfterFlushes, "explicitFlushAfterFlushes");
96 this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
97 flushTask = consolidateWhenNoReadInProgress ?
98 () -> {
99 if (flushPendingCount > 0 && !readInProgress) {
100 flushPendingCount = 0;
101 nextScheduledFlush = null;
102 ctx.flush();
103 } // else we'll flush when the read completes
104 }
105 : null;
106 }
107
108 @Override
109 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
110 this.ctx = ctx;
111 }
112
113 @Override
114 public void flush(ChannelHandlerContext ctx) {
115 if (readInProgress) {
116 // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
117 // we only need to flush if we reach the explicitFlushAfterFlushes limit.
118 if (++flushPendingCount == explicitFlushAfterFlushes) {
119 flushNow(ctx);
120 }
121 } else if (consolidateWhenNoReadInProgress) {
122 // Flush immediately if we reach the threshold, otherwise schedule
123 if (++flushPendingCount == explicitFlushAfterFlushes) {
124 flushNow(ctx);
125 } else {
126 scheduleFlush(ctx);
127 }
128 } else {
129 // Always flush directly
130 flushNow(ctx);
131 }
132 }
133
134 @Override
135 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
136 // This may be the last event in the read loop, so flush now!
137 resetReadAndFlushIfNeeded(ctx);
138 ctx.fireChannelReadComplete();
139 }
140
141 @Override
142 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
143 readInProgress = true;
144 ctx.fireChannelRead(msg);
145 }
146
147 @Override
148 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
149 // To ensure we not miss to flush anything, do it now.
150 resetReadAndFlushIfNeeded(ctx);
151 ctx.fireChannelExceptionCaught(cause);
152 }
153
154 @Override
155 public Future<Void> disconnect(ChannelHandlerContext ctx) {
156 // Try to flush one last time if flushes are pending before disconnect the channel.
157 resetReadAndFlushIfNeeded(ctx);
158 return ctx.disconnect();
159 }
160
161 @Override
162 public Future<Void> close(ChannelHandlerContext ctx) {
163 // Try to flush one last time if flushes are pending before close the channel.
164 resetReadAndFlushIfNeeded(ctx);
165 return ctx.close();
166 }
167
168 @Override
169 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
170 if (!ctx.channel().isWritable()) {
171 // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
172 flushIfNeeded(ctx);
173 }
174 ctx.fireChannelWritabilityChanged();
175 }
176
177 @Override
178 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
179 flushIfNeeded(ctx);
180 }
181
182 private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
183 readInProgress = false;
184 flushIfNeeded(ctx);
185 }
186
187 private void flushIfNeeded(ChannelHandlerContext ctx) {
188 if (flushPendingCount > 0) {
189 flushNow(ctx);
190 }
191 }
192
193 private void flushNow(ChannelHandlerContext ctx) {
194 cancelScheduledFlush();
195 flushPendingCount = 0;
196 ctx.flush();
197 }
198
199 private void scheduleFlush(final ChannelHandlerContext ctx) {
200 if (nextScheduledFlush == null) {
201 // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
202 nextScheduledFlush = ctx.channel().executor().submit(flushTask);
203 }
204 }
205
206 private void cancelScheduledFlush() {
207 if (nextScheduledFlush != null) {
208 nextScheduledFlush.cancel();
209 nextScheduledFlush = null;
210 }
211 }
212 }