View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.internal.ObjectUtil;
19  
20  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21  
22  import java.util.ArrayDeque;
23  import java.util.Queue;
24  
25  /**
26   * This implementation allows to register {@link ChannelFuture} instances which will get notified once some amount of
27   * data was written and so a checkpoint was reached.
28   */
29  public final class ChannelFlushPromiseNotifier {
30  
31      private long writeCounter;
32      private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
33      private final boolean tryNotify;
34  
35      /**
36       * Create a new instance
37       *
38       * @param tryNotify if {@code true} the {@link ChannelPromise}s will get notified with
39       *                  {@link ChannelPromise#trySuccess()} and {@link ChannelPromise#tryFailure(Throwable)}.
40       *                  Otherwise {@link ChannelPromise#setSuccess()} and {@link ChannelPromise#setFailure(Throwable)}
41       *                  is used
42       */
43      public ChannelFlushPromiseNotifier(boolean tryNotify) {
44          this.tryNotify = tryNotify;
45      }
46  
47      /**
48       * Create a new instance which will use {@link ChannelPromise#setSuccess()} and
49       * {@link ChannelPromise#setFailure(Throwable)} to notify the {@link ChannelPromise}s.
50       */
51      public ChannelFlushPromiseNotifier() {
52          this(false);
53      }
54  
55      /**
56       * @deprecated use {@link #add(ChannelPromise, long)}
57       */
58      @Deprecated
59      public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
60          return add(promise, (long) pendingDataSize);
61      }
62  
63      /**
64       * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
65       * {@code pendingDataSize} was reached.
66       */
67      public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
68          ObjectUtil.checkNotNull(promise, "promise");
69          checkPositiveOrZero(pendingDataSize, "pendingDataSize");
70          long checkpoint = writeCounter + pendingDataSize;
71          if (promise instanceof FlushCheckpoint) {
72              FlushCheckpoint cp = (FlushCheckpoint) promise;
73              cp.flushCheckpoint(checkpoint);
74              flushCheckpoints.add(cp);
75          } else {
76              flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
77          }
78          return this;
79      }
80      /**
81       * Increase the current write counter by the given delta
82       */
83      public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
84          checkPositiveOrZero(delta, "delta");
85          writeCounter += delta;
86          return this;
87      }
88  
89      /**
90       * Return the current write counter of this {@link ChannelFlushPromiseNotifier}
91       */
92      public long writeCounter() {
93          return writeCounter;
94      }
95  
96      /**
97       * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
98       * their pendingDatasize is smaller after the current writeCounter returned by {@link #writeCounter()}.
99       *
100      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
101      * so not receive anymore notification.
102      */
103     public ChannelFlushPromiseNotifier notifyPromises() {
104         notifyPromises0(null);
105         return this;
106     }
107 
108     /**
109      * @deprecated use {@link #notifyPromises()}
110      */
111     @Deprecated
112     public ChannelFlushPromiseNotifier notifyFlushFutures() {
113         return notifyPromises();
114     }
115 
116     /**
117      * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
118      * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
119      *
120      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
121      * so not receive anymore notification.
122      *
123      * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
124      *
125      * So after this operation this {@link ChannelFutureListener} is empty.
126      */
127     public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
128         notifyPromises();
129         for (;;) {
130             FlushCheckpoint cp = flushCheckpoints.poll();
131             if (cp == null) {
132                 break;
133             }
134             if (tryNotify) {
135                 cp.promise().tryFailure(cause);
136             } else {
137                 cp.promise().setFailure(cause);
138             }
139         }
140         return this;
141     }
142 
143     /**
144      * @deprecated use {@link #notifyPromises(Throwable)}
145      */
146     @Deprecated
147     public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
148         return notifyPromises(cause);
149     }
150 
151     /**
152      * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
153      * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
154      * the given cause1.
155      *
156      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
157      * so not receive anymore notification.
158      *
159      * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
160      *
161      * So after this operation this {@link ChannelFutureListener} is empty.
162      *
163      * @param cause1    the {@link Throwable} which will be used to fail all of the {@link ChannelFuture}s which
164      *                  pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
165      * @param cause2    the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
166      */
167     public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
168         notifyPromises0(cause1);
169         for (;;) {
170             FlushCheckpoint cp = flushCheckpoints.poll();
171             if (cp == null) {
172                 break;
173             }
174             if (tryNotify) {
175                 cp.promise().tryFailure(cause2);
176             } else {
177                 cp.promise().setFailure(cause2);
178             }
179         }
180         return this;
181     }
182 
183     /**
184      * @deprecated use {@link #notifyPromises(Throwable, Throwable)}
185      */
186     @Deprecated
187     public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
188         return notifyPromises(cause1, cause2);
189     }
190 
191     private void notifyPromises0(Throwable cause) {
192         if (flushCheckpoints.isEmpty()) {
193             writeCounter = 0;
194             return;
195         }
196 
197         final long writeCounter = this.writeCounter;
198         for (;;) {
199             FlushCheckpoint cp = flushCheckpoints.peek();
200             if (cp == null) {
201                 // Reset the counter if there's nothing in the notification list.
202                 this.writeCounter = 0;
203                 break;
204             }
205 
206             if (cp.flushCheckpoint() > writeCounter) {
207                 if (writeCounter > 0 && flushCheckpoints.size() == 1) {
208                     this.writeCounter = 0;
209                     cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
210                 }
211                 break;
212             }
213 
214             flushCheckpoints.remove();
215             ChannelPromise promise = cp.promise();
216             if (cause == null) {
217                 if (tryNotify) {
218                     promise.trySuccess();
219                 } else {
220                     promise.setSuccess();
221                 }
222             } else {
223                 if (tryNotify) {
224                     promise.tryFailure(cause);
225                 } else {
226                     promise.setFailure(cause);
227                 }
228             }
229         }
230 
231         // Avoid overflow
232         final long newWriteCounter = this.writeCounter;
233         if (newWriteCounter >= 0x8000000000L) {
234             // Reset the counter only when the counter grew pretty large
235             // so that we can reduce the cost of updating all entries in the notification list.
236             this.writeCounter = 0;
237             for (FlushCheckpoint cp: flushCheckpoints) {
238                 cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
239             }
240         }
241     }
242 
243     interface FlushCheckpoint {
244         long flushCheckpoint();
245         void flushCheckpoint(long checkpoint);
246         ChannelPromise promise();
247     }
248 
249     private static class DefaultFlushCheckpoint implements FlushCheckpoint {
250         private long checkpoint;
251         private final ChannelPromise future;
252 
253         DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
254             this.checkpoint = checkpoint;
255             this.future = future;
256         }
257 
258         @Override
259         public long flushCheckpoint() {
260             return checkpoint;
261         }
262 
263         @Override
264         public void flushCheckpoint(long checkpoint) {
265             this.checkpoint = checkpoint;
266         }
267 
268         @Override
269         public ChannelPromise promise() {
270             return future;
271         }
272     }
273 }