1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import java.util.ArrayDeque;
19 import java.util.Queue;
20
21
22
23
24
25 public final class ChannelFlushPromiseNotifier {
26
27 private long writeCounter;
28 private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
29 private final boolean tryNotify;
30
31
32
33
34
35
36
37
38
39 public ChannelFlushPromiseNotifier(boolean tryNotify) {
40 this.tryNotify = tryNotify;
41 }
42
43
44
45
46
47 public ChannelFlushPromiseNotifier() {
48 this(false);
49 }
50
51
52
53
54 @Deprecated
55 public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
56 return add(promise, (long) pendingDataSize);
57 }
58
59
60
61
62
63 public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
64 if (promise == null) {
65 throw new NullPointerException("promise");
66 }
67 if (pendingDataSize < 0) {
68 throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize);
69 }
70 long checkpoint = writeCounter + pendingDataSize;
71 if (promise instanceof FlushCheckpoint) {
72 FlushCheckpoint cp = (FlushCheckpoint) promise;
73 cp.flushCheckpoint(checkpoint);
74 flushCheckpoints.add(cp);
75 } else {
76 flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
77 }
78 return this;
79 }
80
81
82
83 public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
84 if (delta < 0) {
85 throw new IllegalArgumentException("delta must be >= 0 but was " + delta);
86 }
87 writeCounter += delta;
88 return this;
89 }
90
91
92
93
94 public long writeCounter() {
95 return writeCounter;
96 }
97
98
99
100
101
102
103
104
105 public ChannelFlushPromiseNotifier notifyPromises() {
106 notifyPromises0(null);
107 return this;
108 }
109
110
111
112
113 @Deprecated
114 public ChannelFlushPromiseNotifier notifyFlushFutures() {
115 return notifyPromises();
116 }
117
118
119
120
121
122
123
124
125
126
127
128
129 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
130 notifyPromises();
131 for (;;) {
132 FlushCheckpoint cp = flushCheckpoints.poll();
133 if (cp == null) {
134 break;
135 }
136 if (tryNotify) {
137 cp.promise().tryFailure(cause);
138 } else {
139 cp.promise().setFailure(cause);
140 }
141 }
142 return this;
143 }
144
145
146
147
148 @Deprecated
149 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
150 return notifyPromises(cause);
151 }
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
170 notifyPromises0(cause1);
171 for (;;) {
172 FlushCheckpoint cp = flushCheckpoints.poll();
173 if (cp == null) {
174 break;
175 }
176 if (tryNotify) {
177 cp.promise().tryFailure(cause2);
178 } else {
179 cp.promise().setFailure(cause2);
180 }
181 }
182 return this;
183 }
184
185
186
187
188 @Deprecated
189 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
190 return notifyPromises(cause1, cause2);
191 }
192
193 private void notifyPromises0(Throwable cause) {
194 if (flushCheckpoints.isEmpty()) {
195 writeCounter = 0;
196 return;
197 }
198
199 final long writeCounter = this.writeCounter;
200 for (;;) {
201 FlushCheckpoint cp = flushCheckpoints.peek();
202 if (cp == null) {
203
204 this.writeCounter = 0;
205 break;
206 }
207
208 if (cp.flushCheckpoint() > writeCounter) {
209 if (writeCounter > 0 && flushCheckpoints.size() == 1) {
210 this.writeCounter = 0;
211 cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
212 }
213 break;
214 }
215
216 flushCheckpoints.remove();
217 ChannelPromise promise = cp.promise();
218 if (cause == null) {
219 if (tryNotify) {
220 promise.trySuccess();
221 } else {
222 promise.setSuccess();
223 }
224 } else {
225 if (tryNotify) {
226 promise.tryFailure(cause);
227 } else {
228 promise.setFailure(cause);
229 }
230 }
231 }
232
233
234 final long newWriteCounter = this.writeCounter;
235 if (newWriteCounter >= 0x8000000000L) {
236
237
238 this.writeCounter = 0;
239 for (FlushCheckpoint cp: flushCheckpoints) {
240 cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
241 }
242 }
243 }
244
245 interface FlushCheckpoint {
246 long flushCheckpoint();
247 void flushCheckpoint(long checkpoint);
248 ChannelPromise promise();
249 }
250
251 private static class DefaultFlushCheckpoint implements FlushCheckpoint {
252 private long checkpoint;
253 private final ChannelPromise future;
254
255 DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
256 this.checkpoint = checkpoint;
257 this.future = future;
258 }
259
260 @Override
261 public long flushCheckpoint() {
262 return checkpoint;
263 }
264
265 @Override
266 public void flushCheckpoint(long checkpoint) {
267 this.checkpoint = checkpoint;
268 }
269
270 @Override
271 public ChannelPromise promise() {
272 return future;
273 }
274 }
275 }