1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.ReferenceCountUtil;
19 import io.netty.util.concurrent.EventExecutor;
20 import io.netty.util.concurrent.PromiseCombiner;
21 import io.netty.util.internal.ObjectPool;
22 import io.netty.util.internal.ObjectPool.ObjectCreator;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.SystemPropertyUtil;
25 import io.netty.util.internal.logging.InternalLogger;
26 import io.netty.util.internal.logging.InternalLoggerFactory;
27
28
29
30
31
32
33 public final class PendingWriteQueue {
34 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
35
36
37
38
39 private static final int PENDING_WRITE_OVERHEAD =
40 SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
41
42 private final ChannelOutboundInvoker invoker;
43 private final EventExecutor executor;
44 private final PendingBytesTracker tracker;
45
46
47 private PendingWrite head;
48 private PendingWrite tail;
49 private int size;
50 private long bytes;
51
52 public PendingWriteQueue(ChannelHandlerContext ctx) {
53 tracker = PendingBytesTracker.newTracker(ctx.channel());
54 this.invoker = ctx;
55 this.executor = ctx.executor();
56 }
57
58 public PendingWriteQueue(Channel channel) {
59 tracker = PendingBytesTracker.newTracker(channel);
60 this.invoker = channel;
61 this.executor = channel.eventLoop();
62 }
63
64
65
66
67 public boolean isEmpty() {
68 assert executor.inEventLoop();
69 return head == null;
70 }
71
72
73
74
75 public int size() {
76 assert executor.inEventLoop();
77 return size;
78 }
79
80
81
82
83
84 public long bytes() {
85 assert executor.inEventLoop();
86 return bytes;
87 }
88
89 private int size(Object msg) {
90
91
92 int messageSize = tracker.size(msg);
93 if (messageSize < 0) {
94
95 messageSize = 0;
96 }
97 return messageSize + PENDING_WRITE_OVERHEAD;
98 }
99
100
101
102
103 public void add(Object msg, ChannelPromise promise) {
104 assert executor.inEventLoop();
105 ObjectUtil.checkNotNull(msg, "msg");
106 ObjectUtil.checkNotNull(promise, "promise");
107
108
109 int messageSize = size(msg);
110
111 PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
112 PendingWrite currentTail = tail;
113 if (currentTail == null) {
114 tail = head = write;
115 } else {
116 currentTail.next = write;
117 tail = write;
118 }
119 size ++;
120 bytes += messageSize;
121 tracker.incrementPendingOutboundBytes(write.size);
122 }
123
124
125
126
127
128
129
130
131 public ChannelFuture removeAndWriteAll() {
132 assert executor.inEventLoop();
133
134 if (isEmpty()) {
135 return null;
136 }
137
138 ChannelPromise p = invoker.newPromise();
139 PromiseCombiner combiner = new PromiseCombiner(executor);
140 try {
141
142
143 for (PendingWrite write = head; write != null; write = head) {
144 head = tail = null;
145 size = 0;
146 bytes = 0;
147
148 while (write != null) {
149 PendingWrite next = write.next;
150 Object msg = write.msg;
151 ChannelPromise promise = write.promise;
152 recycle(write, false);
153 if (!(promise instanceof VoidChannelPromise)) {
154 combiner.add(promise);
155 }
156 invoker.write(msg, promise);
157 write = next;
158 }
159 }
160 combiner.finish(p);
161 } catch (Throwable cause) {
162 p.setFailure(cause);
163 }
164 assertEmpty();
165 return p;
166 }
167
168
169
170
171
172 public void removeAndFailAll(Throwable cause) {
173 assert executor.inEventLoop();
174 ObjectUtil.checkNotNull(cause, "cause");
175
176
177 for (PendingWrite write = head; write != null; write = head) {
178 head = tail = null;
179 size = 0;
180 bytes = 0;
181 while (write != null) {
182 PendingWrite next = write.next;
183 ReferenceCountUtil.safeRelease(write.msg);
184 ChannelPromise promise = write.promise;
185 recycle(write, false);
186 safeFail(promise, cause);
187 write = next;
188 }
189 }
190 assertEmpty();
191 }
192
193
194
195
196
197 public void removeAndFail(Throwable cause) {
198 assert executor.inEventLoop();
199 ObjectUtil.checkNotNull(cause, "cause");
200
201 PendingWrite write = head;
202 if (write == null) {
203 return;
204 }
205 ReferenceCountUtil.safeRelease(write.msg);
206 ChannelPromise promise = write.promise;
207 safeFail(promise, cause);
208 recycle(write, true);
209 }
210
211 private void assertEmpty() {
212 assert tail == null && head == null && size == 0;
213 }
214
215
216
217
218
219
220
221
222 public ChannelFuture removeAndWrite() {
223 assert executor.inEventLoop();
224 PendingWrite write = head;
225 if (write == null) {
226 return null;
227 }
228 Object msg = write.msg;
229 ChannelPromise promise = write.promise;
230 recycle(write, true);
231 return invoker.write(msg, promise);
232 }
233
234
235
236
237
238
239
240 public ChannelPromise remove() {
241 assert executor.inEventLoop();
242 PendingWrite write = head;
243 if (write == null) {
244 return null;
245 }
246 ChannelPromise promise = write.promise;
247 ReferenceCountUtil.safeRelease(write.msg);
248 recycle(write, true);
249 return promise;
250 }
251
252
253
254
255 public Object current() {
256 assert executor.inEventLoop();
257 PendingWrite write = head;
258 if (write == null) {
259 return null;
260 }
261 return write.msg;
262 }
263
264 private void recycle(PendingWrite write, boolean update) {
265 final PendingWrite next = write.next;
266 final long writeSize = write.size;
267
268 if (update) {
269 if (next == null) {
270
271
272 head = tail = null;
273 size = 0;
274 bytes = 0;
275 } else {
276 head = next;
277 size --;
278 bytes -= writeSize;
279 assert size > 0 && bytes >= 0;
280 }
281 }
282
283 write.recycle();
284 tracker.decrementPendingOutboundBytes(writeSize);
285 }
286
287 private static void safeFail(ChannelPromise promise, Throwable cause) {
288 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
289 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
290 }
291 }
292
293
294
295
296 static final class PendingWrite {
297 private static final ObjectPool<PendingWrite> RECYCLER = ObjectPool.newPool(new ObjectCreator<PendingWrite>() {
298 @Override
299 public PendingWrite newObject(ObjectPool.Handle<PendingWrite> handle) {
300 return new PendingWrite(handle);
301 }
302 });
303
304 private final ObjectPool.Handle<PendingWrite> handle;
305 private PendingWrite next;
306 private long size;
307 private ChannelPromise promise;
308 private Object msg;
309
310 private PendingWrite(ObjectPool.Handle<PendingWrite> handle) {
311 this.handle = handle;
312 }
313
314 static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
315 PendingWrite write = RECYCLER.get();
316 write.size = size;
317 write.msg = msg;
318 write.promise = promise;
319 return write;
320 }
321
322 private void recycle() {
323 size = 0;
324 next = null;
325 msg = null;
326 promise = null;
327 handle.recycle(this);
328 }
329 }
330 }