1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.internal.ObjectUtil;
19
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import java.util.ArrayDeque;
23 import java.util.Queue;
24
25
26
27
28
29 public final class ChannelFlushPromiseNotifier {
30
31 private long writeCounter;
32 private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
33 private final boolean tryNotify;
34
35
36
37
38
39
40
41
42
43 public ChannelFlushPromiseNotifier(boolean tryNotify) {
44 this.tryNotify = tryNotify;
45 }
46
47
48
49
50
51 public ChannelFlushPromiseNotifier() {
52 this(false);
53 }
54
55
56
57
58 @Deprecated
59 public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
60 return add(promise, (long) pendingDataSize);
61 }
62
63
64
65
66
67 public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
68 ObjectUtil.checkNotNull(promise, "promise");
69 checkPositiveOrZero(pendingDataSize, "pendingDataSize");
70 long checkpoint = writeCounter + pendingDataSize;
71 if (promise instanceof FlushCheckpoint) {
72 FlushCheckpoint cp = (FlushCheckpoint) promise;
73 cp.flushCheckpoint(checkpoint);
74 flushCheckpoints.add(cp);
75 } else {
76 flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
77 }
78 return this;
79 }
80
81
82
83 public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
84 checkPositiveOrZero(delta, "delta");
85 writeCounter += delta;
86 return this;
87 }
88
89
90
91
92 public long writeCounter() {
93 return writeCounter;
94 }
95
96
97
98
99
100
101
102
103 public ChannelFlushPromiseNotifier notifyPromises() {
104 notifyPromises0(null);
105 return this;
106 }
107
108
109
110
111 @Deprecated
112 public ChannelFlushPromiseNotifier notifyFlushFutures() {
113 return notifyPromises();
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
128 notifyPromises();
129 for (;;) {
130 FlushCheckpoint cp = flushCheckpoints.poll();
131 if (cp == null) {
132 break;
133 }
134 if (tryNotify) {
135 cp.promise().tryFailure(cause);
136 } else {
137 cp.promise().setFailure(cause);
138 }
139 }
140 return this;
141 }
142
143
144
145
146 @Deprecated
147 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
148 return notifyPromises(cause);
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
168 notifyPromises0(cause1);
169 for (;;) {
170 FlushCheckpoint cp = flushCheckpoints.poll();
171 if (cp == null) {
172 break;
173 }
174 if (tryNotify) {
175 cp.promise().tryFailure(cause2);
176 } else {
177 cp.promise().setFailure(cause2);
178 }
179 }
180 return this;
181 }
182
183
184
185
186 @Deprecated
187 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
188 return notifyPromises(cause1, cause2);
189 }
190
191 private void notifyPromises0(Throwable cause) {
192 if (flushCheckpoints.isEmpty()) {
193 writeCounter = 0;
194 return;
195 }
196
197 final long writeCounter = this.writeCounter;
198 for (;;) {
199 FlushCheckpoint cp = flushCheckpoints.peek();
200 if (cp == null) {
201
202 this.writeCounter = 0;
203 break;
204 }
205
206 if (cp.flushCheckpoint() > writeCounter) {
207 if (writeCounter > 0 && flushCheckpoints.size() == 1) {
208 this.writeCounter = 0;
209 cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
210 }
211 break;
212 }
213
214 flushCheckpoints.remove();
215 ChannelPromise promise = cp.promise();
216 if (cause == null) {
217 if (tryNotify) {
218 promise.trySuccess();
219 } else {
220 promise.setSuccess();
221 }
222 } else {
223 if (tryNotify) {
224 promise.tryFailure(cause);
225 } else {
226 promise.setFailure(cause);
227 }
228 }
229 }
230
231
232 final long newWriteCounter = this.writeCounter;
233 if (newWriteCounter >= 0x8000000000L) {
234
235
236 this.writeCounter = 0;
237 for (FlushCheckpoint cp: flushCheckpoints) {
238 cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
239 }
240 }
241 }
242
243 interface FlushCheckpoint {
244 long flushCheckpoint();
245 void flushCheckpoint(long checkpoint);
246 ChannelPromise promise();
247 }
248
249 private static class DefaultFlushCheckpoint implements FlushCheckpoint {
250 private long checkpoint;
251 private final ChannelPromise future;
252
253 DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
254 this.checkpoint = checkpoint;
255 this.future = future;
256 }
257
258 @Override
259 public long flushCheckpoint() {
260 return checkpoint;
261 }
262
263 @Override
264 public void flushCheckpoint(long checkpoint) {
265 this.checkpoint = checkpoint;
266 }
267
268 @Override
269 public ChannelPromise promise() {
270 return future;
271 }
272 }
273 }