1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.kqueue;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.AdaptiveRecvBufferAllocator;
21 import io.netty5.channel.ChannelException;
22 import io.netty5.channel.ChannelMetadata;
23 import io.netty5.channel.ChannelOption;
24 import io.netty5.channel.ChannelOutboundBuffer;
25 import io.netty5.channel.ChannelPipeline;
26 import io.netty5.channel.ChannelShutdownDirection;
27 import io.netty5.channel.DefaultFileRegion;
28 import io.netty5.channel.EventLoop;
29 import io.netty5.channel.FileRegion;
30 import io.netty5.channel.RecvBufferAllocator;
31 import io.netty5.channel.internal.ChannelUtils;
32 import io.netty5.channel.socket.SocketChannel;
33 import io.netty5.channel.socket.SocketProtocolFamily;
34 import io.netty5.channel.unix.DomainSocketReadMode;
35 import io.netty5.channel.unix.FileDescriptor;
36 import io.netty5.channel.unix.IntegerUnixChannelOption;
37 import io.netty5.channel.unix.IovArray;
38 import io.netty5.channel.unix.PeerCredentials;
39 import io.netty5.channel.unix.RawUnixChannelOption;
40 import io.netty5.channel.unix.SocketWritableByteChannel;
41 import io.netty5.channel.unix.UnixChannel;
42 import io.netty5.channel.unix.UnixChannelOption;
43 import io.netty5.channel.unix.UnixChannelUtil;
44 import io.netty5.util.Resource;
45 import io.netty5.util.concurrent.Future;
46 import io.netty5.util.concurrent.GlobalEventExecutor;
47 import io.netty5.util.internal.PlatformDependent;
48 import io.netty5.util.internal.StringUtil;
49 import io.netty5.util.internal.UnstableApi;
50
51 import java.io.IOException;
52 import java.net.InetSocketAddress;
53 import java.net.ProtocolFamily;
54 import java.net.SocketAddress;
55 import java.nio.ByteBuffer;
56 import java.nio.channels.NotYetConnectedException;
57 import java.nio.channels.WritableByteChannel;
58 import java.util.Set;
59 import java.util.concurrent.Executor;
60 import java.util.function.Predicate;
61
62 import static io.netty5.channel.ChannelOption.IP_TOS;
63 import static io.netty5.channel.ChannelOption.SO_KEEPALIVE;
64 import static io.netty5.channel.ChannelOption.SO_LINGER;
65 import static io.netty5.channel.ChannelOption.SO_RCVBUF;
66 import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
67 import static io.netty5.channel.ChannelOption.SO_SNDBUF;
68 import static io.netty5.channel.ChannelOption.TCP_NODELAY;
69 import static io.netty5.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
70 import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
71 import static io.netty5.channel.kqueue.KQueueChannelOption.SO_SNDLOWAT;
72 import static io.netty5.channel.kqueue.KQueueChannelOption.TCP_NOPUSH;
73 import static io.netty5.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
74 import static java.util.Objects.requireNonNull;
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 @UnstableApi
103 public final class KQueueSocketChannel
104 extends AbstractKQueueChannel<KQueueServerSocketChannel>
105 implements SocketChannel {
106 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
107 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
108
109 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
110 private static final String EXPECTED_TYPES =
111 " (expected: " + StringUtil.simpleClassName(Buffer.class) + ", " +
112 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
113 private WritableByteChannel byteChannel;
114
115
116
117 private final Runnable flushTask = this::writeFlushed;
118
119 private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES;
120
121 private volatile boolean tcpFastopen;
122
123 public KQueueSocketChannel(EventLoop eventLoop) {
124 this(eventLoop, (ProtocolFamily) null);
125 }
126
127 public KQueueSocketChannel(EventLoop eventLoop, ProtocolFamily protocolFamily) {
128 super(null, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), BsdSocket.newSocket(protocolFamily), false);
129 enableTcpNoDelayIfSupported();
130 calculateMaxBytesPerGatheringWrite();
131 }
132
133 public KQueueSocketChannel(EventLoop eventLoop, int fd, ProtocolFamily protocolFamily) {
134 this(eventLoop, new BsdSocket(fd, SocketProtocolFamily.of(protocolFamily)));
135 }
136
137 private KQueueSocketChannel(EventLoop eventLoop, BsdSocket fd) {
138 super(null, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), fd, isSoErrorZero(fd));
139 enableTcpNoDelayIfSupported();
140 calculateMaxBytesPerGatheringWrite();
141 }
142
143 KQueueSocketChannel(KQueueServerSocketChannel parent, EventLoop eventLoop,
144 BsdSocket fd, SocketAddress remoteAddress) {
145 super(parent, eventLoop, METADATA, new AdaptiveRecvBufferAllocator(), fd, remoteAddress);
146 enableTcpNoDelayIfSupported();
147 calculateMaxBytesPerGatheringWrite();
148 }
149
150 private void enableTcpNoDelayIfSupported() {
151 if (socket.protocolFamily() != SocketProtocolFamily.UNIX && PlatformDependent.canEnableTcpNoDelayByDefault()) {
152 setTcpNoDelay(true);
153 }
154 }
155
156 @SuppressWarnings("unchecked")
157 @Override
158 protected <T> T getExtendedOption(ChannelOption<T> option) {
159 if (isSupported(socket.protocolFamily(), option)) {
160 if (option == SO_RCVBUF) {
161 return (T) Integer.valueOf(getReceiveBufferSize());
162 }
163 if (option == SO_SNDBUF) {
164 return (T) Integer.valueOf(getSendBufferSize());
165 }
166 if (option == TCP_NODELAY) {
167 return (T) Boolean.valueOf(isTcpNoDelay());
168 }
169 if (option == SO_KEEPALIVE) {
170 return (T) Boolean.valueOf(isKeepAlive());
171 }
172 if (option == SO_REUSEADDR) {
173 return (T) Boolean.valueOf(isReuseAddress());
174 }
175 if (option == SO_LINGER) {
176 return (T) Integer.valueOf(getSoLinger());
177 }
178 if (option == IP_TOS) {
179 return (T) Integer.valueOf(getTrafficClass());
180 }
181 if (option == SO_SNDLOWAT) {
182 return (T) Integer.valueOf(getSndLowAt());
183 }
184 if (option == TCP_NOPUSH) {
185 return (T) Boolean.valueOf(isTcpNoPush());
186 }
187 if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
188 return (T) Boolean.valueOf(isTcpFastOpenConnect());
189 }
190 if (option == DOMAIN_SOCKET_READ_MODE) {
191 return (T) getReadMode();
192 }
193 if (option == UnixChannelOption.SO_PEERCRED) {
194 return (T) getPeerCredentials();
195 }
196 }
197 return super.getExtendedOption(option);
198 }
199
200 @Override
201 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
202 if (isSupported(socket.protocolFamily(), option)) {
203 if (option == SO_RCVBUF) {
204 setReceiveBufferSize((Integer) value);
205 } else if (option == SO_SNDBUF) {
206 setSendBufferSize((Integer) value);
207 } else if (option == TCP_NODELAY) {
208 setTcpNoDelay((Boolean) value);
209 } else if (option == SO_KEEPALIVE) {
210 setKeepAlive((Boolean) value);
211 } else if (option == SO_REUSEADDR) {
212 setReuseAddress((Boolean) value);
213 } else if (option == SO_LINGER) {
214 setSoLinger((Integer) value);
215 } else if (option == IP_TOS) {
216 setTrafficClass((Integer) value);
217 } else if (option == SO_SNDLOWAT) {
218 setSndLowAt((Integer) value);
219 } else if (option == TCP_NOPUSH) {
220 setTcpNoPush((Boolean) value);
221 } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
222 setTcpFastOpenConnect((Boolean) value);
223 } else if (option == DOMAIN_SOCKET_READ_MODE) {
224 setReadMode((DomainSocketReadMode) value);
225 } else if (option == UnixChannelOption.SO_PEERCRED) {
226 throw new UnsupportedOperationException("read-only option: " + option);
227 }
228 } else {
229 super.setExtendedOption(option, value);
230 }
231 }
232
233 private boolean isSupported(SocketProtocolFamily protocolFamily, ChannelOption<?> option) {
234 if (protocolFamily == SocketProtocolFamily.UNIX) {
235 return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
236 }
237 return SUPPORTED_OPTIONS.contains(option);
238 }
239
240 @Override
241 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
242 return isSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
243 }
244
245 private static Set<ChannelOption<?>> supportedOptions() {
246 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, TCP_NODELAY,
247 SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, SO_SNDLOWAT, TCP_NOPUSH,
248 ChannelOption.TCP_FASTOPEN_CONNECT);
249 }
250
251 private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
252 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, DOMAIN_SOCKET_READ_MODE,
253 UnixChannelOption.SO_PEERCRED);
254 }
255
256 private void setReadMode(DomainSocketReadMode mode) {
257 requireNonNull(mode, "mode");
258 this.mode = mode;
259 }
260
261 private DomainSocketReadMode getReadMode() {
262 return mode;
263 }
264
265 private int getReceiveBufferSize() {
266 try {
267 return socket.getReceiveBufferSize();
268 } catch (IOException e) {
269 throw new ChannelException(e);
270 }
271 }
272
273 private int getSendBufferSize() {
274 try {
275 return socket.getSendBufferSize();
276 } catch (IOException e) {
277 throw new ChannelException(e);
278 }
279 }
280
281 private int getSoLinger() {
282 try {
283 return socket.getSoLinger();
284 } catch (IOException e) {
285 throw new ChannelException(e);
286 }
287 }
288
289 private int getTrafficClass() {
290 try {
291 return socket.getTrafficClass();
292 } catch (IOException e) {
293 throw new ChannelException(e);
294 }
295 }
296
297 private boolean isKeepAlive() {
298 try {
299 return socket.isKeepAlive();
300 } catch (IOException e) {
301 throw new ChannelException(e);
302 }
303 }
304
305 private boolean isReuseAddress() {
306 try {
307 return socket.isReuseAddress();
308 } catch (IOException e) {
309 throw new ChannelException(e);
310 }
311 }
312
313 private boolean isTcpNoDelay() {
314 try {
315 return socket.isTcpNoDelay();
316 } catch (IOException e) {
317 throw new ChannelException(e);
318 }
319 }
320
321 private int getSndLowAt() {
322 try {
323 return socket.getSndLowAt();
324 } catch (IOException e) {
325 throw new ChannelException(e);
326 }
327 }
328
329 private void setSndLowAt(int sndLowAt) {
330 try {
331 socket.setSndLowAt(sndLowAt);
332 } catch (IOException e) {
333 throw new ChannelException(e);
334 }
335 }
336
337 private boolean isTcpNoPush() {
338 try {
339 return socket.isTcpNoPush();
340 } catch (IOException e) {
341 throw new ChannelException(e);
342 }
343 }
344
345 private void setTcpNoPush(boolean tcpNoPush) {
346 try {
347 socket.setTcpNoPush(tcpNoPush);
348 } catch (IOException e) {
349 throw new ChannelException(e);
350 }
351 }
352
353 private void setKeepAlive(boolean keepAlive) {
354 try {
355 socket.setKeepAlive(keepAlive);
356 } catch (IOException e) {
357 throw new ChannelException(e);
358 }
359 }
360
361 private void setReceiveBufferSize(int receiveBufferSize) {
362 try {
363 socket.setReceiveBufferSize(receiveBufferSize);
364 } catch (IOException e) {
365 throw new ChannelException(e);
366 }
367 }
368
369 private void setReuseAddress(boolean reuseAddress) {
370 try {
371 socket.setReuseAddress(reuseAddress);
372 } catch (IOException e) {
373 throw new ChannelException(e);
374 }
375 }
376
377 private void setSendBufferSize(int sendBufferSize) {
378 try {
379 socket.setSendBufferSize(sendBufferSize);
380 calculateMaxBytesPerGatheringWrite();
381 } catch (IOException e) {
382 throw new ChannelException(e);
383 }
384 }
385
386 private void setSoLinger(int soLinger) {
387 try {
388 socket.setSoLinger(soLinger);
389 } catch (IOException e) {
390 throw new ChannelException(e);
391 }
392 }
393
394 private void setTcpNoDelay(boolean tcpNoDelay) {
395 try {
396 socket.setTcpNoDelay(tcpNoDelay);
397 } catch (IOException e) {
398 throw new ChannelException(e);
399 }
400 }
401
402 private void setTrafficClass(int trafficClass) {
403 try {
404 socket.setTrafficClass(trafficClass);
405 } catch (IOException e) {
406 throw new ChannelException(e);
407 }
408 }
409
410
411
412
413 private void setTcpFastOpenConnect(boolean fastOpenConnect) {
414 tcpFastopen = fastOpenConnect;
415 }
416
417
418
419
420 private boolean isTcpFastOpenConnect() {
421 return tcpFastopen;
422 }
423
424 private void calculateMaxBytesPerGatheringWrite() {
425
426 int newSendBufferSize = getSendBufferSize() << 1;
427 if (newSendBufferSize > 0) {
428 setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
429 }
430 }
431
432 @Override
433 protected Object filterOutboundMessage(Object msg) {
434 if (socket.protocolFamily() == SocketProtocolFamily.UNIX && msg instanceof FileDescriptor) {
435 return msg;
436 }
437 if (msg instanceof Buffer) {
438 Buffer buf = (Buffer) msg;
439 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
440 }
441
442 if (msg instanceof FileRegion) {
443 return msg;
444 }
445
446 throw new UnsupportedOperationException(
447 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
448 }
449
450 @Override
451 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
452 if (isTcpFastOpenConnect()) {
453 ChannelOutboundBuffer outbound = outboundBuffer();
454 outbound.addFlush();
455 Object curr;
456 if ((curr = outbound.current()) instanceof Buffer) {
457 Buffer initialData = (Buffer) curr;
458
459 if (initialData.readableBytes() > 0) {
460 IovArray iov = new IovArray();
461 try {
462 initialData.forEachReadable(0, iov);
463 int bytesSent = socket.connectx(
464 (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
465 writeFilter(true);
466 outbound.removeBytes(Math.abs(bytesSent));
467
468
469 return bytesSent > 0;
470 } finally {
471 iov.release();
472 }
473 }
474 }
475 }
476 return super.doConnect0(remoteAddress, localAddress);
477 }
478
479 @Override
480 protected Future<Executor> prepareToClose() {
481 if (socket.protocolFamily() != SocketProtocolFamily.UNIX) {
482 try {
483
484
485 if (isOpen() && getSoLinger() > 0) {
486
487
488
489
490 return executor().deregisterForIo(this).map(v -> GlobalEventExecutor.INSTANCE);
491 }
492 } catch (Throwable ignore) {
493
494
495
496 }
497 }
498 return null;
499 }
500
501 @Override
502 void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
503 Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
504 if (socket.protocolFamily() == SocketProtocolFamily.UNIX &&
505 getReadMode() == DomainSocketReadMode.FILE_DESCRIPTORS) {
506 readReadyFd(allocHandle);
507 } else {
508 readReadyBytes(allocHandle, recvBufferAllocator, maybeMoreData);
509 }
510 }
511
512 private void readReadyFd(RecvBufferAllocator.Handle allocHandle) {
513 final ChannelPipeline pipeline = pipeline();
514 try {
515 readLoop: do {
516
517
518
519 int recvFd = socket.recvFd();
520 switch(recvFd) {
521 case 0:
522 allocHandle.lastBytesRead(0);
523 break readLoop;
524 case -1:
525 allocHandle.lastBytesRead(-1);
526 closeTransportNow();
527 return;
528 default:
529 allocHandle.lastBytesRead(1);
530 allocHandle.incMessagesRead(1);
531 readPending = false;
532 pipeline.fireChannelRead(new FileDescriptor(recvFd));
533 break;
534 }
535 } while (allocHandle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound));
536
537 allocHandle.readComplete();
538 pipeline.fireChannelReadComplete();
539 } catch (Throwable t) {
540 allocHandle.readComplete();
541 pipeline.fireChannelReadComplete();
542 pipeline.fireChannelExceptionCaught(t);
543 } finally {
544 readIfIsAutoRead();
545 }
546 }
547
548 private PeerCredentials getPeerCredentials() {
549 try {
550 return socket.getPeerCredentials();
551 } catch (IOException e) {
552 throw new ChannelException(e);
553 }
554 }
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570 private int writeBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
571 int readableBytes = buf.readableBytes();
572 if (readableBytes == 0) {
573 in.remove();
574 return 0;
575 }
576
577 int readableComponents = buf.countReadableComponents();
578 if (readableComponents == 1) {
579 return doWriteBytes(in, buf);
580 }
581 ByteBuffer[] nioBuffers = new ByteBuffer[readableComponents];
582 buf.forEachReadable(0, (index, component) -> {
583 nioBuffers[index] = component.readableBuffer();
584 return true;
585 });
586 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
587 getMaxBytesPerGatheringWrite());
588 }
589
590 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
591
592
593
594 if (attempted == written) {
595 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
596 setMaxBytesPerGatheringWrite(attempted << 1);
597 }
598 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
599 setMaxBytesPerGatheringWrite(attempted >>> 1);
600 }
601 }
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
619 final long expectedWrittenBytes = array.size();
620 assert expectedWrittenBytes != 0;
621 final int cnt = array.count();
622 assert cnt != 0;
623
624 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
625 if (localWrittenBytes > 0) {
626 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
627 in.removeBytes(localWrittenBytes);
628 return 1;
629 }
630 return WRITE_STATUS_SNDBUF_FULL;
631 }
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651 private int writeBytesMultiple(
652 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
653 long maxBytesPerGatheringWrite) throws IOException {
654 assert expectedWrittenBytes != 0;
655 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
656 expectedWrittenBytes = maxBytesPerGatheringWrite;
657 }
658
659 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
660 if (localWrittenBytes > 0) {
661 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
662 in.removeBytes(localWrittenBytes);
663 return 1;
664 }
665 return WRITE_STATUS_SNDBUF_FULL;
666 }
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
683 final long regionCount = region.count();
684 final long offset = region.transferred();
685
686 if (offset >= regionCount) {
687 in.remove();
688 return 0;
689 }
690
691 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
692 if (flushedAmount > 0) {
693 in.progress(flushedAmount);
694 if (region.transferred() >= regionCount) {
695 in.remove();
696 }
697 return 1;
698 }
699 if (flushedAmount == 0) {
700 validateFileRegion(region, offset);
701 }
702 return WRITE_STATUS_SNDBUF_FULL;
703 }
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
720 if (region.transferred() >= region.count()) {
721 in.remove();
722 return 0;
723 }
724
725 if (byteChannel == null) {
726 byteChannel = new KQueueSocketWritableByteChannel();
727 }
728 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
729 if (flushedAmount > 0) {
730 in.progress(flushedAmount);
731 if (region.transferred() >= region.count()) {
732 in.remove();
733 }
734 return 1;
735 }
736 return WRITE_STATUS_SNDBUF_FULL;
737 }
738
739 @Override
740 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
741 int writeSpinCount = getWriteSpinCount();
742 do {
743 final int msgCount = in.size();
744
745 if (msgCount > 1 && in.current() instanceof Buffer) {
746 writeSpinCount -= doWriteMultiple(in);
747 } else if (msgCount == 0) {
748
749 writeFilter(false);
750
751 return;
752 } else {
753 writeSpinCount -= doWriteSingle(in);
754 }
755
756
757
758
759 } while (writeSpinCount > 0);
760
761 if (writeSpinCount == 0) {
762
763
764
765
766 writeFilter(false);
767
768
769 executor().execute(flushTask);
770 } else {
771
772
773 writeFilter(true);
774 }
775 }
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791 private int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
792
793 Object msg = in.current();
794 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
795 if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
796
797 in.remove();
798 return 1;
799 }
800 }
801
802 if (msg instanceof Buffer) {
803 return writeBytes(in, (Buffer) msg);
804 } else if (msg instanceof DefaultFileRegion) {
805 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
806 } else if (msg instanceof FileRegion) {
807 return writeFileRegion(in, (FileRegion) msg);
808 } else {
809
810 throw new Error();
811 }
812 }
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
829 final long maxBytesPerGatheringWrite = getMaxBytesPerGatheringWrite();
830 IovArray array = registration().cleanArray();
831 array.maxBytes(maxBytesPerGatheringWrite);
832 in.forEachFlushedMessage(array);
833
834 if (array.count() >= 1) {
835 return writeBytesMultiple(in, array);
836 }
837
838 in.removeBytes(0);
839 return 0;
840 }
841
842 @Override
843 protected void doShutdown(ChannelShutdownDirection direction) throws Exception {
844 switch (direction) {
845 case Outbound:
846 socket.shutdown(false, true);
847 break;
848 case Inbound:
849 try {
850 socket.shutdown(true, false);
851 } catch (NotYetConnectedException ignore) {
852
853
854 }
855 break;
856 default:
857 throw new AssertionError();
858 }
859 }
860
861 @Override
862 public boolean isShutdown(ChannelShutdownDirection direction) {
863 if (!isActive()) {
864 return true;
865 }
866 switch (direction) {
867 case Outbound:
868 return socket.isOutputShutdown();
869 case Inbound:
870 return socket.isInputShutdown();
871 default:
872 throw new AssertionError();
873 }
874 }
875
876 private void readReadyBytes(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
877 Predicate<RecvBufferAllocator.Handle> maybeMoreData) {
878 final ChannelPipeline pipeline = pipeline();
879 allocHandle.reset();
880 Buffer buffer = null;
881 boolean close = false;
882 try {
883 do {
884
885
886 buffer = allocHandle.allocate(recvBufferAllocator);
887 doReadBytes(buffer);
888 if (allocHandle.lastBytesRead() <= 0) {
889
890 Resource.dispose(buffer);
891 buffer = null;
892 close = allocHandle.lastBytesRead() < 0;
893 if (close) {
894
895 readPending = false;
896 }
897 break;
898 }
899 allocHandle.incMessagesRead(1);
900 readPending = false;
901 pipeline.fireChannelRead(buffer);
902 buffer = null;
903
904 if (shouldBreakReadReady()) {
905
906
907
908
909
910
911
912
913
914
915
916 break;
917 }
918 } while (allocHandle.continueReading(isAutoRead(), maybeMoreData)
919 && !isShutdown(ChannelShutdownDirection.Inbound));
920
921 allocHandle.readComplete();
922 pipeline.fireChannelReadComplete();
923
924 if (close) {
925 shutdownInput(false);
926 } else {
927 readIfIsAutoRead();
928 }
929 } catch (Throwable t) {
930 handleReadException(pipeline, buffer, t, close, allocHandle);
931 }
932 }
933
934 private void handleReadException(ChannelPipeline pipeline, Buffer buffer, Throwable cause, boolean close,
935 RecvBufferAllocator.Handle allocHandle) {
936 if (buffer.readableBytes() > 0) {
937 readPending = false;
938 pipeline.fireChannelRead(buffer);
939 } else {
940 buffer.close();
941 }
942 if (isConnectPending()) {
943 finishConnect();
944 } else {
945 allocHandle.readComplete();
946 pipeline.fireChannelReadComplete();
947 pipeline.fireChannelExceptionCaught(cause);
948
949
950
951 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
952 shutdownInput(false);
953 } else {
954 readIfIsAutoRead();
955 }
956 }
957 }
958
959 private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
960 KQueueSocketWritableByteChannel() {
961 super(socket);
962 }
963
964 @Override
965 protected BufferAllocator alloc() {
966 return bufferAllocator();
967 }
968 }
969 }