1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.Recycler;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23
24
25
26
27
28 public final class PendingWriteQueue {
29 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class);
30
31 private final ChannelHandlerContext ctx;
32 private final ChannelOutboundBuffer buffer;
33 private final MessageSizeEstimator.Handle estimatorHandle;
34
35
36 private PendingWrite head;
37 private PendingWrite tail;
38 private int size;
39
40 public PendingWriteQueue(ChannelHandlerContext ctx) {
41 if (ctx == null) {
42 throw new NullPointerException("ctx");
43 }
44 this.ctx = ctx;
45 buffer = ctx.channel().unsafe().outboundBuffer();
46 estimatorHandle = ctx.channel().config().getMessageSizeEstimator().newHandle();
47 }
48
49
50
51
52 public boolean isEmpty() {
53 assert ctx.executor().inEventLoop();
54 return head == null;
55 }
56
57
58
59
60 public int size() {
61 assert ctx.executor().inEventLoop();
62 return size;
63 }
64
65
66
67
68 public void add(Object msg, ChannelPromise promise) {
69 assert ctx.executor().inEventLoop();
70 if (msg == null) {
71 throw new NullPointerException("msg");
72 }
73 if (promise == null) {
74 throw new NullPointerException("promise");
75 }
76 int messageSize = estimatorHandle.size(msg);
77 if (messageSize < 0) {
78
79 messageSize = 0;
80 }
81 PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise);
82 PendingWrite currentTail = tail;
83 if (currentTail == null) {
84 tail = head = write;
85 } else {
86 currentTail.next = write;
87 tail = write;
88 }
89 size ++;
90 buffer.incrementPendingOutboundBytes(write.size);
91 }
92
93
94
95
96
97 public void removeAndFailAll(Throwable cause) {
98 assert ctx.executor().inEventLoop();
99 if (cause == null) {
100 throw new NullPointerException("cause");
101 }
102
103 PendingWrite write = head;
104 head = tail = null;
105 size = 0;
106
107 while (write != null) {
108 PendingWrite next = write.next;
109 ReferenceCountUtil.safeRelease(write.msg);
110 ChannelPromise promise = write.promise;
111 recycle(write, false);
112 safeFail(promise, cause);
113 write = next;
114 }
115 assertEmpty();
116 }
117
118
119
120
121
122 public void removeAndFail(Throwable cause) {
123 assert ctx.executor().inEventLoop();
124 if (cause == null) {
125 throw new NullPointerException("cause");
126 }
127 PendingWrite write = head;
128
129 if (write == null) {
130 return;
131 }
132 ReferenceCountUtil.safeRelease(write.msg);
133 ChannelPromise promise = write.promise;
134 safeFail(promise, cause);
135 recycle(write, true);
136 }
137
138
139
140
141
142
143
144
145 public ChannelFuture removeAndWriteAll() {
146 assert ctx.executor().inEventLoop();
147
148 if (size == 1) {
149
150 return removeAndWrite();
151 }
152 PendingWrite write = head;
153 if (write == null) {
154
155 return null;
156 }
157
158
159 head = tail = null;
160 size = 0;
161
162 ChannelPromise p = ctx.newPromise();
163 ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(p);
164 while (write != null) {
165 PendingWrite next = write.next;
166 Object msg = write.msg;
167 ChannelPromise promise = write.promise;
168 recycle(write, false);
169 ctx.write(msg, promise);
170 aggregator.add(promise);
171 write = next;
172 }
173 assertEmpty();
174 return p;
175 }
176
177 private void assertEmpty() {
178 assert tail == null && head == null && size == 0;
179 }
180
181
182
183
184
185
186
187
188 public ChannelFuture removeAndWrite() {
189 assert ctx.executor().inEventLoop();
190 PendingWrite write = head;
191 if (write == null) {
192 return null;
193 }
194 Object msg = write.msg;
195 ChannelPromise promise = write.promise;
196 recycle(write, true);
197 return ctx.write(msg, promise);
198 }
199
200
201
202
203
204
205
206 public ChannelPromise remove() {
207 assert ctx.executor().inEventLoop();
208 PendingWrite write = head;
209 if (write == null) {
210 return null;
211 }
212 ChannelPromise promise = write.promise;
213 ReferenceCountUtil.safeRelease(write.msg);
214 recycle(write, true);
215 return promise;
216 }
217
218
219
220
221 public Object current() {
222 assert ctx.executor().inEventLoop();
223 PendingWrite write = head;
224 if (write == null) {
225 return null;
226 }
227 return write.msg;
228 }
229
230 private void recycle(PendingWrite write, boolean update) {
231 final PendingWrite next = write.next;
232 final long writeSize = write.size;
233
234 if (update) {
235 if (next == null) {
236
237
238 head = tail = null;
239 size = 0;
240 } else {
241 head = next;
242 size --;
243 assert size > 0;
244 }
245 }
246
247 write.recycle();
248 buffer.decrementPendingOutboundBytes(writeSize);
249 }
250
251 private static void safeFail(ChannelPromise promise, Throwable cause) {
252 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
253 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
254 }
255 }
256
257
258
259
260 static final class PendingWrite {
261 private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
262 @Override
263 protected PendingWrite newObject(Handle handle) {
264 return new PendingWrite(handle);
265 }
266 };
267
268 private final Recycler.Handle handle;
269 private PendingWrite next;
270 private long size;
271 private ChannelPromise promise;
272 private Object msg;
273
274 private PendingWrite(Recycler.Handle handle) {
275 this.handle = handle;
276 }
277
278 static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) {
279 PendingWrite write = RECYCLER.get();
280 write.size = size;
281 write.msg = msg;
282 write.promise = promise;
283 return write;
284 }
285
286 private void recycle() {
287 size = 0;
288 next = null;
289 msg = null;
290 promise = null;
291 RECYCLER.recycle(this, handle);
292 }
293 }
294 }