View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.flush;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelDuplexHandler;
20  import io.netty.channel.ChannelHandler;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelOutboundHandler;
23  import io.netty.channel.ChannelOutboundInvoker;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.util.internal.ObjectUtil;
27  
28  import java.util.concurrent.Future;
29  
30  /**
31   * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
32   * operations (which also includes
33   * {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
34   * {@link ChannelOutboundInvoker#writeAndFlush(Object)} /
35   * {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
36   * <p>
37   * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
38   * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
39   * as much as possible.
40   * <p>
41   * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next
42   * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
43   * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
44   * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
45   * <ul>
46   *     <li>if {@code false}, flushes are passed on to the next handler directly;</li>
47   *     <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
48   *     high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
49   *     batching multiple flushes into one.</li>
50   * </ul>
51   * If {@code explicitFlushAfterFlushes} is reached the flush will be forwarded as well (whether while in a read loop, or
52   * while batching outside of a read loop).
53   * <p>
54   * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
55   * <p>
56   * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
57   * {@link ChannelPipeline} to have the best effect.
58   */
59  public class FlushConsolidationHandler extends ChannelDuplexHandler {
60      private final int explicitFlushAfterFlushes;
61      private final boolean consolidateWhenNoReadInProgress;
62      private final Runnable flushTask;
63      private int flushPendingCount;
64      private boolean readInProgress;
65      private ChannelHandlerContext ctx;
66      private Future<?> nextScheduledFlush;
67  
68      /**
69       * The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a
70       * read loop, or while batching outside of a read loop).
71       */
72      public static final int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256;
73  
74      /**
75       * Create new instance which explicit flush after {@value DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES} pending flush
76       * operations at the latest.
77       */
78      public FlushConsolidationHandler() {
79          this(DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, false);
80      }
81  
82      /**
83       * Create new instance which doesn't consolidate flushes when no read is in progress.
84       *
85       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
86       */
87      public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
88          this(explicitFlushAfterFlushes, false);
89      }
90  
91      /**
92       * Create new instance.
93       *
94       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
95       * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
96       *                                        ongoing.
97       */
98      public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
99          this.explicitFlushAfterFlushes =
100                 ObjectUtil.checkPositive(explicitFlushAfterFlushes, "explicitFlushAfterFlushes");
101         this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
102         this.flushTask = consolidateWhenNoReadInProgress ?
103                 new Runnable() {
104                     @Override
105                     public void run() {
106                         if (flushPendingCount > 0 && !readInProgress) {
107                             flushPendingCount = 0;
108                             nextScheduledFlush = null;
109                             ctx.flush();
110                         } // else we'll flush when the read completes
111                     }
112                 }
113                 : null;
114     }
115 
116     @Override
117     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
118         this.ctx = ctx;
119     }
120 
121     @Override
122     public void flush(ChannelHandlerContext ctx) throws Exception {
123         if (readInProgress) {
124             // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
125             // we only need to flush if we reach the explicitFlushAfterFlushes limit.
126             if (++flushPendingCount == explicitFlushAfterFlushes) {
127                 flushNow(ctx);
128             }
129         } else if (consolidateWhenNoReadInProgress) {
130             // Flush immediately if we reach the threshold, otherwise schedule
131             if (++flushPendingCount == explicitFlushAfterFlushes) {
132                 flushNow(ctx);
133             } else {
134                 scheduleFlush(ctx);
135             }
136         } else {
137             // Always flush directly
138             flushNow(ctx);
139         }
140     }
141 
142     @Override
143     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
144         // This may be the last event in the read loop, so flush now!
145         resetReadAndFlushIfNeeded(ctx);
146         ctx.fireChannelReadComplete();
147     }
148 
149     @Override
150     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
151         readInProgress = true;
152         ctx.fireChannelRead(msg);
153     }
154 
155     @Override
156     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
157         // To ensure we not miss to flush anything, do it now.
158         resetReadAndFlushIfNeeded(ctx);
159         ctx.fireExceptionCaught(cause);
160     }
161 
162     @Override
163     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
164         // Try to flush one last time if flushes are pending before disconnect the channel.
165         resetReadAndFlushIfNeeded(ctx);
166         ctx.disconnect(promise);
167     }
168 
169     @Override
170     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
171         // Try to flush one last time if flushes are pending before close the channel.
172         resetReadAndFlushIfNeeded(ctx);
173         ctx.close(promise);
174     }
175 
176     @Override
177     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
178         if (!ctx.channel().isWritable()) {
179             // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
180             flushIfNeeded(ctx);
181         }
182         ctx.fireChannelWritabilityChanged();
183     }
184 
185     @Override
186     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
187         flushIfNeeded(ctx);
188     }
189 
190     private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
191         readInProgress = false;
192         flushIfNeeded(ctx);
193     }
194 
195     private void flushIfNeeded(ChannelHandlerContext ctx) {
196         if (flushPendingCount > 0) {
197             flushNow(ctx);
198         }
199     }
200 
201     private void flushNow(ChannelHandlerContext ctx) {
202         cancelScheduledFlush();
203         flushPendingCount = 0;
204         ctx.flush();
205     }
206 
207     private void scheduleFlush(final ChannelHandlerContext ctx) {
208         if (nextScheduledFlush == null) {
209             // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
210             nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
211         }
212     }
213 
214     private void cancelScheduledFlush() {
215         if (nextScheduledFlush != null) {
216             nextScheduledFlush.cancel(false);
217             nextScheduledFlush = null;
218         }
219     }
220 }