View Javadoc

1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.flush;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelDuplexHandler;
20  import io.netty.channel.ChannelHandler;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelOutboundHandler;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  
26  import java.util.concurrent.Future;
27  
28  /**
29   * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
30   * operations (which also includes
31   * {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
32   * {@link ChannelHandlerContext#writeAndFlush(Object)} /
33   * {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}).
34   * <p>
35   * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
36   * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
37   * as much as possible.
38   * <p>
39   * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next
40   * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
41   * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
42   * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
43   * <ul>
44   *     <li>if {@code false}, flushes are passed on to the next handler directly;</li>
45   *     <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
46   *     high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
47   *     batching multiple flushes into one.</li>
48   * </ul>
49   * If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well (whether while in a read
50   * loop, or while batching outside of a read loop).
51   * <p>
52   * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
53   * <p>
54   * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
55   * {@link ChannelPipeline} to have the best effect.
56   */
57  public class FlushConsolidationHandler extends ChannelDuplexHandler {
58      private final int explicitFlushAfterFlushes;
59      private final boolean consolidateWhenNoReadInProgress;
60      private final Runnable flushTask;
61      private int flushPendingCount;
62      private boolean readInProgress;
63      private ChannelHandlerContext ctx;
64      private Future<?> nextScheduledFlush;
65  
66      /**
67       * Create new instance which explicit flush after 256 pending flush operations latest.
68       */
69      public FlushConsolidationHandler() {
70          this(256, false);
71      }
72  
73      /**
74       * Create new instance which doesn't consolidate flushes when no read is in progress.
75       *
76       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
77       */
78      public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
79          this(explicitFlushAfterFlushes, false);
80      }
81  
82      /**
83       * Create new instance.
84       *
85       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
86       * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
87       *                                        ongoing.
88       */
89      public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
90          if (explicitFlushAfterFlushes <= 0) {
91              throw new IllegalArgumentException("explicitFlushAfterFlushes: "
92                      + explicitFlushAfterFlushes + " (expected: > 0)");
93          }
94          this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
95          this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
96          this.flushTask = consolidateWhenNoReadInProgress ?
97                  new Runnable() {
98                      @Override
99                      public void run() {
100                         if (flushPendingCount > 0 && !readInProgress) {
101                             flushPendingCount = 0;
102                             ctx.flush();
103                             nextScheduledFlush = null;
104                         } // else we'll flush when the read completes
105                     }
106                 }
107                 : null;
108     }
109 
110     @Override
111     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
112         this.ctx = ctx;
113     }
114 
115     @Override
116     public void flush(ChannelHandlerContext ctx) throws Exception {
117         if (readInProgress) {
118             // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
119             // we only need to flush if we reach the explicitFlushAfterFlushes limit.
120             if (++flushPendingCount == explicitFlushAfterFlushes) {
121                 flushNow(ctx);
122             }
123         } else if (consolidateWhenNoReadInProgress) {
124             // Flush immediately if we reach the threshold, otherwise schedule
125             if (++flushPendingCount == explicitFlushAfterFlushes) {
126                 flushNow(ctx);
127             } else {
128                 scheduleFlush(ctx);
129             }
130         } else {
131             // Always flush directly
132             flushNow(ctx);
133         }
134     }
135 
136     @Override
137     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
138         // This may be the last event in the read loop, so flush now!
139         resetReadAndFlushIfNeeded(ctx);
140         ctx.fireChannelReadComplete();
141     }
142 
143     @Override
144     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
145         readInProgress = true;
146         ctx.fireChannelRead(msg);
147     }
148 
149     @Override
150     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
151         // To ensure we not miss to flush anything, do it now.
152         resetReadAndFlushIfNeeded(ctx);
153         ctx.fireExceptionCaught(cause);
154     }
155 
156     @Override
157     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
158         // Try to flush one last time if flushes are pending before disconnect the channel.
159         resetReadAndFlushIfNeeded(ctx);
160         ctx.disconnect(promise);
161     }
162 
163     @Override
164     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
165         // Try to flush one last time if flushes are pending before close the channel.
166         resetReadAndFlushIfNeeded(ctx);
167         ctx.close(promise);
168     }
169 
170     @Override
171     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
172         if (!ctx.channel().isWritable()) {
173             // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
174             flushIfNeeded(ctx);
175         }
176         ctx.fireChannelWritabilityChanged();
177     }
178 
179     @Override
180     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
181         flushIfNeeded(ctx);
182     }
183 
184     private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
185         readInProgress = false;
186         flushIfNeeded(ctx);
187     }
188 
189     private void flushIfNeeded(ChannelHandlerContext ctx) {
190         if (flushPendingCount > 0) {
191             flushNow(ctx);
192         }
193     }
194 
195     private void flushNow(ChannelHandlerContext ctx) {
196         cancelScheduledFlush();
197         flushPendingCount = 0;
198         ctx.flush();
199     }
200 
201     private void scheduleFlush(final ChannelHandlerContext ctx) {
202         if (nextScheduledFlush == null) {
203             // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
204             nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
205         }
206     }
207 
208     private void cancelScheduledFlush() {
209         if (nextScheduledFlush != null) {
210             nextScheduledFlush.cancel(false);
211             nextScheduledFlush = null;
212         }
213     }
214 }