1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.flush;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelDuplexHandler;
20 import io.netty.channel.ChannelHandler;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelOutboundHandler;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25
26 import java.util.concurrent.Future;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public class FlushConsolidationHandler extends ChannelDuplexHandler {
58 private final int explicitFlushAfterFlushes;
59 private final boolean consolidateWhenNoReadInProgress;
60 private final Runnable flushTask;
61 private int flushPendingCount;
62 private boolean readInProgress;
63 private ChannelHandlerContext ctx;
64 private Future<?> nextScheduledFlush;
65
66
67
68
69 public FlushConsolidationHandler() {
70 this(256, false);
71 }
72
73
74
75
76
77
78 public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
79 this(explicitFlushAfterFlushes, false);
80 }
81
82
83
84
85
86
87
88
89 public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
90 if (explicitFlushAfterFlushes <= 0) {
91 throw new IllegalArgumentException("explicitFlushAfterFlushes: "
92 + explicitFlushAfterFlushes + " (expected: > 0)");
93 }
94 this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
95 this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
96 this.flushTask = consolidateWhenNoReadInProgress ?
97 new Runnable() {
98 @Override
99 public void run() {
100 if (flushPendingCount > 0 && !readInProgress) {
101 flushPendingCount = 0;
102 ctx.flush();
103 nextScheduledFlush = null;
104 }
105 }
106 }
107 : null;
108 }
109
110 @Override
111 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
112 this.ctx = ctx;
113 }
114
115 @Override
116 public void flush(ChannelHandlerContext ctx) throws Exception {
117 if (readInProgress) {
118
119
120 if (++flushPendingCount == explicitFlushAfterFlushes) {
121 flushNow(ctx);
122 }
123 } else if (consolidateWhenNoReadInProgress) {
124
125 if (++flushPendingCount == explicitFlushAfterFlushes) {
126 flushNow(ctx);
127 } else {
128 scheduleFlush(ctx);
129 }
130 } else {
131
132 flushNow(ctx);
133 }
134 }
135
136 @Override
137 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
138
139 resetReadAndFlushIfNeeded(ctx);
140 ctx.fireChannelReadComplete();
141 }
142
143 @Override
144 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
145 readInProgress = true;
146 ctx.fireChannelRead(msg);
147 }
148
149 @Override
150 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
151
152 resetReadAndFlushIfNeeded(ctx);
153 ctx.fireExceptionCaught(cause);
154 }
155
156 @Override
157 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
158
159 resetReadAndFlushIfNeeded(ctx);
160 ctx.disconnect(promise);
161 }
162
163 @Override
164 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
165
166 resetReadAndFlushIfNeeded(ctx);
167 ctx.close(promise);
168 }
169
170 @Override
171 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
172 if (!ctx.channel().isWritable()) {
173
174 flushIfNeeded(ctx);
175 }
176 ctx.fireChannelWritabilityChanged();
177 }
178
179 @Override
180 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
181 flushIfNeeded(ctx);
182 }
183
184 private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
185 readInProgress = false;
186 flushIfNeeded(ctx);
187 }
188
189 private void flushIfNeeded(ChannelHandlerContext ctx) {
190 if (flushPendingCount > 0) {
191 flushNow(ctx);
192 }
193 }
194
195 private void flushNow(ChannelHandlerContext ctx) {
196 cancelScheduledFlush();
197 flushPendingCount = 0;
198 ctx.flush();
199 }
200
201 private void scheduleFlush(final ChannelHandlerContext ctx) {
202 if (nextScheduledFlush == null) {
203
204 nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
205 }
206 }
207
208 private void cancelScheduledFlush() {
209 if (nextScheduledFlush != null) {
210 nextScheduledFlush.cancel(false);
211 nextScheduledFlush = null;
212 }
213 }
214 }