View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import java.util.ArrayDeque;
19  import java.util.Queue;
20  
21  /**
22   * This implementation allows to register {@link ChannelFuture} instances which will get notified once some amount of
23   * data was written and so a checkpoint was reached.
24   */
25  public final class ChannelFlushPromiseNotifier {
26  
27      private long writeCounter;
28      private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
29      private final boolean tryNotify;
30  
31      /**
32       * Create a new instance
33       *
34       * @param tryNotify if {@code true} the {@link ChannelPromise}s will get notified with
35       *                  {@link ChannelPromise#trySuccess()} and {@link ChannelPromise#tryFailure(Throwable)}.
36       *                  Otherwise {@link ChannelPromise#setSuccess()} and {@link ChannelPromise#setFailure(Throwable)}
37       *                  is used
38       */
39      public ChannelFlushPromiseNotifier(boolean tryNotify) {
40          this.tryNotify = tryNotify;
41      }
42  
43      /**
44       * Create a new instance which will use {@link ChannelPromise#setSuccess()} and
45       * {@link ChannelPromise#setFailure(Throwable)} to notify the {@link ChannelPromise}s.
46       */
47      public ChannelFlushPromiseNotifier() {
48          this(false);
49      }
50  
51      /**
52       * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
53       * {@code pendingDataSize} was reached.
54       */
55      public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
56          if (promise == null) {
57              throw new NullPointerException("promise");
58          }
59          if (pendingDataSize < 0) {
60              throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize);
61          }
62          long checkpoint = writeCounter + pendingDataSize;
63          if (promise instanceof FlushCheckpoint) {
64              FlushCheckpoint cp = (FlushCheckpoint) promise;
65              cp.flushCheckpoint(checkpoint);
66              flushCheckpoints.add(cp);
67          } else {
68              flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
69          }
70          return this;
71      }
72      /**
73       * Increase the current write counter by the given delta
74       */
75      public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
76          if (delta < 0) {
77              throw new IllegalArgumentException("delta must be >= 0 but was " + delta);
78          }
79          writeCounter += delta;
80          return this;
81      }
82  
83      /**
84       * Return the current write counter of this {@link ChannelFlushPromiseNotifier}
85       */
86      public long writeCounter() {
87          return writeCounter;
88      }
89  
90      /**
91       * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and
92       * their pendingDatasize is smaller after the the current writeCounter returned by {@link #writeCounter()}.
93       *
94       * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
95       * so not receive anymore notification.
96       */
97      public ChannelFlushPromiseNotifier notifyPromises() {
98          notifyPromises0(null);
99          return this;
100     }
101 
102     /**
103      * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and
104      * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
105      *
106      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
107      * so not receive anymore notification.
108      *
109      * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
110      *
111      * So after this operation this {@link ChannelFutureListener} is empty.
112      */
113     public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
114         notifyPromises();
115         for (;;) {
116             FlushCheckpoint cp = flushCheckpoints.poll();
117             if (cp == null) {
118                 break;
119             }
120             if (tryNotify) {
121                 cp.promise().tryFailure(cause);
122             } else {
123                 cp.promise().setFailure(cause);
124             }
125         }
126         return this;
127     }
128 
129     /**
130      * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and
131      * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
132      * the given cause1.
133      *
134      * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
135      * so not receive anymore notification.
136      *
137      * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
138      *
139      * So after this operation this {@link ChannelFutureListener} is empty.
140      *
141      * @param cause1    the {@link Throwable} which will be used to fail all of the {@link ChannelFuture}s whichs
142      *                  pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
143      * @param cause2    the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
144      */
145     public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
146         notifyPromises0(cause1);
147         for (;;) {
148             FlushCheckpoint cp = flushCheckpoints.poll();
149             if (cp == null) {
150                 break;
151             }
152             if (tryNotify) {
153                 cp.promise().tryFailure(cause2);
154             } else {
155                 cp.promise().setFailure(cause2);
156             }
157         }
158         return this;
159     }
160 
161     /**
162      * @deprecated use {@link #notifyPromises(Throwable, Throwable)}
163      */
164     @Deprecated
165     public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
166         return notifyPromises(cause1, cause2);
167     }
168 
169     private void notifyPromises0(Throwable cause) {
170         if (flushCheckpoints.isEmpty()) {
171             writeCounter = 0;
172             return;
173         }
174 
175         final long writeCounter = this.writeCounter;
176         for (;;) {
177             FlushCheckpoint cp = flushCheckpoints.peek();
178             if (cp == null) {
179                 // Reset the counter if there's nothing in the notification list.
180                 this.writeCounter = 0;
181                 break;
182             }
183 
184             if (cp.flushCheckpoint() > writeCounter) {
185                 if (writeCounter > 0 && flushCheckpoints.size() == 1) {
186                     this.writeCounter = 0;
187                     cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
188                 }
189                 break;
190             }
191 
192             flushCheckpoints.remove();
193             ChannelPromise promise = cp.promise();
194             if (cause == null) {
195                 if (tryNotify) {
196                     promise.trySuccess();
197                 } else {
198                     promise.setSuccess();
199                 }
200             } else {
201                 if (tryNotify) {
202                     promise.tryFailure(cause);
203                 } else {
204                     promise.setFailure(cause);
205                 }
206             }
207         }
208 
209         // Avoid overflow
210         final long newWriteCounter = this.writeCounter;
211         if (newWriteCounter >= 0x8000000000L) {
212             // Reset the counter only when the counter grew pretty large
213             // so that we can reduce the cost of updating all entries in the notification list.
214             this.writeCounter = 0;
215             for (FlushCheckpoint cp: flushCheckpoints) {
216                 cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
217             }
218         }
219     }
220 
221     interface FlushCheckpoint {
222         long flushCheckpoint();
223         void flushCheckpoint(long checkpoint);
224         ChannelPromise promise();
225     }
226 
227     private static class DefaultFlushCheckpoint implements FlushCheckpoint {
228         private long checkpoint;
229         private final ChannelPromise future;
230 
231         DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
232             this.checkpoint = checkpoint;
233             this.future = future;
234         }
235 
236         @Override
237         public long flushCheckpoint() {
238             return checkpoint;
239         }
240 
241         @Override
242         public void flushCheckpoint(long checkpoint) {
243             this.checkpoint = checkpoint;
244         }
245 
246         @Override
247         public ChannelPromise promise() {
248             return future;
249         }
250     }
251 }