1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.UnixChannel;
42 import io.netty.util.ReferenceCountUtil;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.internal.UnstableApi;
45
46 import java.io.IOException;
47 import java.net.ConnectException;
48 import java.net.InetSocketAddress;
49 import java.net.SocketAddress;
50 import java.nio.ByteBuffer;
51 import java.nio.channels.AlreadyConnectedException;
52 import java.nio.channels.ConnectionPendingException;
53 import java.nio.channels.NotYetConnectedException;
54 import java.nio.channels.UnresolvedAddressException;
55 import java.util.concurrent.TimeUnit;
56
57 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59 import static io.netty.util.internal.ObjectUtil.checkNotNull;
60
61 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
/** Metadata instance returned by {@link #metadata()}; constructed with {@code false}. */
private static final ChannelMetadata METADATA = new ChannelMetadata(false);

/** Promise of the connect attempt currently in flight, or {@code null} when none is pending. */
private ChannelPromise connectPromise;
/** Timeout task scheduled for the pending connect attempt, or {@code null}. */
private Future<?> connectTimeoutFuture;
/** Remote address handed to the pending connect attempt; cleared once the connect finishes. */
private SocketAddress requestedRemoteAddress;

/** The underlying socket; never {@code null}. */
final BsdSocket socket;
/** Registration obtained in {@code doRegister}; {@code null} until registration succeeded. */
private KQueueIoRegistration registration;
/** Last requested state of the read filter. */
private boolean readFilterEnabled;
/** Last requested state of the write filter. */
private boolean writeFilterEnabled;

// The filter operations are identical for every instance and every constructor, so they are
// initialised once here instead of being repeated in each constructor body.
private final KQueueIoOps readEnabledOps =
        KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE, 0);
private final KQueueIoOps writeEnabledOps =
        KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE, 0);
private final KQueueIoOps readDisabledOps =
        KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_DELETE_DISABLE, 0);
private final KQueueIoOps writeDisabledOps =
        KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_DELETE_DISABLE, 0);

/** {@code true} while the read-ready runnable has been handed to the event loop but has not run yet. */
boolean readReadyRunnablePending;
/** Consulted by {@code shouldBreakReadReady}; set by {@code doClose} and {@code shutdownInput}. */
boolean inputClosedSeenErrorOnRead;
/** Value reported by {@link #isActive()}. */
protected volatile boolean active;
/** Cached local address, returned by {@code localAddress0()}. */
private volatile SocketAddress local;
/** Cached remote address, returned by {@code remoteAddress0()}. */
private volatile SocketAddress remote;

/**
 * Creates a channel on top of the given socket.
 *
 * @param parent the parent channel, passed to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param active initial active state; when {@code true} the local and remote addresses are read
 *               from {@code fd} and cached immediately
 */
AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    this.active = active;
    if (active) {
        local = fd.localAddress();
        remote = fd.remoteAddress();
    }
}

/**
 * Creates an already active channel whose remote address is supplied by the caller.
 *
 * @param parent the parent channel, passed to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param remote the remote address to cache as-is; the local address is read from {@code fd}
 */
AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;
    this.remote = remote;
    local = fd.localAddress();
}
118
119 @Override
120 protected boolean isCompatible(EventLoop loop) {
121 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
122 }
123
124 static boolean isSoErrorZero(BsdSocket fd) {
125 try {
126 return fd.getSoError() == 0;
127 } catch (IOException e) {
128 throw new ChannelException(e);
129 }
130 }
131
132 protected final KQueueIoRegistration registration() {
133 assert registration != null;
134 return registration;
135 }
136
137 @Override
138 public final FileDescriptor fd() {
139 return socket;
140 }
141
142 @Override
143 public boolean isActive() {
144 return active;
145 }
146
147 @Override
148 public ChannelMetadata metadata() {
149 return METADATA;
150 }
151
152 @Override
153 protected void doClose() throws Exception {
154 active = false;
155
156
157 inputClosedSeenErrorOnRead = true;
158 socket.close();
159 }
160
161 @Override
162 protected void doDisconnect() throws Exception {
163 doClose();
164 }
165
166 void resetCachedAddresses() {
167 local = socket.localAddress();
168 remote = socket.remoteAddress();
169 }
170
171 @Override
172 public boolean isOpen() {
173 return socket.isOpen();
174 }
175
176 @Override
177 protected void doDeregister() throws Exception {
178
179
180
181 readFilter(false);
182 writeFilter(false);
183 clearRdHup0();
184
185 KQueueIoRegistration registration = this.registration;
186 if (registration != null) {
187 registration.cancel();
188 }
189 }
190
191 private void clearRdHup0() {
192 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
193 }
194
195 private void submit(KQueueIoOps ops) {
196 try {
197 registration.submit(ops);
198 } catch (Exception e) {
199 throw new ChannelException(e);
200 }
201 }
202
203 @Override
204 protected final void doBeginRead() throws Exception {
205
206 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
207 unsafe.readPending = true;
208
209
210
211
212 readFilter(true);
213
214
215
216 if (unsafe.maybeMoreDataToRead) {
217 unsafe.executeReadReadyRunnable(config());
218 }
219 }
220
221 @Override
222 protected void doRegister(ChannelPromise promise) {
223 ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
224 if (f.isSuccess()) {
225 this.registration = (KQueueIoRegistration) f.getNow();
226
227
228
229 readReadyRunnablePending = false;
230
231 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
232
233
234 if (writeFilterEnabled) {
235 submit(writeEnabledOps);
236 }
237 if (readFilterEnabled) {
238 submit(readEnabledOps);
239 }
240 promise.setSuccess();
241 } else {
242 promise.setFailure(f.cause());
243 }
244 });
245 }
246
/** Creates the unsafe for this channel; the return type is narrowed to {@link AbstractKQueueUnsafe}. */
@Override
protected abstract AbstractKQueueUnsafe newUnsafe();

/** Returns this channel's configuration; the return type is narrowed to {@link KQueueChannelConfig}. */
@Override
public abstract KQueueChannelConfig config();
252
253
254
255
256 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
257 return newDirectBuffer(buf, buf);
258 }
259
260
261
262
263
264
265 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
266 final int readableBytes = buf.readableBytes();
267 if (readableBytes == 0) {
268 ReferenceCountUtil.release(holder);
269 return Unpooled.EMPTY_BUFFER;
270 }
271
272 final ByteBufAllocator alloc = alloc();
273 if (alloc.isDirectBufferPooled()) {
274 return newDirectBuffer0(holder, buf, alloc, readableBytes);
275 }
276
277 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
278 if (directBuf == null) {
279 return newDirectBuffer0(holder, buf, alloc, readableBytes);
280 }
281
282 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
283 ReferenceCountUtil.safeRelease(holder);
284 return directBuf;
285 }
286
287 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
288 final ByteBuf directBuf = alloc.directBuffer(capacity);
289 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
290 ReferenceCountUtil.safeRelease(holder);
291 return directBuf;
292 }
293
294 protected static void checkResolvable(InetSocketAddress addr) {
295 if (addr.isUnresolved()) {
296 throw new UnresolvedAddressException();
297 }
298 }
299
300
301
302
303 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
304 int writerIndex = byteBuf.writerIndex();
305 int localReadAmount;
306 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
307 if (byteBuf.hasMemoryAddress()) {
308 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
309 } else {
310 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
311 localReadAmount = socket.read(buf, buf.position(), buf.limit());
312 }
313 if (localReadAmount > 0) {
314 byteBuf.writerIndex(writerIndex + localReadAmount);
315 }
316 return localReadAmount;
317 }
318
319 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
320 if (buf.hasMemoryAddress()) {
321 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
322 if (localFlushedAmount > 0) {
323 in.removeBytes(localFlushedAmount);
324 return 1;
325 }
326 } else {
327 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
328 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
329 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
330 if (localFlushedAmount > 0) {
331 nioBuf.position(nioBuf.position() + localFlushedAmount);
332 in.removeBytes(localFlushedAmount);
333 return 1;
334 }
335 }
336 return WRITE_STATUS_SNDBUF_FULL;
337 }
338
339 final boolean shouldBreakReadReady(ChannelConfig config) {
340 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
341 }
342
343 private static boolean isAllowHalfClosure(ChannelConfig config) {
344 if (config instanceof KQueueDomainSocketChannelConfig) {
345 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
346 }
347
348 return config instanceof SocketChannelConfig &&
349 ((SocketChannelConfig) config).isAllowHalfClosure();
350 }
351
352 final void clearReadFilter() {
353
354 if (isRegistered()) {
355 final EventLoop loop = eventLoop();
356 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
357 if (loop.inEventLoop()) {
358 unsafe.clearReadFilter0();
359 } else {
360
361 loop.execute(new Runnable() {
362 @Override
363 public void run() {
364 if (!unsafe.readPending && !config().isAutoRead()) {
365
366 unsafe.clearReadFilter0();
367 }
368 }
369 });
370 }
371 } else {
372
373
374 readFilterEnabled = false;
375 }
376 }
377
378 void readFilter(boolean readFilterEnabled) throws IOException {
379 if (this.readFilterEnabled != readFilterEnabled) {
380 this.readFilterEnabled = readFilterEnabled;
381 submit(readFilterEnabled ? readEnabledOps : readDisabledOps);
382 }
383 }
384
385 void writeFilter(boolean writeFilterEnabled) throws IOException {
386 if (this.writeFilterEnabled != writeFilterEnabled) {
387 this.writeFilterEnabled = writeFilterEnabled;
388 submit(writeFilterEnabled ? writeEnabledOps : writeDisabledOps);
389 }
390 }
391
392 @UnstableApi
393 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
394 boolean readPending;
395 boolean maybeMoreDataToRead;
396 private KQueueRecvByteAllocatorHandle allocHandle;
397 private final Runnable readReadyRunnable = new Runnable() {
398 @Override
399 public void run() {
400 readReadyRunnablePending = false;
401 readReady(recvBufAllocHandle());
402 }
403 };
404
405 Channel channel() {
406 return AbstractKQueueChannel.this;
407 }
408
409 @Override
410 public int ident() {
411 return fd().intValue();
412 }
413
414 @Override
415 public void close() {
416 close(voidPromise());
417 }
418
419 @Override
420 public void handle(IoRegistration registration, IoEvent event) {
421 KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
422 final short filter = kqueueEvent.filter();
423 final short flags = kqueueEvent.flags();
424 final int fflags = kqueueEvent.fflags();
425 final long data = kqueueEvent.data();
426
427
428
429 if (filter == Native.EVFILT_WRITE) {
430 writeReady();
431 } else if (filter == Native.EVFILT_READ) {
432
433 readReady(data);
434 } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
435 readEOF();
436 }
437
438
439
440
441 if ((flags & Native.EV_EOF) != 0) {
442 readEOF();
443 }
444 }
445
446 private void readReady(long numberBytesPending) {
447 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
448 allocHandle.numberBytesPending(numberBytesPending);
449 readReady(allocHandle);
450 }
451
452 abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
453
454 final void readReadyBefore() {
455 maybeMoreDataToRead = false;
456 }
457
458 final void readReadyFinally(ChannelConfig config) {
459 maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
460
461 if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
462
463
464
465
466
467
468
469 executeReadReadyRunnable(config);
470 } else if (!readPending && !config.isAutoRead()) {
471
472
473
474
475
476
477 clearReadFilter0();
478 }
479 }
480
481 final boolean failConnectPromise(Throwable cause) {
482 if (connectPromise != null) {
483
484
485
486 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
487 AbstractKQueueChannel.this.connectPromise = null;
488 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
489 : new ConnectException("failed to connect").initCause(cause))) {
490 closeIfClosed();
491 return true;
492 }
493 }
494 return false;
495 }
496
497 private void writeReady() {
498 if (connectPromise != null) {
499
500 finishConnect();
501 } else if (!socket.isOutputShutdown()) {
502
503 super.flush0();
504 }
505 }
506
507
508
509
510 void shutdownInput(boolean readEOF) {
511
512
513
514
515
516 if (readEOF && connectPromise != null) {
517 finishConnect();
518 }
519 if (!socket.isInputShutdown()) {
520 if (isAllowHalfClosure(config())) {
521 try {
522 socket.shutdown(true, false);
523 } catch (IOException ignored) {
524
525
526 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
527 return;
528 } catch (NotYetConnectedException ignore) {
529
530
531 }
532 clearReadFilter0();
533 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
534 } else {
535 close(voidPromise());
536 }
537 } else if (!readEOF && !inputClosedSeenErrorOnRead) {
538 inputClosedSeenErrorOnRead = true;
539 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
540 }
541 }
542
543 private void readEOF() {
544
545 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
546 allocHandle.readEOF();
547
548 if (isActive()) {
549
550
551
552 readReady(allocHandle);
553 } else {
554
555 shutdownInput(true);
556 }
557
558
559 clearRdHup0();
560 }
561
562 @Override
563 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
564 if (allocHandle == null) {
565 allocHandle = new KQueueRecvByteAllocatorHandle(
566 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
567 }
568 return allocHandle;
569 }
570
571 @Override
572 protected final void flush0() {
573
574
575
576 if (!writeFilterEnabled) {
577 super.flush0();
578 }
579 }
580
581 final void executeReadReadyRunnable(ChannelConfig config) {
582 if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
583 return;
584 }
585 readReadyRunnablePending = true;
586 eventLoop().execute(readReadyRunnable);
587 }
588
589 protected final void clearReadFilter0() {
590 assert eventLoop().inEventLoop();
591 try {
592 readPending = false;
593 readFilter(false);
594 } catch (IOException e) {
595
596
597 pipeline().fireExceptionCaught(e);
598 unsafe().close(unsafe().voidPromise());
599 }
600 }
601
602 private void fireEventAndClose(Object evt) {
603 pipeline().fireUserEventTriggered(evt);
604 close(voidPromise());
605 }
606
607 @Override
608 public void connect(
609 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
610
611
612 if (promise.isDone() || !ensureOpen(promise)) {
613 return;
614 }
615
616 try {
617 if (connectPromise != null) {
618 throw new ConnectionPendingException();
619 }
620
621 boolean wasActive = isActive();
622 if (doConnect(remoteAddress, localAddress)) {
623 fulfillConnectPromise(promise, wasActive);
624 } else {
625 connectPromise = promise;
626 requestedRemoteAddress = remoteAddress;
627
628
629 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
630 if (connectTimeoutMillis > 0) {
631 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
632 @Override
633 public void run() {
634 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
635 if (connectPromise != null && !connectPromise.isDone()
636 && connectPromise.tryFailure(new ConnectTimeoutException(
637 "connection timed out after " + connectTimeoutMillis + " ms: " +
638 remoteAddress))) {
639 close(voidPromise());
640 }
641 }
642 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
643 }
644
645 promise.addListener(new ChannelFutureListener() {
646 @Override
647 public void operationComplete(ChannelFuture future) {
648
649
650 if (future.isCancelled()) {
651 if (connectTimeoutFuture != null) {
652 connectTimeoutFuture.cancel(false);
653 }
654 connectPromise = null;
655 close(voidPromise());
656 }
657 }
658 });
659 }
660 } catch (Throwable t) {
661 closeIfClosed();
662 promise.tryFailure(annotateConnectException(t, remoteAddress));
663 }
664 }
665
666 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
667 if (promise == null) {
668
669 return;
670 }
671 active = true;
672
673
674
675 boolean active = isActive();
676
677
678 boolean promiseSet = promise.trySuccess();
679
680
681
682 if (!wasActive && active) {
683 pipeline().fireChannelActive();
684 }
685
686
687 if (!promiseSet) {
688 close(voidPromise());
689 }
690 }
691
692 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
693 if (promise == null) {
694
695 return;
696 }
697
698
699 promise.tryFailure(cause);
700 closeIfClosed();
701 }
702
703 private void finishConnect() {
704
705
706
707 assert eventLoop().inEventLoop();
708
709 boolean connectStillInProgress = false;
710 try {
711 boolean wasActive = isActive();
712 if (!doFinishConnect()) {
713 connectStillInProgress = true;
714 return;
715 }
716 fulfillConnectPromise(connectPromise, wasActive);
717 } catch (Throwable t) {
718 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
719 } finally {
720 if (!connectStillInProgress) {
721
722
723 if (connectTimeoutFuture != null) {
724 connectTimeoutFuture.cancel(false);
725 }
726 connectPromise = null;
727 }
728 }
729 }
730
731 private boolean doFinishConnect() throws Exception {
732 if (socket.finishConnect()) {
733 writeFilter(false);
734 if (requestedRemoteAddress instanceof InetSocketAddress) {
735 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
736 }
737 requestedRemoteAddress = null;
738 return true;
739 }
740 writeFilter(true);
741 return false;
742 }
743 }
744
745 @Override
746 protected void doBind(SocketAddress local) throws Exception {
747 if (local instanceof InetSocketAddress) {
748 checkResolvable((InetSocketAddress) local);
749 }
750 socket.bind(local);
751 this.local = socket.localAddress();
752 }
753
754
755
756
757 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
758 if (localAddress instanceof InetSocketAddress) {
759 checkResolvable((InetSocketAddress) localAddress);
760 }
761
762 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
763 ? (InetSocketAddress) remoteAddress : null;
764 if (remoteSocketAddr != null) {
765 checkResolvable(remoteSocketAddr);
766 }
767
768 if (remote != null) {
769
770
771
772 throw new AlreadyConnectedException();
773 }
774
775 if (localAddress != null) {
776 socket.bind(localAddress);
777 }
778
779 boolean connected = doConnect0(remoteAddress, localAddress);
780 if (connected) {
781 remote = remoteSocketAddr == null?
782 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
783 }
784
785
786
787 local = socket.localAddress();
788 return connected;
789 }
790
791 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
792 boolean success = false;
793 try {
794 boolean connected = socket.connect(remoteAddress);
795 if (!connected) {
796 writeFilter(true);
797 }
798 success = true;
799 return connected;
800 } finally {
801 if (!success) {
802 doClose();
803 }
804 }
805 }
806
807 @Override
808 protected SocketAddress localAddress0() {
809 return local;
810 }
811
812 @Override
813 protected SocketAddress remoteAddress0() {
814 return remote;
815 }
816 }