1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel;
17
18 import io.netty.util.internal.ObjectUtil;
19
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import java.util.ArrayDeque;
23 import java.util.Queue;
24
25 /**
26 * This implementation allows to register {@link ChannelFuture} instances which will get notified once some amount of
27 * data was written and so a checkpoint was reached.
28 */
29 public final class ChannelFlushPromiseNotifier {
30
31 private long writeCounter;
32 private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
33 private final boolean tryNotify;
34
35 /**
36 * Create a new instance
37 *
38 * @param tryNotify if {@code true} the {@link ChannelPromise}s will get notified with
39 * {@link ChannelPromise#trySuccess()} and {@link ChannelPromise#tryFailure(Throwable)}.
40 * Otherwise {@link ChannelPromise#setSuccess()} and {@link ChannelPromise#setFailure(Throwable)}
41 * is used
42 */
43 public ChannelFlushPromiseNotifier(boolean tryNotify) {
44 this.tryNotify = tryNotify;
45 }
46
47 /**
48 * Create a new instance which will use {@link ChannelPromise#setSuccess()} and
49 * {@link ChannelPromise#setFailure(Throwable)} to notify the {@link ChannelPromise}s.
50 */
51 public ChannelFlushPromiseNotifier() {
52 this(false);
53 }
54
55 /**
56 * @deprecated use {@link #add(ChannelPromise, long)}
57 */
58 @Deprecated
59 public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
60 return add(promise, (long) pendingDataSize);
61 }
62
63 /**
64 * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
65 * {@code pendingDataSize} was reached.
66 */
67 public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
68 ObjectUtil.checkNotNull(promise, "promise");
69 checkPositiveOrZero(pendingDataSize, "pendingDataSize");
70 long checkpoint = writeCounter + pendingDataSize;
71 if (promise instanceof FlushCheckpoint) {
72 FlushCheckpoint cp = (FlushCheckpoint) promise;
73 cp.flushCheckpoint(checkpoint);
74 flushCheckpoints.add(cp);
75 } else {
76 flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
77 }
78 return this;
79 }
80 /**
81 * Increase the current write counter by the given delta
82 */
83 public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
84 checkPositiveOrZero(delta, "delta");
85 writeCounter += delta;
86 return this;
87 }
88
89 /**
90 * Return the current write counter of this {@link ChannelFlushPromiseNotifier}
91 */
92 public long writeCounter() {
93 return writeCounter;
94 }
95
96 /**
97 * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
98 * their pendingDatasize is smaller after the current writeCounter returned by {@link #writeCounter()}.
99 *
100 * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
101 * so not receive anymore notification.
102 */
103 public ChannelFlushPromiseNotifier notifyPromises() {
104 notifyPromises0(null);
105 return this;
106 }
107
108 /**
109 * @deprecated use {@link #notifyPromises()}
110 */
111 @Deprecated
112 public ChannelFlushPromiseNotifier notifyFlushFutures() {
113 return notifyPromises();
114 }
115
116 /**
117 * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
118 * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
119 *
120 * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
121 * so not receive anymore notification.
122 *
123 * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
124 *
125 * So after this operation this {@link ChannelFutureListener} is empty.
126 */
127 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
128 notifyPromises();
129 for (;;) {
130 FlushCheckpoint cp = flushCheckpoints.poll();
131 if (cp == null) {
132 break;
133 }
134 if (tryNotify) {
135 cp.promise().tryFailure(cause);
136 } else {
137 cp.promise().setFailure(cause);
138 }
139 }
140 return this;
141 }
142
143 /**
144 * @deprecated use {@link #notifyPromises(Throwable)}
145 */
146 @Deprecated
147 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
148 return notifyPromises(cause);
149 }
150
151 /**
152 * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
153 * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
154 * the given cause1.
155 *
156 * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
157 * so not receive anymore notification.
158 *
159 * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
160 *
161 * So after this operation this {@link ChannelFutureListener} is empty.
162 *
163 * @param cause1 the {@link Throwable} which will be used to fail all of the {@link ChannelFuture}s which
164 * pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
165 * @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
166 */
167 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
168 notifyPromises0(cause1);
169 for (;;) {
170 FlushCheckpoint cp = flushCheckpoints.poll();
171 if (cp == null) {
172 break;
173 }
174 if (tryNotify) {
175 cp.promise().tryFailure(cause2);
176 } else {
177 cp.promise().setFailure(cause2);
178 }
179 }
180 return this;
181 }
182
183 /**
184 * @deprecated use {@link #notifyPromises(Throwable, Throwable)}
185 */
186 @Deprecated
187 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
188 return notifyPromises(cause1, cause2);
189 }
190
191 private void notifyPromises0(Throwable cause) {
192 if (flushCheckpoints.isEmpty()) {
193 writeCounter = 0;
194 return;
195 }
196
197 final long writeCounter = this.writeCounter;
198 for (;;) {
199 FlushCheckpoint cp = flushCheckpoints.peek();
200 if (cp == null) {
201 // Reset the counter if there's nothing in the notification list.
202 this.writeCounter = 0;
203 break;
204 }
205
206 if (cp.flushCheckpoint() > writeCounter) {
207 if (writeCounter > 0 && flushCheckpoints.size() == 1) {
208 this.writeCounter = 0;
209 cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
210 }
211 break;
212 }
213
214 flushCheckpoints.remove();
215 ChannelPromise promise = cp.promise();
216 if (cause == null) {
217 if (tryNotify) {
218 promise.trySuccess();
219 } else {
220 promise.setSuccess();
221 }
222 } else {
223 if (tryNotify) {
224 promise.tryFailure(cause);
225 } else {
226 promise.setFailure(cause);
227 }
228 }
229 }
230
231 // Avoid overflow
232 final long newWriteCounter = this.writeCounter;
233 if (newWriteCounter >= 0x8000000000L) {
234 // Reset the counter only when the counter grew pretty large
235 // so that we can reduce the cost of updating all entries in the notification list.
236 this.writeCounter = 0;
237 for (FlushCheckpoint cp: flushCheckpoints) {
238 cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
239 }
240 }
241 }
242
243 interface FlushCheckpoint {
244 long flushCheckpoint();
245 void flushCheckpoint(long checkpoint);
246 ChannelPromise promise();
247 }
248
249 private static class DefaultFlushCheckpoint implements FlushCheckpoint {
250 private long checkpoint;
251 private final ChannelPromise future;
252
253 DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
254 this.checkpoint = checkpoint;
255 this.future = future;
256 }
257
258 @Override
259 public long flushCheckpoint() {
260 return checkpoint;
261 }
262
263 @Override
264 public void flushCheckpoint(long checkpoint) {
265 this.checkpoint = checkpoint;
266 }
267
268 @Override
269 public ChannelPromise promise() {
270 return future;
271 }
272 }
273 }