/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty5.handler.flush;
17  
18  import io.netty5.channel.Channel;
19  import io.netty5.channel.ChannelHandler;
20  import io.netty5.channel.ChannelHandlerContext;
21  import io.netty5.channel.ChannelOutboundInvoker;
22  import io.netty5.channel.ChannelPipeline;
23  import io.netty5.util.concurrent.Future;
24  import io.netty5.util.internal.ObjectUtil;
25  
26  /**
27   * {@link ChannelHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
28   * operations (which also includes
29   * {@link Channel#writeAndFlush(Object)} and
30   * {@link ChannelOutboundInvoker#writeAndFlush(Object)}.
31   * <p>
32   * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
33   * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
34   * as much as possible.
35   * <p>
36   * If a read loop is currently ongoing, {@link ChannelHandler#flush(ChannelHandlerContext)} will not be passed on to
37   * the next {@link ChannelHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
38   * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
39   * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
40   * <ul>
41   *     <li>if {@code false}, flushes are passed on to the next handler directly;</li>
42   *     <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
43   *     high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
44   *     batching multiple flushes into one.</li>
45   * </ul>
46   * If {@code explicitFlushAfterFlushes} is reached the flush will be forwarded as well (whether while in a read loop, or
47   * while batching outside of a read loop).
48   * <p>
49   * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
50   * <p>
51   * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
52   * {@link ChannelPipeline} to have the best effect.
53   */
54  public class FlushConsolidationHandler implements ChannelHandler {
55      private final int explicitFlushAfterFlushes;
56      private final boolean consolidateWhenNoReadInProgress;
57      private final Runnable flushTask;
58      private int flushPendingCount;
59      private boolean readInProgress;
60      private ChannelHandlerContext ctx;
61      private Future<?> nextScheduledFlush;
62  
63      /**
64       * The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a
65       * read loop, or while batching outside of a read loop).
66       */
67      public static final int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256;
68  
69      /**
70       * Create new instance which explicit flush after {@value DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES} pending flush
71       * operations at the latest.
72       */
73      public FlushConsolidationHandler() {
74          this(DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, false);
75      }
76  
77      /**
78       * Create new instance which doesn't consolidate flushes when no read is in progress.
79       *
80       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
81       */
82      public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
83          this(explicitFlushAfterFlushes, false);
84      }
85  
86      /**
87       * Create new instance.
88       *
89       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
90       * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
91       *                                        ongoing.
92       */
93      public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
94          this.explicitFlushAfterFlushes =
95                  ObjectUtil.checkPositive(explicitFlushAfterFlushes, "explicitFlushAfterFlushes");
96          this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
97          flushTask = consolidateWhenNoReadInProgress ?
98                  () -> {
99                      if (flushPendingCount > 0 && !readInProgress) {
100                         flushPendingCount = 0;
101                         nextScheduledFlush = null;
102                         ctx.flush();
103                     } // else we'll flush when the read completes
104                 }
105                 : null;
106     }
107 
108     @Override
109     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
110         this.ctx = ctx;
111     }
112 
113     @Override
114     public void flush(ChannelHandlerContext ctx) {
115         if (readInProgress) {
116             // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
117             // we only need to flush if we reach the explicitFlushAfterFlushes limit.
118             if (++flushPendingCount == explicitFlushAfterFlushes) {
119                 flushNow(ctx);
120             }
121         } else if (consolidateWhenNoReadInProgress) {
122             // Flush immediately if we reach the threshold, otherwise schedule
123             if (++flushPendingCount == explicitFlushAfterFlushes) {
124                 flushNow(ctx);
125             } else {
126                 scheduleFlush(ctx);
127             }
128         } else {
129             // Always flush directly
130             flushNow(ctx);
131         }
132     }
133 
134     @Override
135     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
136         // This may be the last event in the read loop, so flush now!
137         resetReadAndFlushIfNeeded(ctx);
138         ctx.fireChannelReadComplete();
139     }
140 
141     @Override
142     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
143         readInProgress = true;
144         ctx.fireChannelRead(msg);
145     }
146 
147     @Override
148     public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
149         // To ensure we not miss to flush anything, do it now.
150         resetReadAndFlushIfNeeded(ctx);
151         ctx.fireChannelExceptionCaught(cause);
152     }
153 
154     @Override
155     public Future<Void> disconnect(ChannelHandlerContext ctx) {
156         // Try to flush one last time if flushes are pending before disconnect the channel.
157         resetReadAndFlushIfNeeded(ctx);
158         return ctx.disconnect();
159     }
160 
161     @Override
162     public Future<Void> close(ChannelHandlerContext ctx) {
163         // Try to flush one last time if flushes are pending before close the channel.
164         resetReadAndFlushIfNeeded(ctx);
165         return ctx.close();
166     }
167 
168     @Override
169     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
170         if (!ctx.channel().isWritable()) {
171             // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
172             flushIfNeeded(ctx);
173         }
174         ctx.fireChannelWritabilityChanged();
175     }
176 
177     @Override
178     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
179         flushIfNeeded(ctx);
180     }
181 
182     private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
183         readInProgress = false;
184         flushIfNeeded(ctx);
185     }
186 
187     private void flushIfNeeded(ChannelHandlerContext ctx) {
188         if (flushPendingCount > 0) {
189             flushNow(ctx);
190         }
191     }
192 
193     private void flushNow(ChannelHandlerContext ctx) {
194         cancelScheduledFlush();
195         flushPendingCount = 0;
196         ctx.flush();
197     }
198 
199     private void scheduleFlush(final ChannelHandlerContext ctx) {
200         if (nextScheduledFlush == null) {
201             // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
202             nextScheduledFlush = ctx.channel().executor().submit(flushTask);
203         }
204     }
205 
206     private void cancelScheduledFlush() {
207         if (nextScheduledFlush != null) {
208             nextScheduledFlush.cancel();
209             nextScheduledFlush = null;
210         }
211     }
212 }