/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel;
17  
18  import io.netty.util.Recycler;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.concurrent.PromiseCombiner;
21  import io.netty.util.internal.ObjectUtil;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  /**
27   * A queue of write operations which are pending for later execution. It also updates the
28   * {@linkplain Channel#isWritable() writability} of the associated {@link Channel}, so that
29   * the pending write operations are also considered to determine the writability.
30   */
31  public final class PendingWriteQueue {
32      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
33      // Assuming a 64-bit JVM:
34      //  - 16 bytes object header
35      //  - 4 reference fields
36      //  - 1 long fields
37      private static final int PENDING_WRITE_OVERHEAD =
38              SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
39  
40      private final ChannelHandlerContext ctx;
41      private final PendingBytesTracker tracker;
42  
43      // head and tail pointers for the linked-list structure. If empty head and tail are null.
44      private PendingWrite head;
45      private PendingWrite tail;
46      private int size;
47      private long bytes;
48  
49      public PendingWriteQueue(ChannelHandlerContext ctx) {
50          tracker = PendingBytesTracker.newTracker(ctx.channel());
51          this.ctx = ctx;
52      }
53  
54      /**
55       * Returns {@code true} if there are no pending write operations left in this queue.
56       */
57      public boolean isEmpty() {
58          assert ctx.executor().inEventLoop();
59          return head == null;
60      }
61  
62      /**
63       * Returns the number of pending write operations.
64       */
65      public int size() {
66          assert ctx.executor().inEventLoop();
67          return size;
68      }
69  
70      /**
71       * Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
72       * it should only be treated as a hint.
73       */
74      public long bytes() {
75          assert ctx.executor().inEventLoop();
76          return bytes;
77      }
78  
79      private int size(Object msg) {
80          // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
81          // we should add them to the queue and let removeAndFailAll() fail them later.
82          int messageSize = tracker.size(msg);
83          if (messageSize < 0) {
84              // Size may be unknown so just use 0
85              messageSize = 0;
86          }
87          return messageSize + PENDING_WRITE_OVERHEAD;
88      }
89  
90      /**
91       * Add the given {@code msg} and {@link ChannelPromise}.
92       */
93      public void add(Object msg, ChannelPromise promise) {
94          assert ctx.executor().inEventLoop();
95          if (msg == null) {
96              throw new NullPointerException("msg");
97          }
98          if (promise == null) {
99              throw new NullPointerException("promise");
100         }
101         // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering,
102         // we should add them to the queue and let removeAndFailAll() fail them later.
103         int messageSize = size(msg);
104 
105         PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
106         PendingWrite currentTail = tail;
107         if (currentTail == null) {
108             tail = head = write;
109         } else {
110             currentTail.next = write;
111             tail = write;
112         }
113         size ++;
114         bytes += messageSize;
115         tracker.incrementPendingOutboundBytes(write.size);
116     }
117 
118     /**
119      * Remove all pending write operation and performs them via
120      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
121      *
122      * @return  {@link ChannelFuture} if something was written and {@code null}
123      *          if the {@link PendingWriteQueue} is empty.
124      */
125     public ChannelFuture removeAndWriteAll() {
126         assert ctx.executor().inEventLoop();
127 
128         if (isEmpty()) {
129             return null;
130         }
131 
132         ChannelPromise p = ctx.newPromise();
133         PromiseCombiner combiner = new PromiseCombiner();
134         try {
135             // It is possible for some of the written promises to trigger more writes. The new writes
136             // will "revive" the queue, so we need to write them up until the queue is empty.
137             for (PendingWrite write = head; write != null; write = head) {
138                 head = tail = null;
139                 size = 0;
140                 bytes = 0;
141 
142                 while (write != null) {
143                     PendingWrite next = write.next;
144                     Object msg = write.msg;
145                     ChannelPromise promise = write.promise;
146                     recycle(write, false);
147                     if (!(promise instanceof VoidChannelPromise)) {
148                         combiner.add(promise);
149                     }
150                     ctx.write(msg, promise);
151                     write = next;
152                 }
153             }
154             combiner.finish(p);
155         } catch (Throwable cause) {
156             p.setFailure(cause);
157         }
158         assertEmpty();
159         return p;
160     }
161 
162     /**
163      * Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released
164      * via {@link ReferenceCountUtil#safeRelease(Object)}.
165      */
166     public void removeAndFailAll(Throwable cause) {
167         assert ctx.executor().inEventLoop();
168         if (cause == null) {
169             throw new NullPointerException("cause");
170         }
171         // It is possible for some of the failed promises to trigger more writes. The new writes
172         // will "revive" the queue, so we need to clean them up until the queue is empty.
173         for (PendingWrite write = head; write != null; write = head) {
174             head = tail = null;
175             size = 0;
176             bytes = 0;
177             while (write != null) {
178                 PendingWrite next = write.next;
179                 ReferenceCountUtil.safeRelease(write.msg);
180                 ChannelPromise promise = write.promise;
181                 recycle(write, false);
182                 safeFail(promise, cause);
183                 write = next;
184             }
185         }
186         assertEmpty();
187     }
188 
189     /**
190      * Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via
191      * {@link ReferenceCountUtil#safeRelease(Object)}.
192      */
193     public void removeAndFail(Throwable cause) {
194         assert ctx.executor().inEventLoop();
195         if (cause == null) {
196             throw new NullPointerException("cause");
197         }
198         PendingWrite write = head;
199 
200         if (write == null) {
201             return;
202         }
203         ReferenceCountUtil.safeRelease(write.msg);
204         ChannelPromise promise = write.promise;
205         safeFail(promise, cause);
206         recycle(write, true);
207     }
208 
209     private void assertEmpty() {
210         assert tail == null && head == null && size == 0;
211     }
212 
213     /**
214      * Removes a pending write operation and performs it via
215      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
216      *
217      * @return  {@link ChannelFuture} if something was written and {@code null}
218      *          if the {@link PendingWriteQueue} is empty.
219      */
220     public ChannelFuture removeAndWrite() {
221         assert ctx.executor().inEventLoop();
222         PendingWrite write = head;
223         if (write == null) {
224             return null;
225         }
226         Object msg = write.msg;
227         ChannelPromise promise = write.promise;
228         recycle(write, true);
229         return ctx.write(msg, promise);
230     }
231 
232     /**
233      * Removes a pending write operation and release it's message via {@link ReferenceCountUtil#safeRelease(Object)}.
234      *
235      * @return  {@link ChannelPromise} of the pending write or {@code null} if the queue is empty.
236      *
237      */
238     public ChannelPromise remove() {
239         assert ctx.executor().inEventLoop();
240         PendingWrite write = head;
241         if (write == null) {
242             return null;
243         }
244         ChannelPromise promise = write.promise;
245         ReferenceCountUtil.safeRelease(write.msg);
246         recycle(write, true);
247         return promise;
248     }
249 
250     /**
251      * Return the current message or {@code null} if empty.
252      */
253     public Object current() {
254         assert ctx.executor().inEventLoop();
255         PendingWrite write = head;
256         if (write == null) {
257             return null;
258         }
259         return write.msg;
260     }
261 
262     private void recycle(PendingWrite write, boolean update) {
263         final PendingWrite next = write.next;
264         final long writeSize = write.size;
265 
266         if (update) {
267             if (next == null) {
268                 // Handled last PendingWrite so rest head and tail
269                 // Guard against re-entrance by directly reset
270                 head = tail = null;
271                 size = 0;
272                 bytes = 0;
273             } else {
274                 head = next;
275                 size --;
276                 bytes -= writeSize;
277                 assert size > 0 && bytes >= 0;
278             }
279         }
280 
281         write.recycle();
282         tracker.decrementPendingOutboundBytes(writeSize);
283     }
284 
285     private static void safeFail(ChannelPromise promise, Throwable cause) {
286         if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
287             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
288         }
289     }
290 
291     /**
292      * Holds all meta-data and construct the linked-list structure.
293      */
294     static final class PendingWrite {
295         private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
296             @Override
297             protected PendingWrite newObject(Handle<PendingWrite> handle) {
298                 return new PendingWrite(handle);
299             }
300         };
301 
302         private final Recycler.Handle<PendingWrite> handle;
303         private PendingWrite next;
304         private long size;
305         private ChannelPromise promise;
306         private Object msg;
307 
308         private PendingWrite(Recycler.Handle<PendingWrite> handle) {
309             this.handle = handle;
310         }
311 
312         static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
313             PendingWrite write = RECYCLER.get();
314             write.size = size;
315             write.msg = msg;
316             write.promise = promise;
317             return write;
318         }
319 
320         private void recycle() {
321             size = 0;
322             next = null;
323             msg = null;
324             promise = null;
325             handle.recycle(this);
326         }
327     }
328 }