1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.UnixChannel;
42 import io.netty.util.ReferenceCountUtil;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.internal.UnstableApi;
45
46 import java.io.IOException;
47 import java.net.ConnectException;
48 import java.net.InetSocketAddress;
49 import java.net.SocketAddress;
50 import java.nio.ByteBuffer;
51 import java.nio.channels.AlreadyConnectedException;
52 import java.nio.channels.ConnectionPendingException;
53 import java.nio.channels.NotYetConnectedException;
54 import java.nio.channels.UnresolvedAddressException;
55 import java.util.concurrent.TimeUnit;
56
57 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59 import static io.netty.util.internal.ObjectUtil.checkNotNull;
60
61 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
62
63 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64
65
66
67
68 private ChannelPromise connectPromise;
69 private Future<?> connectTimeoutFuture;
70 private SocketAddress requestedRemoteAddress;
71
72 final BsdSocket socket;
73 private IoRegistration registration;
74 private boolean readFilterEnabled;
75 private boolean writeFilterEnabled;
76
77 boolean readReadyRunnablePending;
78 boolean inputClosedSeenErrorOnRead;
79 protected volatile boolean active;
80 private volatile SocketAddress local;
81 private volatile SocketAddress remote;
82
/**
 * Creates a channel around {@code fd}.
 *
 * @param parent the parent channel passed to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param active initial value of the active flag; when {@code true} the local and remote
 *               addresses are read from {@code fd} and cached immediately
 */
AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
    super(parent);
    this.socket = checkNotNull(fd, "fd");
    this.active = active;
    if (!active) {
        // Nothing to cache yet; the addresses are filled in by bind/connect later.
        return;
    }
    this.local = fd.localAddress();
    this.remote = fd.remoteAddress();
}
94
/**
 * Creates an already-active channel around {@code fd} whose remote address is supplied by the caller.
 *
 * @param parent the parent channel passed to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param remote the remote address to cache as-is (it is not queried from {@code fd})
 */
AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
    super(parent);
    this.socket = checkNotNull(fd, "fd");
    this.active = true;
    // The remote address comes from the caller; only the local one is read from the socket.
    this.remote = remote;
    this.local = fd.localAddress();
}
104
105 @Override
106 protected boolean isCompatible(EventLoop loop) {
107 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
108 }
109
110 static boolean isSoErrorZero(BsdSocket fd) {
111 try {
112 return fd.getSoError() == 0;
113 } catch (IOException e) {
114 throw new ChannelException(e);
115 }
116 }
117
118 protected final IoRegistration registration() {
119 assert registration != null;
120 return registration;
121 }
122
123 @Override
124 public final FileDescriptor fd() {
125 return socket;
126 }
127
128 @Override
129 public boolean isActive() {
130 return active;
131 }
132
133 @Override
134 public ChannelMetadata metadata() {
135 return METADATA;
136 }
137
138 @Override
139 protected void doClose() throws Exception {
140
141
142 doDeregister();
143
144 active = false;
145
146
147 inputClosedSeenErrorOnRead = true;
148 socket.close();
149 }
150
151 @Override
152 protected void doDisconnect() throws Exception {
153 doClose();
154 }
155
156 void resetCachedAddresses() {
157 local = socket.localAddress();
158 remote = socket.remoteAddress();
159 }
160
161 @Override
162 public boolean isOpen() {
163 return socket.isOpen();
164 }
165
166 @Override
167 protected void doDeregister() throws Exception {
168 IoRegistration registration = this.registration;
169 if (registration != null) {
170
171
172
173 readFilter(false);
174 writeFilter(false);
175 clearRdHup0();
176
177 registration.cancel();
178 this.registration = null;
179 }
180 }
181
182 private void clearRdHup0() {
183 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
184 }
185
186 private void submit(KQueueIoOps ops) {
187 if (!isOpen()) {
188
189
190 return;
191 }
192 try {
193 registration.submit(ops);
194 } catch (Exception e) {
195 throw new ChannelException(e);
196 }
197 }
198
199 @Override
200 protected final void doBeginRead() throws Exception {
201
202 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
203 unsafe.readPending = true;
204
205
206
207
208 readFilter(true);
209 }
210
211 @Override
212 protected void doRegister(ChannelPromise promise) {
213 ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
214 if (f.isSuccess()) {
215 this.registration = (IoRegistration) f.getNow();
216
217
218
219 readReadyRunnablePending = false;
220
221 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
222
223
224 if (writeFilterEnabled) {
225 submit(Native.WRITE_ENABLED_OPS);
226 }
227 if (readFilterEnabled) {
228 submit(Native.READ_ENABLED_OPS);
229 }
230 promise.setSuccess();
231 } else {
232 promise.setFailure(f.cause());
233 }
234 });
235 }
236
237 @Override
238 protected abstract AbstractKQueueUnsafe newUnsafe();
239
240 @Override
241 public abstract KQueueChannelConfig config();
242
243
244
245
246 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
247 return newDirectBuffer(buf, buf);
248 }
249
250
251
252
253
254
255 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
256 final int readableBytes = buf.readableBytes();
257 if (readableBytes == 0) {
258 ReferenceCountUtil.release(holder);
259 return Unpooled.EMPTY_BUFFER;
260 }
261
262 final ByteBufAllocator alloc = alloc();
263 if (alloc.isDirectBufferPooled()) {
264 return newDirectBuffer0(holder, buf, alloc, readableBytes);
265 }
266
267 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
268 if (directBuf == null) {
269 return newDirectBuffer0(holder, buf, alloc, readableBytes);
270 }
271
272 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
273 ReferenceCountUtil.safeRelease(holder);
274 return directBuf;
275 }
276
277 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
278 final ByteBuf directBuf = alloc.directBuffer(capacity);
279 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
280 ReferenceCountUtil.safeRelease(holder);
281 return directBuf;
282 }
283
284 protected static void checkResolvable(InetSocketAddress addr) {
285 if (addr.isUnresolved()) {
286 throw new UnresolvedAddressException();
287 }
288 }
289
290
291
292
293 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
294 int writerIndex = byteBuf.writerIndex();
295 int localReadAmount;
296 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
297 if (byteBuf.hasMemoryAddress()) {
298 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
299 } else {
300 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
301 localReadAmount = socket.read(buf, buf.position(), buf.limit());
302 }
303 if (localReadAmount > 0) {
304 byteBuf.writerIndex(writerIndex + localReadAmount);
305 }
306 return localReadAmount;
307 }
308
309 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
310 if (buf.hasMemoryAddress()) {
311 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
312 if (localFlushedAmount > 0) {
313 in.removeBytes(localFlushedAmount);
314 return 1;
315 }
316 } else {
317 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
318 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
319 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
320 if (localFlushedAmount > 0) {
321 nioBuf.position(nioBuf.position() + localFlushedAmount);
322 in.removeBytes(localFlushedAmount);
323 return 1;
324 }
325 }
326 return WRITE_STATUS_SNDBUF_FULL;
327 }
328
329 final boolean shouldBreakReadReady(ChannelConfig config) {
330 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
331 }
332
333 private static boolean isAllowHalfClosure(ChannelConfig config) {
334 if (config instanceof KQueueDomainSocketChannelConfig) {
335 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
336 }
337
338 return config instanceof SocketChannelConfig &&
339 ((SocketChannelConfig) config).isAllowHalfClosure();
340 }
341
342 final void clearReadFilter() {
343
344 if (isRegistered()) {
345 final EventLoop loop = eventLoop();
346 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
347 if (loop.inEventLoop()) {
348 unsafe.clearReadFilter0();
349 } else {
350
351 loop.execute(new Runnable() {
352 @Override
353 public void run() {
354 if (!unsafe.readPending && !config().isAutoRead()) {
355
356 unsafe.clearReadFilter0();
357 }
358 }
359 });
360 }
361 } else {
362
363
364 readFilterEnabled = false;
365 }
366 }
367
368 void readFilter(boolean readFilterEnabled) throws IOException {
369 if (this.readFilterEnabled != readFilterEnabled) {
370 this.readFilterEnabled = readFilterEnabled;
371 submit(readFilterEnabled ? Native.READ_ENABLED_OPS : Native.READ_DISABLED_OPS);
372 }
373 }
374
375 void writeFilter(boolean writeFilterEnabled) throws IOException {
376 if (this.writeFilterEnabled != writeFilterEnabled) {
377 this.writeFilterEnabled = writeFilterEnabled;
378 submit(writeFilterEnabled ? Native.WRITE_ENABLED_OPS : Native.WRITE_DISABLED_OPS);
379 }
380 }
381
382 @UnstableApi
383 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
384 boolean readPending;
385 private KQueueRecvByteAllocatorHandle allocHandle;
386
387 Channel channel() {
388 return AbstractKQueueChannel.this;
389 }
390
391 @Override
392 public int ident() {
393 return fd().intValue();
394 }
395
396 @Override
397 public void close() {
398 close(voidPromise());
399 }
400
401 @Override
402 public void handle(IoRegistration registration, IoEvent event) {
403 KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
404 final short filter = kqueueEvent.filter();
405 final short flags = kqueueEvent.flags();
406 final int fflags = kqueueEvent.fflags();
407 final long data = kqueueEvent.data();
408
409
410
411 if (filter == Native.EVFILT_WRITE) {
412 writeReady();
413 } else if (filter == Native.EVFILT_READ) {
414
415 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
416 readReady(allocHandle);
417 } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
418 readEOF();
419 return;
420 }
421
422
423
424
425 if ((flags & Native.EV_EOF) != 0) {
426 readEOF();
427 }
428 }
429
430 abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
431
432 final boolean shouldStopReading(ChannelConfig config) {
433
434
435
436
437
438
439 return !readPending && !config.isAutoRead();
440 }
441
442 final boolean failConnectPromise(Throwable cause) {
443 if (connectPromise != null) {
444
445
446
447 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
448 AbstractKQueueChannel.this.connectPromise = null;
449 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
450 : new ConnectException("failed to connect").initCause(cause))) {
451 closeIfClosed();
452 return true;
453 }
454 }
455 return false;
456 }
457
458 private void writeReady() {
459 if (connectPromise != null) {
460
461 finishConnect();
462 } else if (!socket.isOutputShutdown()) {
463
464 super.flush0();
465 }
466 }
467
468
469
470
471 void shutdownInput(boolean readEOF) {
472
473
474
475
476
477 if (readEOF && connectPromise != null) {
478 finishConnect();
479 }
480 if (!socket.isInputShutdown()) {
481 if (isAllowHalfClosure(config())) {
482 try {
483 socket.shutdown(true, false);
484 } catch (IOException ignored) {
485
486
487 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
488 return;
489 } catch (NotYetConnectedException ignore) {
490
491
492 }
493 if (shouldStopReading(config())) {
494 clearReadFilter0();
495 }
496 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
497 } else {
498 close(voidPromise());
499 return;
500 }
501 }
502 if (!readEOF && !inputClosedSeenErrorOnRead) {
503 inputClosedSeenErrorOnRead = true;
504 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
505 }
506 }
507
508 private void readEOF() {
509
510 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
511 allocHandle.readEOF();
512
513 if (isActive()) {
514
515
516
517 readReady(allocHandle);
518 } else {
519
520 shutdownInput(true);
521 }
522
523
524 clearRdHup0();
525 }
526
527 @Override
528 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
529 if (allocHandle == null) {
530 allocHandle = new KQueueRecvByteAllocatorHandle(
531 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
532 }
533 return allocHandle;
534 }
535
536 @Override
537 protected final void flush0() {
538
539
540
541 if (!writeFilterEnabled) {
542 super.flush0();
543 }
544 }
545
546 protected final void clearReadFilter0() {
547 assert eventLoop().inEventLoop();
548 try {
549 readPending = false;
550 readFilter(false);
551 } catch (IOException e) {
552
553
554 pipeline().fireExceptionCaught(e);
555 unsafe().close(unsafe().voidPromise());
556 }
557 }
558
559 private void fireEventAndClose(Object evt) {
560 pipeline().fireUserEventTriggered(evt);
561 close(voidPromise());
562 }
563
564 @Override
565 public void connect(
566 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
567
568
569 if (promise.isDone() || !ensureOpen(promise)) {
570 return;
571 }
572
573 try {
574 if (connectPromise != null) {
575 throw new ConnectionPendingException();
576 }
577
578 boolean wasActive = isActive();
579 if (doConnect(remoteAddress, localAddress)) {
580 fulfillConnectPromise(promise, wasActive);
581 } else {
582 connectPromise = promise;
583 requestedRemoteAddress = remoteAddress;
584
585
586 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
587 if (connectTimeoutMillis > 0) {
588 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
589 @Override
590 public void run() {
591 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
592 if (connectPromise != null && !connectPromise.isDone()
593 && connectPromise.tryFailure(new ConnectTimeoutException(
594 "connection timed out after " + connectTimeoutMillis + " ms: " +
595 remoteAddress))) {
596 close(voidPromise());
597 }
598 }
599 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
600 }
601
602 promise.addListener(new ChannelFutureListener() {
603 @Override
604 public void operationComplete(ChannelFuture future) {
605
606
607 if (future.isCancelled()) {
608 if (connectTimeoutFuture != null) {
609 connectTimeoutFuture.cancel(false);
610 }
611 connectPromise = null;
612 close(voidPromise());
613 }
614 }
615 });
616 }
617 } catch (Throwable t) {
618 closeIfClosed();
619 promise.tryFailure(annotateConnectException(t, remoteAddress));
620 }
621 }
622
623 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
624 if (promise == null) {
625
626 return;
627 }
628 active = true;
629
630
631
632 boolean active = isActive();
633
634
635 boolean promiseSet = promise.trySuccess();
636
637
638
639 if (!wasActive && active) {
640 pipeline().fireChannelActive();
641 }
642
643
644 if (!promiseSet) {
645 close(voidPromise());
646 }
647 }
648
649 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
650 if (promise == null) {
651
652 return;
653 }
654
655
656 promise.tryFailure(cause);
657 closeIfClosed();
658 }
659
660 private void finishConnect() {
661
662
663
664 assert eventLoop().inEventLoop();
665
666 boolean connectStillInProgress = false;
667 try {
668 boolean wasActive = isActive();
669 if (!doFinishConnect()) {
670 connectStillInProgress = true;
671 return;
672 }
673 fulfillConnectPromise(connectPromise, wasActive);
674 } catch (Throwable t) {
675 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
676 } finally {
677 if (!connectStillInProgress) {
678
679
680 if (connectTimeoutFuture != null) {
681 connectTimeoutFuture.cancel(false);
682 }
683 connectPromise = null;
684 }
685 }
686 }
687
688 private boolean doFinishConnect() throws Exception {
689 if (socket.finishConnect()) {
690 writeFilter(false);
691 if (requestedRemoteAddress instanceof InetSocketAddress) {
692 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
693 }
694 requestedRemoteAddress = null;
695 return true;
696 }
697 writeFilter(true);
698 return false;
699 }
700 }
701
702 @Override
703 protected void doBind(SocketAddress local) throws Exception {
704 if (local instanceof InetSocketAddress) {
705 checkResolvable((InetSocketAddress) local);
706 }
707 socket.bind(local);
708 this.local = socket.localAddress();
709 }
710
711
712
713
714 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
715 if (localAddress instanceof InetSocketAddress) {
716 checkResolvable((InetSocketAddress) localAddress);
717 }
718
719 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
720 ? (InetSocketAddress) remoteAddress : null;
721 if (remoteSocketAddr != null) {
722 checkResolvable(remoteSocketAddr);
723 }
724
725 if (remote != null) {
726
727
728
729 throw new AlreadyConnectedException();
730 }
731
732 if (localAddress != null) {
733 socket.bind(localAddress);
734 }
735
736 boolean connected = doConnect0(remoteAddress, localAddress);
737 if (connected) {
738 remote = remoteSocketAddr == null?
739 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
740 }
741
742
743
744 local = socket.localAddress();
745 return connected;
746 }
747
748 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
749 boolean success = false;
750 try {
751 boolean connected = socket.connect(remoteAddress);
752 if (!connected) {
753 writeFilter(true);
754 }
755 success = true;
756 return connected;
757 } finally {
758 if (!success) {
759 doClose();
760 }
761 }
762 }
763
764 @Override
765 protected SocketAddress localAddress0() {
766 return local;
767 }
768
769 @Override
770 protected SocketAddress remoteAddress0() {
771 return remote;
772 }
773 }