1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.Recycler;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.concurrent.PromiseCombiner;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24
25
26
27
28
29
30 public final class PendingWriteQueue {
31 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
32
33
34
35
36 private static final int PENDING_WRITE_OVERHEAD =
37 SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
38
39 private final ChannelHandlerContext ctx;
40 private final ChannelOutboundBuffer buffer;
41 private final MessageSizeEstimator.Handle estimatorHandle;
42
43
44 private PendingWrite head;
45 private PendingWrite tail;
46 private int size;
47 private long bytes;
48
49 public PendingWriteQueue(ChannelHandlerContext ctx) {
50 if (ctx == null) {
51 throw new NullPointerException("ctx");
52 }
53 this.ctx = ctx;
54 buffer = ctx.channel().unsafe().outboundBuffer();
55 estimatorHandle = ctx.channel().config().getMessageSizeEstimator().newHandle();
56 }
57
58
59
60
61 public boolean isEmpty() {
62 assert ctx.executor().inEventLoop();
63 return head == null;
64 }
65
66
67
68
69 public int size() {
70 assert ctx.executor().inEventLoop();
71 return size;
72 }
73
74
75
76
77
78 public long bytes() {
79 assert ctx.executor().inEventLoop();
80 return bytes;
81 }
82
83 private int size(Object msg) {
84
85
86 int messageSize = estimatorHandle.size(msg);
87 if (messageSize < 0) {
88
89 messageSize = 0;
90 }
91 return messageSize + PENDING_WRITE_OVERHEAD;
92 }
93
94
95
96
97 public void add(Object msg, ChannelPromise promise) {
98 assert ctx.executor().inEventLoop();
99 if (msg == null) {
100 throw new NullPointerException("msg");
101 }
102 if (promise == null) {
103 throw new NullPointerException("promise");
104 }
105
106
107 int messageSize = size(msg);
108
109 PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
110 PendingWrite currentTail = tail;
111 if (currentTail == null) {
112 tail = head = write;
113 } else {
114 currentTail.next = write;
115 tail = write;
116 }
117 size ++;
118 bytes += messageSize;
119
120
121
122 if (buffer != null) {
123 buffer.incrementPendingOutboundBytes(write.size);
124 }
125 }
126
127
128
129
130
131
132
133
134 public ChannelFuture removeAndWriteAll() {
135 assert ctx.executor().inEventLoop();
136
137 if (isEmpty()) {
138 return null;
139 }
140
141 ChannelPromise p = ctx.newPromise();
142 PromiseCombiner combiner = new PromiseCombiner();
143 try {
144
145
146 for (PendingWrite write = head; write != null; write = head) {
147 head = tail = null;
148 size = 0;
149 bytes = 0;
150
151 while (write != null) {
152 PendingWrite next = write.next;
153 Object msg = write.msg;
154 ChannelPromise promise = write.promise;
155 recycle(write, false);
156 combiner.add(promise);
157 ctx.write(msg, promise);
158 write = next;
159 }
160 }
161 combiner.finish(p);
162 } catch (Throwable cause) {
163 p.setFailure(cause);
164 }
165 assertEmpty();
166 return p;
167 }
168
169
170
171
172
173 public void removeAndFailAll(Throwable cause) {
174 assert ctx.executor().inEventLoop();
175 if (cause == null) {
176 throw new NullPointerException("cause");
177 }
178
179
180 for (PendingWrite write = head; write != null; write = head) {
181 head = tail = null;
182 size = 0;
183 bytes = 0;
184 while (write != null) {
185 PendingWrite next = write.next;
186 ReferenceCountUtil.safeRelease(write.msg);
187 ChannelPromise promise = write.promise;
188 recycle(write, false);
189 safeFail(promise, cause);
190 write = next;
191 }
192 }
193 assertEmpty();
194 }
195
196
197
198
199
200 public void removeAndFail(Throwable cause) {
201 assert ctx.executor().inEventLoop();
202 if (cause == null) {
203 throw new NullPointerException("cause");
204 }
205 PendingWrite write = head;
206
207 if (write == null) {
208 return;
209 }
210 ReferenceCountUtil.safeRelease(write.msg);
211 ChannelPromise promise = write.promise;
212 safeFail(promise, cause);
213 recycle(write, true);
214 }
215
216 private void assertEmpty() {
217 assert tail == null && head == null && size == 0;
218 }
219
220
221
222
223
224
225
226
227 public ChannelFuture removeAndWrite() {
228 assert ctx.executor().inEventLoop();
229 PendingWrite write = head;
230 if (write == null) {
231 return null;
232 }
233 Object msg = write.msg;
234 ChannelPromise promise = write.promise;
235 recycle(write, true);
236 return ctx.write(msg, promise);
237 }
238
239
240
241
242
243
244
245 public ChannelPromise remove() {
246 assert ctx.executor().inEventLoop();
247 PendingWrite write = head;
248 if (write == null) {
249 return null;
250 }
251 ChannelPromise promise = write.promise;
252 ReferenceCountUtil.safeRelease(write.msg);
253 recycle(write, true);
254 return promise;
255 }
256
257
258
259
260 public Object current() {
261 assert ctx.executor().inEventLoop();
262 PendingWrite write = head;
263 if (write == null) {
264 return null;
265 }
266 return write.msg;
267 }
268
269 private void recycle(PendingWrite write, boolean update) {
270 final PendingWrite next = write.next;
271 final long writeSize = write.size;
272
273 if (update) {
274 if (next == null) {
275
276
277 head = tail = null;
278 size = 0;
279 bytes = 0;
280 } else {
281 head = next;
282 size --;
283 bytes -= writeSize;
284 assert size > 0 && bytes >= 0;
285 }
286 }
287
288 write.recycle();
289
290
291
292 if (buffer != null) {
293 buffer.decrementPendingOutboundBytes(writeSize);
294 }
295 }
296
297 private static void safeFail(ChannelPromise promise, Throwable cause) {
298 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
299 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
300 }
301 }
302
303
304
305
306 static final class PendingWrite {
307 private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
308 @Override
309 protected PendingWrite newObject(Handle handle) {
310 return new PendingWrite(handle);
311 }
312 };
313
314 private final Recycler.Handle handle;
315 private PendingWrite next;
316 private long size;
317 private ChannelPromise promise;
318 private Object msg;
319
320 private PendingWrite(Recycler.Handle handle) {
321 this.handle = handle;
322 }
323
324 static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
325 PendingWrite write = RECYCLER.get();
326 write.size = size;
327 write.msg = msg;
328 write.promise = promise;
329 return write;
330 }
331
332 private void recycle() {
333 size = 0;
334 next = null;
335 msg = null;
336 promise = null;
337 RECYCLER.recycle(this, handle);
338 }
339 }
340 }