View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.Recycler;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.internal.logging.InternalLogger;
21  import io.netty.util.internal.logging.InternalLoggerFactory;
22  
23  /**
24   * A queue of write operations which are pending for later execution. It also updates the
25   * {@linkplain Channel#isWritable() writability} of the associated {@link Channel}, so that
26   * the pending write operations are also considered to determine the writability.
27   */
28  public final class PendingWriteQueue {
29      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
30  
31      private final ChannelHandlerContext ctx;
32      private final ChannelOutboundBuffer buffer;
33      private final MessageSizeEstimator.Handle estimatorHandle;
34  
35      // head and tail pointers for the linked-list structure. If empty head and tail are null.
36      private PendingWrite head;
37      private PendingWrite tail;
38      private int size;
39  
40      public PendingWriteQueue(ChannelHandlerContext ctx) {
41          if (ctx == null) {
42              throw new NullPointerException("ctx");
43          }
44          this.ctx = ctx;
45          buffer = ctx.channel().unsafe().outboundBuffer();
46          estimatorHandle = ctx.channel().config().getMessageSizeEstimator().newHandle();
47      }
48  
49      /**
50       * Returns {@code true} if there are no pending write operations left in this queue.
51       */
52      public boolean isEmpty() {
53          assert ctx.executor().inEventLoop();
54          return head == null;
55      }
56  
57      /**
58       * Returns the number of pending write operations.
59       */
60      public int size() {
61          assert ctx.executor().inEventLoop();
62          return size;
63      }
64  
65      /**
66       * Add the given {@code msg} and {@link ChannelPromise}.
67       */
68      public void add(Object msg, ChannelPromise promise) {
69          assert ctx.executor().inEventLoop();
70          if (msg == null) {
71              throw new NullPointerException("msg");
72          }
73          if (promise == null) {
74              throw new NullPointerException("promise");
75          }
76          int messageSize = estimatorHandle.size(msg);
77          if (messageSize < 0) {
78              // Size may be unknow so just use 0
79              messageSize = 0;
80          }
81          PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
82          PendingWrite currentTail = tail;
83          if (currentTail == null) {
84              tail = head = write;
85          } else {
86              currentTail.next = write;
87              tail = write;
88          }
89          size ++;
90          buffer.incrementPendingOutboundBytes(write.size);
91      }
92  
93      /**
94       * Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released
95       * via {@link ReferenceCountUtil#safeRelease(Object)}.
96       */
97      public void removeAndFailAll(Throwable cause) {
98          assert ctx.executor().inEventLoop();
99          if (cause == null) {
100             throw new NullPointerException("cause");
101         }
102         // Guard against re-entrance by directly reset
103         PendingWrite write = head;
104         head = tail = null;
105         size = 0;
106 
107         while (write != null) {
108             PendingWrite next = write.next;
109             ReferenceCountUtil.safeRelease(write.msg);
110             ChannelPromise promise = write.promise;
111             recycle(write, false);
112             safeFail(promise, cause);
113             write = next;
114         }
115         assertEmpty();
116     }
117 
118     /**
119      * Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via
120      * {@link ReferenceCountUtil#safeRelease(Object)}.
121      */
122     public void removeAndFail(Throwable cause) {
123         assert ctx.executor().inEventLoop();
124         if (cause == null) {
125             throw new NullPointerException("cause");
126         }
127         PendingWrite write = head;
128 
129         if (write == null) {
130             return;
131         }
132         ReferenceCountUtil.safeRelease(write.msg);
133         ChannelPromise promise = write.promise;
134         safeFail(promise, cause);
135         recycle(write, true);
136     }
137 
138     /**
139      * Remove all pending write operation and performs them via
140      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
141      *
142      * @return  {@link ChannelFuture} if something was written and {@code null}
143      *          if the {@link PendingWriteQueue} is empty.
144      */
145     public ChannelFuture removeAndWriteAll() {
146         assert ctx.executor().inEventLoop();
147 
148         if (size == 1) {
149             // No need to use ChannelPromiseAggregator for this case.
150             return removeAndWrite();
151         }
152         PendingWrite write = head;
153         if (write == null) {
154             // empty so just return null
155             return null;
156         }
157 
158         // Guard against re-entrance by directly reset
159         head = tail = null;
160         size = 0;
161 
162         ChannelPromise p = ctx.newPromise();
163         ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(p);
164         while (write != null) {
165             PendingWrite next = write.next;
166             Object msg = write.msg;
167             ChannelPromise promise = write.promise;
168             recycle(write, false);
169             ctx.write(msg, promise);
170             aggregator.add(promise);
171             write = next;
172         }
173         assertEmpty();
174         return p;
175     }
176 
177     private void assertEmpty() {
178         assert tail == null && head == null && size == 0;
179     }
180 
181     /**
182      * Removes a pending write operation and performs it via
183      * {@link ChannelHandlerContext#write(Object, ChannelPromise)}.
184      *
185      * @return  {@link ChannelFuture} if something was written and {@code null}
186      *          if the {@link PendingWriteQueue} is empty.
187      */
188     public ChannelFuture removeAndWrite() {
189         assert ctx.executor().inEventLoop();
190         PendingWrite write = head;
191         if (write == null) {
192             return null;
193         }
194         Object msg = write.msg;
195         ChannelPromise promise = write.promise;
196         recycle(write, true);
197         return ctx.write(msg, promise);
198     }
199 
200     /**
201      * Removes a pending write operation and release it's message via {@link ReferenceCountUtil#safeRelease(Object)}.
202      *
203      * @return  {@link ChannelPromise} of the pending write or {@code null} if the queue is empty.
204      *
205      */
206     public ChannelPromise remove() {
207         assert ctx.executor().inEventLoop();
208         PendingWrite write = head;
209         if (write == null) {
210             return null;
211         }
212         ChannelPromise promise = write.promise;
213         ReferenceCountUtil.safeRelease(write.msg);
214         recycle(write, true);
215         return promise;
216     }
217 
218     /**
219      * Return the current message or {@code null} if empty.
220      */
221     public Object current() {
222         assert ctx.executor().inEventLoop();
223         PendingWrite write = head;
224         if (write == null) {
225             return null;
226         }
227         return write.msg;
228     }
229 
230     private void recycle(PendingWrite write, boolean update) {
231         final PendingWrite next = write.next;
232         final long writeSize = write.size;
233 
234         if (update) {
235             if (next == null) {
236                 // Handled last PendingWrite so rest head and tail
237                 // Guard against re-entrance by directly reset
238                 head = tail = null;
239                 size = 0;
240             } else {
241                 head = next;
242                 size --;
243                 assert size > 0;
244             }
245         }
246 
247         write.recycle();
248         buffer.decrementPendingOutboundBytes(writeSize);
249     }
250 
251     private static void safeFail(ChannelPromise promise, Throwable cause) {
252         if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
253             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
254         }
255     }
256 
257     /**
258      * Holds all meta-data and construct the linked-list structure.
259      */
260     static final class PendingWrite {
261         private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
262             @Override
263             protected PendingWrite newObject(Handle handle) {
264                 return new PendingWrite(handle);
265             }
266         };
267 
268         private final Recycler.Handle handle;
269         private PendingWrite next;
270         private long size;
271         private ChannelPromise promise;
272         private Object msg;
273 
274         private PendingWrite(Recycler.Handle handle) {
275             this.handle = handle;
276         }
277 
278         static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
279             PendingWrite write = RECYCLER.get();
280             write.size = size;
281             write.msg = msg;
282             write.promise = promise;
283             return write;
284         }
285 
286         private void recycle() {
287             size = 0;
288             next = null;
289             msg = null;
290             promise = null;
291             RECYCLER.recycle(this, handle);
292         }
293     }
294 }