1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.internal.ChannelUtils;
32 import io.netty.channel.socket.DuplexChannel;
33 import io.netty.channel.unix.IovArray;
34 import io.netty.channel.unix.SocketWritableByteChannel;
35 import io.netty.channel.unix.UnixChannelUtil;
36 import io.netty.util.internal.StringUtil;
37 import io.netty.util.internal.UnstableApi;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.net.SocketAddress;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.WritableByteChannel;
45 import java.util.concurrent.Executor;
46
47 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
48 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
49
50 public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
51 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
52 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
53 private static final String EXPECTED_TYPES =
54 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
55 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
56 private WritableByteChannel byteChannel;
57 private final Runnable flushTask = new Runnable() {
58 @Override
59 public void run() {
60
61
62 ((AbstractKQueueUnsafe) unsafe()).flush0();
63 }
64 };
65
66 AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
67 super(parent, fd, active);
68 }
69
70 AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
71 super(parent, fd, remote);
72 }
73
74 AbstractKQueueStreamChannel(BsdSocket fd) {
75 this(null, fd, isSoErrorZero(fd));
76 }
77
78 @Override
79 protected AbstractKQueueUnsafe newUnsafe() {
80 return new KQueueStreamUnsafe();
81 }
82
83 @Override
84 public ChannelMetadata metadata() {
85 return METADATA;
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
103 int readableBytes = buf.readableBytes();
104 if (readableBytes == 0) {
105 in.remove();
106 return 0;
107 }
108
109 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
110 return doWriteBytes(in, buf);
111 } else {
112 ByteBuffer[] nioBuffers = buf.nioBuffers();
113 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
114 config().getMaxBytesPerGatheringWrite());
115 }
116 }
117
118 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
119
120
121
122 if (attempted == written) {
123 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
124 config().setMaxBytesPerGatheringWrite(attempted << 1);
125 }
126 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
127 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
128 }
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
147 final long expectedWrittenBytes = array.size();
148 assert expectedWrittenBytes != 0;
149 final int cnt = array.count();
150 assert cnt != 0;
151
152 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
153 if (localWrittenBytes > 0) {
154 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
155 in.removeBytes(localWrittenBytes);
156 return 1;
157 }
158 return WRITE_STATUS_SNDBUF_FULL;
159 }
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179 private int writeBytesMultiple(
180 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
181 long maxBytesPerGatheringWrite) throws IOException {
182 assert expectedWrittenBytes != 0;
183 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
184 expectedWrittenBytes = maxBytesPerGatheringWrite;
185 }
186
187 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
188 if (localWrittenBytes > 0) {
189 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
190 in.removeBytes(localWrittenBytes);
191 return 1;
192 }
193 return WRITE_STATUS_SNDBUF_FULL;
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
211 final long regionCount = region.count();
212 final long offset = region.transferred();
213
214 if (offset >= regionCount) {
215 in.remove();
216 return 0;
217 }
218
219 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
220 if (flushedAmount > 0) {
221 in.progress(flushedAmount);
222 if (region.transferred() >= regionCount) {
223 in.remove();
224 }
225 return 1;
226 } else if (flushedAmount == 0) {
227 validateFileRegion(region, offset);
228 }
229 return WRITE_STATUS_SNDBUF_FULL;
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
247 if (region.transferred() >= region.count()) {
248 in.remove();
249 return 0;
250 }
251
252 if (byteChannel == null) {
253 byteChannel = new KQueueSocketWritableByteChannel();
254 }
255 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
256 if (flushedAmount > 0) {
257 in.progress(flushedAmount);
258 if (region.transferred() >= region.count()) {
259 in.remove();
260 }
261 return 1;
262 }
263 return WRITE_STATUS_SNDBUF_FULL;
264 }
265
266 @Override
267 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
268 int writeSpinCount = config().getWriteSpinCount();
269 do {
270 final int msgCount = in.size();
271
272 if (msgCount > 1 && in.current() instanceof ByteBuf) {
273 writeSpinCount -= doWriteMultiple(in);
274 } else if (msgCount == 0) {
275
276 writeFilter(false);
277
278 return;
279 } else {
280 writeSpinCount -= doWriteSingle(in);
281 }
282
283
284
285
286 } while (writeSpinCount > 0);
287
288 if (writeSpinCount == 0) {
289
290
291
292
293 writeFilter(false);
294
295
296 eventLoop().execute(flushTask);
297 } else {
298
299
300 writeFilter(true);
301 }
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
319
320 Object msg = in.current();
321 if (msg instanceof ByteBuf) {
322 return writeBytes(in, (ByteBuf) msg);
323 } else if (msg instanceof DefaultFileRegion) {
324 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
325 } else if (msg instanceof FileRegion) {
326 return writeFileRegion(in, (FileRegion) msg);
327 } else {
328
329 throw new Error();
330 }
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
348 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
349 IovArray array = registration().ioHandler().cleanArray();
350 array.maxBytes(maxBytesPerGatheringWrite);
351 in.forEachFlushedMessage(array);
352
353 if (array.count() >= 1) {
354
355 return writeBytesMultiple(in, array);
356 }
357
358 in.removeBytes(0);
359 return 0;
360 }
361
362 @Override
363 protected Object filterOutboundMessage(Object msg) {
364 if (msg instanceof ByteBuf) {
365 ByteBuf buf = (ByteBuf) msg;
366 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
367 }
368
369 if (msg instanceof FileRegion) {
370 return msg;
371 }
372
373 throw new UnsupportedOperationException(
374 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
375 }
376
377 @UnstableApi
378 @Override
379 protected final void doShutdownOutput() throws Exception {
380 socket.shutdown(false, true);
381 }
382
383 @Override
384 public boolean isOutputShutdown() {
385 return socket.isOutputShutdown();
386 }
387
388 @Override
389 public boolean isInputShutdown() {
390 return socket.isInputShutdown();
391 }
392
393 @Override
394 public boolean isShutdown() {
395 return socket.isShutdown();
396 }
397
398 @Override
399 public ChannelFuture shutdownOutput() {
400 return shutdownOutput(newPromise());
401 }
402
403 @Override
404 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
405 EventLoop loop = eventLoop();
406 if (loop.inEventLoop()) {
407 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
408 } else {
409 loop.execute(new Runnable() {
410 @Override
411 public void run() {
412 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
413 }
414 });
415 }
416 return promise;
417 }
418
419 @Override
420 public ChannelFuture shutdownInput() {
421 return shutdownInput(newPromise());
422 }
423
424 @Override
425 public ChannelFuture shutdownInput(final ChannelPromise promise) {
426 EventLoop loop = eventLoop();
427 if (loop.inEventLoop()) {
428 shutdownInput0(promise);
429 } else {
430 loop.execute(new Runnable() {
431 @Override
432 public void run() {
433 shutdownInput0(promise);
434 }
435 });
436 }
437 return promise;
438 }
439
440 private void shutdownInput0(ChannelPromise promise) {
441 try {
442 socket.shutdown(true, false);
443 } catch (Throwable cause) {
444 promise.setFailure(cause);
445 return;
446 }
447 promise.setSuccess();
448 }
449
450 @Override
451 public ChannelFuture shutdown() {
452 return shutdown(newPromise());
453 }
454
455 @Override
456 public ChannelFuture shutdown(final ChannelPromise promise) {
457 ChannelFuture shutdownOutputFuture = shutdownOutput();
458 if (shutdownOutputFuture.isDone()) {
459 shutdownOutputDone(shutdownOutputFuture, promise);
460 } else {
461 shutdownOutputFuture.addListener(new ChannelFutureListener() {
462 @Override
463 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
464 shutdownOutputDone(shutdownOutputFuture, promise);
465 }
466 });
467 }
468 return promise;
469 }
470
471 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
472 ChannelFuture shutdownInputFuture = shutdownInput();
473 if (shutdownInputFuture.isDone()) {
474 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
475 } else {
476 shutdownInputFuture.addListener(new ChannelFutureListener() {
477 @Override
478 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
479 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
480 }
481 });
482 }
483 }
484
485 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
486 ChannelFuture shutdownInputFuture,
487 ChannelPromise promise) {
488 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
489 Throwable shutdownInputCause = shutdownInputFuture.cause();
490 if (shutdownOutputCause != null) {
491 if (shutdownInputCause != null) {
492 logger.debug("Exception suppressed because a previous exception occurred.",
493 shutdownInputCause);
494 }
495 promise.setFailure(shutdownOutputCause);
496 } else if (shutdownInputCause != null) {
497 promise.setFailure(shutdownInputCause);
498 } else {
499 promise.setSuccess();
500 }
501 }
502
503 class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
504
505 @Override
506 protected Executor prepareToClose() {
507 return super.prepareToClose();
508 }
509
510 @Override
511 void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
512 final ChannelConfig config = config();
513 if (shouldBreakReadReady(config)) {
514 clearReadFilter0();
515 return;
516 }
517 final ChannelPipeline pipeline = pipeline();
518 final ByteBufAllocator allocator = config.getAllocator();
519 allocHandle.reset(config);
520 readReadyBefore();
521
522 ByteBuf byteBuf = null;
523 boolean close = false;
524 try {
525 do {
526
527
528 byteBuf = allocHandle.allocate(allocator);
529 allocHandle.lastBytesRead(doReadBytes(byteBuf));
530 if (allocHandle.lastBytesRead() <= 0) {
531
532 byteBuf.release();
533 byteBuf = null;
534 close = allocHandle.lastBytesRead() < 0;
535 if (close) {
536
537 readPending = false;
538 }
539 break;
540 }
541 allocHandle.incMessagesRead(1);
542 readPending = false;
543 pipeline.fireChannelRead(byteBuf);
544 byteBuf = null;
545
546 if (shouldBreakReadReady(config)) {
547
548
549
550
551
552
553
554
555
556
557
558 break;
559 }
560 } while (allocHandle.continueReading());
561
562 allocHandle.readComplete();
563 pipeline.fireChannelReadComplete();
564
565 if (close) {
566 shutdownInput(false);
567 }
568 } catch (Throwable t) {
569 handleReadException(pipeline, byteBuf, t, close, allocHandle);
570 } finally {
571 readReadyFinally(config);
572 }
573 }
574
575 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
576 KQueueRecvByteAllocatorHandle allocHandle) {
577 if (byteBuf != null) {
578 if (byteBuf.isReadable()) {
579 readPending = false;
580 pipeline.fireChannelRead(byteBuf);
581 } else {
582 byteBuf.release();
583 }
584 }
585 if (!failConnectPromise(cause)) {
586 allocHandle.readComplete();
587 pipeline.fireChannelReadComplete();
588 pipeline.fireExceptionCaught(cause);
589
590
591
592 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
593 shutdownInput(false);
594 }
595 }
596 }
597 }
598
599 private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
600 KQueueSocketWritableByteChannel() {
601 super(socket);
602 }
603
604 @Override
605 protected ByteBufAllocator alloc() {
606 return AbstractKQueueStreamChannel.this.alloc();
607 }
608 }
609 }