View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.flush;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelDuplexHandler;
20  import io.netty.channel.ChannelHandler;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelOutboundHandler;
23  import io.netty.channel.ChannelOutboundInvoker;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  
27  import java.util.concurrent.Future;
28  
29  /**
30   * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
31   * operations (which also includes
32   * {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
33   * {@link ChannelOutboundInvoker#writeAndFlush(Object)} /
34   * {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
35   * <p>
36   * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
37   * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
38   * as much as possible.
39   * <p>
40   * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next
41   * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
42   * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
43   * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
44   * <ul>
45   *     <li>if {@code false}, flushes are passed on to the next handler directly;</li>
46   *     <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
47   *     high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
48   *     batching multiple flushes into one.</li>
49   * </ul>
50   * If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well (whether while in a read
51   * loop, or while batching outside of a read loop).
52   * <p>
53   * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
54   * <p>
55   * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
56   * {@link ChannelPipeline} to have the best effect.
57   */
58  public class FlushConsolidationHandler extends ChannelDuplexHandler {
59      private final int explicitFlushAfterFlushes;
60      private final boolean consolidateWhenNoReadInProgress;
61      private final Runnable flushTask;
62      private int flushPendingCount;
63      private boolean readInProgress;
64      private ChannelHandlerContext ctx;
65      private Future<?> nextScheduledFlush;
66  
67      /**
68       * Create new instance which explicit flush after 256 pending flush operations latest.
69       */
70      public FlushConsolidationHandler() {
71          this(256, false);
72      }
73  
74      /**
75       * Create new instance which doesn't consolidate flushes when no read is in progress.
76       *
77       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
78       */
79      public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
80          this(explicitFlushAfterFlushes, false);
81      }
82  
83      /**
84       * Create new instance.
85       *
86       * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
87       * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
88       *                                        ongoing.
89       */
90      public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
91          if (explicitFlushAfterFlushes <= 0) {
92              throw new IllegalArgumentException("explicitFlushAfterFlushes: "
93                      + explicitFlushAfterFlushes + " (expected: > 0)");
94          }
95          this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
96          this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
97          flushTask = consolidateWhenNoReadInProgress ?
98                  new Runnable() {
99                      @Override
100                     public void run() {
101                         if (flushPendingCount > 0 && !readInProgress) {
102                             flushPendingCount = 0;
103                             ctx.flush();
104                             nextScheduledFlush = null;
105                         } // else we'll flush when the read completes
106                     }
107                 }
108                 : null;
109     }
110 
111     @Override
112     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
113         this.ctx = ctx;
114     }
115 
116     @Override
117     public void flush(ChannelHandlerContext ctx) throws Exception {
118         if (readInProgress) {
119             // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
120             // we only need to flush if we reach the explicitFlushAfterFlushes limit.
121             if (++flushPendingCount == explicitFlushAfterFlushes) {
122                 flushNow(ctx);
123             }
124         } else if (consolidateWhenNoReadInProgress) {
125             // Flush immediately if we reach the threshold, otherwise schedule
126             if (++flushPendingCount == explicitFlushAfterFlushes) {
127                 flushNow(ctx);
128             } else {
129                 scheduleFlush(ctx);
130             }
131         } else {
132             // Always flush directly
133             flushNow(ctx);
134         }
135     }
136 
137     @Override
138     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
139         // This may be the last event in the read loop, so flush now!
140         resetReadAndFlushIfNeeded(ctx);
141         ctx.fireChannelReadComplete();
142     }
143 
144     @Override
145     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
146         readInProgress = true;
147         ctx.fireChannelRead(msg);
148     }
149 
150     @Override
151     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
152         // To ensure we not miss to flush anything, do it now.
153         resetReadAndFlushIfNeeded(ctx);
154         ctx.fireExceptionCaught(cause);
155     }
156 
157     @Override
158     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
159         // Try to flush one last time if flushes are pending before disconnect the channel.
160         resetReadAndFlushIfNeeded(ctx);
161         ctx.disconnect(promise);
162     }
163 
164     @Override
165     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
166         // Try to flush one last time if flushes are pending before close the channel.
167         resetReadAndFlushIfNeeded(ctx);
168         ctx.close(promise);
169     }
170 
171     @Override
172     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
173         if (!ctx.channel().isWritable()) {
174             // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
175             flushIfNeeded(ctx);
176         }
177         ctx.fireChannelWritabilityChanged();
178     }
179 
180     @Override
181     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
182         flushIfNeeded(ctx);
183     }
184 
185     private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
186         readInProgress = false;
187         flushIfNeeded(ctx);
188     }
189 
190     private void flushIfNeeded(ChannelHandlerContext ctx) {
191         if (flushPendingCount > 0) {
192             flushNow(ctx);
193         }
194     }
195 
196     private void flushNow(ChannelHandlerContext ctx) {
197         cancelScheduledFlush();
198         flushPendingCount = 0;
199         ctx.flush();
200     }
201 
202     private void scheduleFlush(final ChannelHandlerContext ctx) {
203         if (nextScheduledFlush == null) {
204             // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
205             nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
206         }
207     }
208 
209     private void cancelScheduledFlush() {
210         if (nextScheduledFlush != null) {
211             nextScheduledFlush.cancel(false);
212             nextScheduledFlush = null;
213         }
214     }
215 }