View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import java.util.ArrayDeque;
19  import java.util.Queue;
20  
21  /**
22   * This implementation allows to register {@link ChannelFuture} instances which will get notified once some amount of
23   * data was written and so a checkpoint was reached.
24   */
25  public final class ChannelFlushPromiseNotifier {
26  
27      private long writeCounter;
28      private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
29      private final boolean tryNotify;
30  
31      /**
32       * Create a new instance
33       *
34       * @param tryNotify if {@code true} the {@link ChannelPromise}s will get notified with
35       *                  {@link ChannelPromise#trySuccess()} and {@link ChannelPromise#tryFailure(Throwable)}.
36       *                  Otherwise {@link ChannelPromise#setSuccess()} and {@link ChannelPromise#setFailure(Throwable)}
37       *                  is used
38       */
39      public ChannelFlushPromiseNotifier(boolean tryNotify) {
40          this.tryNotify = tryNotify;
41      }
42  
43      /**
44       * Create a new instance which will use {@link ChannelPromise#setSuccess()} and
45       * {@link ChannelPromise#setFailure(Throwable)} to notify the {@link ChannelPromise}s.
46       */
47      public ChannelFlushPromiseNotifier() {
48          this(false);
49      }
50  
51      /**
52       * @deprecated use {@link #add(ChannelPromise, long)}
53       */
54      @Deprecated
55      public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
56          return add(promise, (long) pendingDataSize);
57      }
58  
59      /**
60       * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
61       * {@code pendingDataSize} was reached.
62       */
63      public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
64          if (promise == null) {
65              throw new NullPointerException("promise");
66          }
67          if (pendingDataSize < 0) {
68              throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize);
69          }
70          long checkpoint = writeCounter + pendingDataSize;
71          if (promise instanceof FlushCheckpoint) {
72              FlushCheckpoint cp = (FlushCheckpoint) promise;
73              cp.flushCheckpoint(checkpoint);
74              flushCheckpoints.add(cp);
75          } else {
76              flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
77          }
78          return this;
79      }
80      /**
81       * Increase the current write counter by the given delta
82       */
83      public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
84          if (delta < 0) {
85              throw new IllegalArgumentException("delta must be >= 0 but was " + delta);
86          }
87          writeCounter += delta;
88          return this;
89      }
90  
91      /**
92       * Return the current write counter of this {@link ChannelFlushPromiseNotifier}
93       */
94      public long writeCounter() {
95          return writeCounter;
96      }
97  
98      /**
99       * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
100      * their pendingDatasize is smaller after the the current writeCounter returned by {@link #writeCounter()}.
101      *
102      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
103      * so not receive anymore notification.
104      */
105     public ChannelFlushPromiseNotifier notifyPromises() {
106         notifyPromises0(null);
107         return this;
108     }
109 
110     /**
111      * @deprecated use {@link #notifyPromises()}
112      */
113     @Deprecated
114     public ChannelFlushPromiseNotifier notifyFlushFutures() {
115         return notifyPromises();
116     }
117 
118     /**
119      * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
120      * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
121      *
122      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
123      * so not receive anymore notification.
124      *
125      * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
126      *
127      * So after this operation this {@link ChannelFutureListener} is empty.
128      */
129     public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
130         notifyPromises();
131         for (;;) {
132             FlushCheckpoint cp = flushCheckpoints.poll();
133             if (cp == null) {
134                 break;
135             }
136             if (tryNotify) {
137                 cp.promise().tryFailure(cause);
138             } else {
139                 cp.promise().setFailure(cause);
140             }
141         }
142         return this;
143     }
144 
145     /**
146      * @deprecated use {@link #notifyPromises(Throwable)}
147      */
148     @Deprecated
149     public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
150         return notifyPromises(cause);
151     }
152 
153     /**
154      * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
155      * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
156      * the given cause1.
157      *
158      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
159      * so not receive anymore notification.
160      *
161      * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
162      *
163      * So after this operation this {@link ChannelFutureListener} is empty.
164      *
165      * @param cause1    the {@link Throwable} which will be used to fail all of the {@link ChannelFuture}s which
166      *                  pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
167      * @param cause2    the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
168      */
169     public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
170         notifyPromises0(cause1);
171         for (;;) {
172             FlushCheckpoint cp = flushCheckpoints.poll();
173             if (cp == null) {
174                 break;
175             }
176             if (tryNotify) {
177                 cp.promise().tryFailure(cause2);
178             } else {
179                 cp.promise().setFailure(cause2);
180             }
181         }
182         return this;
183     }
184 
185     /**
186      * @deprecated use {@link #notifyPromises(Throwable, Throwable)}
187      */
188     @Deprecated
189     public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
190         return notifyPromises(cause1, cause2);
191     }
192 
193     private void notifyPromises0(Throwable cause) {
194         if (flushCheckpoints.isEmpty()) {
195             writeCounter = 0;
196             return;
197         }
198 
199         final long writeCounter = this.writeCounter;
200         for (;;) {
201             FlushCheckpoint cp = flushCheckpoints.peek();
202             if (cp == null) {
203                 // Reset the counter if there's nothing in the notification list.
204                 this.writeCounter = 0;
205                 break;
206             }
207 
208             if (cp.flushCheckpoint() > writeCounter) {
209                 if (writeCounter > 0 && flushCheckpoints.size() == 1) {
210                     this.writeCounter = 0;
211                     cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
212                 }
213                 break;
214             }
215 
216             flushCheckpoints.remove();
217             ChannelPromise promise = cp.promise();
218             if (cause == null) {
219                 if (tryNotify) {
220                     promise.trySuccess();
221                 } else {
222                     promise.setSuccess();
223                 }
224             } else {
225                 if (tryNotify) {
226                     promise.tryFailure(cause);
227                 } else {
228                     promise.setFailure(cause);
229                 }
230             }
231         }
232 
233         // Avoid overflow
234         final long newWriteCounter = this.writeCounter;
235         if (newWriteCounter >= 0x8000000000L) {
236             // Reset the counter only when the counter grew pretty large
237             // so that we can reduce the cost of updating all entries in the notification list.
238             this.writeCounter = 0;
239             for (FlushCheckpoint cp: flushCheckpoints) {
240                 cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
241             }
242         }
243     }
244 
245     interface FlushCheckpoint {
246         long flushCheckpoint();
247         void flushCheckpoint(long checkpoint);
248         ChannelPromise promise();
249     }
250 
251     private static class DefaultFlushCheckpoint implements FlushCheckpoint {
252         private long checkpoint;
253         private final ChannelPromise future;
254 
255         DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
256             this.checkpoint = checkpoint;
257             this.future = future;
258         }
259 
260         @Override
261         public long flushCheckpoint() {
262             return checkpoint;
263         }
264 
265         @Override
266         public void flushCheckpoint(long checkpoint) {
267             this.checkpoint = checkpoint;
268         }
269 
270         @Override
271         public ChannelPromise promise() {
272             return future;
273         }
274     }
275 }