1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.UnixChannel;
42 import io.netty.util.ReferenceCountUtil;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.internal.UnstableApi;
45
46 import java.io.IOException;
47 import java.net.ConnectException;
48 import java.net.InetSocketAddress;
49 import java.net.SocketAddress;
50 import java.nio.ByteBuffer;
51 import java.nio.channels.AlreadyConnectedException;
52 import java.nio.channels.ConnectionPendingException;
53 import java.nio.channels.NotYetConnectedException;
54 import java.nio.channels.UnresolvedAddressException;
55 import java.util.concurrent.TimeUnit;
56
57 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59 import static io.netty.util.internal.ObjectUtil.checkNotNull;
60
61 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
62 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
63
64
65
66
67 private ChannelPromise connectPromise;
68 private Future<?> connectTimeoutFuture;
69 private SocketAddress requestedRemoteAddress;
70
71 final BsdSocket socket;
72 private IoRegistration registration;
73 private boolean readFilterEnabled;
74 private boolean writeFilterEnabled;
75
76 private static final KQueueIoOps READ_ENABLED_OPS =
77 KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_ADD_ENABLE, 0);
78 private static final KQueueIoOps WRITE_ENABLED_OPS =
79 KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_ADD_ENABLE, 0);
80 private static final KQueueIoOps READ_DISABLED_OPS =
81 KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_DELETE_DISABLE, 0);
82 private static final KQueueIoOps WRITE_DISABLED_OPS =
83 KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_DELETE_DISABLE, 0);
84
85 boolean readReadyRunnablePending;
86 boolean inputClosedSeenErrorOnRead;
87 protected volatile boolean active;
88 private volatile SocketAddress local;
89 private volatile SocketAddress remote;
90
91 AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
92 super(parent);
93 socket = checkNotNull(fd, "fd");
94 this.active = active;
95 if (active) {
96
97
98 local = fd.localAddress();
99 remote = fd.remoteAddress();
100 }
101 }
102
103 AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
104 super(parent);
105 socket = checkNotNull(fd, "fd");
106 active = true;
107
108
109 this.remote = remote;
110 local = fd.localAddress();
111 }
112
113 @Override
114 protected boolean isCompatible(EventLoop loop) {
115 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
116 }
117
118 static boolean isSoErrorZero(BsdSocket fd) {
119 try {
120 return fd.getSoError() == 0;
121 } catch (IOException e) {
122 throw new ChannelException(e);
123 }
124 }
125
126 protected final IoRegistration registration() {
127 assert registration != null;
128 return registration;
129 }
130
131 @Override
132 public final FileDescriptor fd() {
133 return socket;
134 }
135
136 @Override
137 public boolean isActive() {
138 return active;
139 }
140
141 @Override
142 public ChannelMetadata metadata() {
143 return METADATA;
144 }
145
146 @Override
147 protected void doClose() throws Exception {
148 active = false;
149
150
151 inputClosedSeenErrorOnRead = true;
152 socket.close();
153 }
154
155 @Override
156 protected void doDisconnect() throws Exception {
157 doClose();
158 }
159
160 void resetCachedAddresses() {
161 local = socket.localAddress();
162 remote = socket.remoteAddress();
163 }
164
165 @Override
166 public boolean isOpen() {
167 return socket.isOpen();
168 }
169
170 @Override
171 protected void doDeregister() throws Exception {
172
173
174
175 readFilter(false);
176 writeFilter(false);
177 clearRdHup0();
178
179 IoRegistration registration = this.registration;
180 if (registration != null) {
181 registration.cancel();
182 }
183 }
184
185 private void clearRdHup0() {
186 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
187 }
188
189 private void submit(KQueueIoOps ops) {
190 try {
191 registration.submit(ops);
192 } catch (Exception e) {
193 throw new ChannelException(e);
194 }
195 }
196
197 @Override
198 protected final void doBeginRead() throws Exception {
199
200 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
201 unsafe.readPending = true;
202
203
204
205
206 readFilter(true);
207 }
208
209 @Override
210 protected void doRegister(ChannelPromise promise) {
211 ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
212 if (f.isSuccess()) {
213 this.registration = (IoRegistration) f.getNow();
214
215
216
217 readReadyRunnablePending = false;
218
219 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
220
221
222 if (writeFilterEnabled) {
223 submit(WRITE_ENABLED_OPS);
224 }
225 if (readFilterEnabled) {
226 submit(READ_ENABLED_OPS);
227 }
228 promise.setSuccess();
229 } else {
230 promise.setFailure(f.cause());
231 }
232 });
233 }
234
235 @Override
236 protected abstract AbstractKQueueUnsafe newUnsafe();
237
238 @Override
239 public abstract KQueueChannelConfig config();
240
241
242
243
244 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
245 return newDirectBuffer(buf, buf);
246 }
247
248
249
250
251
252
253 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
254 final int readableBytes = buf.readableBytes();
255 if (readableBytes == 0) {
256 ReferenceCountUtil.release(holder);
257 return Unpooled.EMPTY_BUFFER;
258 }
259
260 final ByteBufAllocator alloc = alloc();
261 if (alloc.isDirectBufferPooled()) {
262 return newDirectBuffer0(holder, buf, alloc, readableBytes);
263 }
264
265 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
266 if (directBuf == null) {
267 return newDirectBuffer0(holder, buf, alloc, readableBytes);
268 }
269
270 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
271 ReferenceCountUtil.safeRelease(holder);
272 return directBuf;
273 }
274
275 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
276 final ByteBuf directBuf = alloc.directBuffer(capacity);
277 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
278 ReferenceCountUtil.safeRelease(holder);
279 return directBuf;
280 }
281
282 protected static void checkResolvable(InetSocketAddress addr) {
283 if (addr.isUnresolved()) {
284 throw new UnresolvedAddressException();
285 }
286 }
287
288
289
290
291 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
292 int writerIndex = byteBuf.writerIndex();
293 int localReadAmount;
294 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
295 if (byteBuf.hasMemoryAddress()) {
296 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
297 } else {
298 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
299 localReadAmount = socket.read(buf, buf.position(), buf.limit());
300 }
301 if (localReadAmount > 0) {
302 byteBuf.writerIndex(writerIndex + localReadAmount);
303 }
304 return localReadAmount;
305 }
306
307 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
308 if (buf.hasMemoryAddress()) {
309 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
310 if (localFlushedAmount > 0) {
311 in.removeBytes(localFlushedAmount);
312 return 1;
313 }
314 } else {
315 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
316 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
317 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
318 if (localFlushedAmount > 0) {
319 nioBuf.position(nioBuf.position() + localFlushedAmount);
320 in.removeBytes(localFlushedAmount);
321 return 1;
322 }
323 }
324 return WRITE_STATUS_SNDBUF_FULL;
325 }
326
327 final boolean shouldBreakReadReady(ChannelConfig config) {
328 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
329 }
330
331 private static boolean isAllowHalfClosure(ChannelConfig config) {
332 if (config instanceof KQueueDomainSocketChannelConfig) {
333 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
334 }
335
336 return config instanceof SocketChannelConfig &&
337 ((SocketChannelConfig) config).isAllowHalfClosure();
338 }
339
340 final void clearReadFilter() {
341
342 if (isRegistered()) {
343 final EventLoop loop = eventLoop();
344 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
345 if (loop.inEventLoop()) {
346 unsafe.clearReadFilter0();
347 } else {
348
349 loop.execute(new Runnable() {
350 @Override
351 public void run() {
352 if (!unsafe.readPending && !config().isAutoRead()) {
353
354 unsafe.clearReadFilter0();
355 }
356 }
357 });
358 }
359 } else {
360
361
362 readFilterEnabled = false;
363 }
364 }
365
366 void readFilter(boolean readFilterEnabled) throws IOException {
367 if (this.readFilterEnabled != readFilterEnabled) {
368 this.readFilterEnabled = readFilterEnabled;
369 submit(readFilterEnabled ? READ_ENABLED_OPS : READ_DISABLED_OPS);
370 }
371 }
372
373 void writeFilter(boolean writeFilterEnabled) throws IOException {
374 if (this.writeFilterEnabled != writeFilterEnabled) {
375 this.writeFilterEnabled = writeFilterEnabled;
376 submit(writeFilterEnabled ? WRITE_ENABLED_OPS : WRITE_DISABLED_OPS);
377 }
378 }
379
380 @UnstableApi
381 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
382 boolean readPending;
383 private KQueueRecvByteAllocatorHandle allocHandle;
384
385 Channel channel() {
386 return AbstractKQueueChannel.this;
387 }
388
389 @Override
390 public int ident() {
391 return fd().intValue();
392 }
393
394 @Override
395 public void close() {
396 close(voidPromise());
397 }
398
399 @Override
400 public void handle(IoRegistration registration, IoEvent event) {
401 KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
402 final short filter = kqueueEvent.filter();
403 final short flags = kqueueEvent.flags();
404 final int fflags = kqueueEvent.fflags();
405 final long data = kqueueEvent.data();
406
407
408
409 if (filter == Native.EVFILT_WRITE) {
410 writeReady();
411 } else if (filter == Native.EVFILT_READ) {
412
413 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
414 readReady(allocHandle);
415 } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
416 readEOF();
417 return;
418 }
419
420
421
422
423 if ((flags & Native.EV_EOF) != 0) {
424 readEOF();
425 }
426 }
427
428 abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
429
430 final boolean shouldStopReading(ChannelConfig config) {
431
432
433
434
435
436
437 return !readPending && !config.isAutoRead();
438 }
439
440 final boolean failConnectPromise(Throwable cause) {
441 if (connectPromise != null) {
442
443
444
445 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
446 AbstractKQueueChannel.this.connectPromise = null;
447 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
448 : new ConnectException("failed to connect").initCause(cause))) {
449 closeIfClosed();
450 return true;
451 }
452 }
453 return false;
454 }
455
456 private void writeReady() {
457 if (connectPromise != null) {
458
459 finishConnect();
460 } else if (!socket.isOutputShutdown()) {
461
462 super.flush0();
463 }
464 }
465
466
467
468
469 void shutdownInput(boolean readEOF) {
470
471
472
473
474
475 if (readEOF && connectPromise != null) {
476 finishConnect();
477 }
478 if (!socket.isInputShutdown()) {
479 if (isAllowHalfClosure(config())) {
480 try {
481 socket.shutdown(true, false);
482 } catch (IOException ignored) {
483
484
485 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
486 return;
487 } catch (NotYetConnectedException ignore) {
488
489
490 }
491 if (shouldStopReading(config())) {
492 clearReadFilter0();
493 }
494 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
495 } else {
496 close(voidPromise());
497 return;
498 }
499 }
500 if (!readEOF && !inputClosedSeenErrorOnRead) {
501 inputClosedSeenErrorOnRead = true;
502 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
503 }
504 }
505
506 private void readEOF() {
507
508 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
509 allocHandle.readEOF();
510
511 if (isActive()) {
512
513
514
515 readReady(allocHandle);
516 } else {
517
518 shutdownInput(true);
519 }
520
521
522 clearRdHup0();
523 }
524
525 @Override
526 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
527 if (allocHandle == null) {
528 allocHandle = new KQueueRecvByteAllocatorHandle(
529 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
530 }
531 return allocHandle;
532 }
533
534 @Override
535 protected final void flush0() {
536
537
538
539 if (!writeFilterEnabled) {
540 super.flush0();
541 }
542 }
543
544 protected final void clearReadFilter0() {
545 assert eventLoop().inEventLoop();
546 try {
547 readPending = false;
548 readFilter(false);
549 } catch (IOException e) {
550
551
552 pipeline().fireExceptionCaught(e);
553 unsafe().close(unsafe().voidPromise());
554 }
555 }
556
557 private void fireEventAndClose(Object evt) {
558 pipeline().fireUserEventTriggered(evt);
559 close(voidPromise());
560 }
561
562 @Override
563 public void connect(
564 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
565
566
567 if (promise.isDone() || !ensureOpen(promise)) {
568 return;
569 }
570
571 try {
572 if (connectPromise != null) {
573 throw new ConnectionPendingException();
574 }
575
576 boolean wasActive = isActive();
577 if (doConnect(remoteAddress, localAddress)) {
578 fulfillConnectPromise(promise, wasActive);
579 } else {
580 connectPromise = promise;
581 requestedRemoteAddress = remoteAddress;
582
583
584 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
585 if (connectTimeoutMillis > 0) {
586 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
587 @Override
588 public void run() {
589 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
590 if (connectPromise != null && !connectPromise.isDone()
591 && connectPromise.tryFailure(new ConnectTimeoutException(
592 "connection timed out after " + connectTimeoutMillis + " ms: " +
593 remoteAddress))) {
594 close(voidPromise());
595 }
596 }
597 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
598 }
599
600 promise.addListener(new ChannelFutureListener() {
601 @Override
602 public void operationComplete(ChannelFuture future) {
603
604
605 if (future.isCancelled()) {
606 if (connectTimeoutFuture != null) {
607 connectTimeoutFuture.cancel(false);
608 }
609 connectPromise = null;
610 close(voidPromise());
611 }
612 }
613 });
614 }
615 } catch (Throwable t) {
616 closeIfClosed();
617 promise.tryFailure(annotateConnectException(t, remoteAddress));
618 }
619 }
620
621 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
622 if (promise == null) {
623
624 return;
625 }
626 active = true;
627
628
629
630 boolean active = isActive();
631
632
633 boolean promiseSet = promise.trySuccess();
634
635
636
637 if (!wasActive && active) {
638 pipeline().fireChannelActive();
639 }
640
641
642 if (!promiseSet) {
643 close(voidPromise());
644 }
645 }
646
647 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
648 if (promise == null) {
649
650 return;
651 }
652
653
654 promise.tryFailure(cause);
655 closeIfClosed();
656 }
657
658 private void finishConnect() {
659
660
661
662 assert eventLoop().inEventLoop();
663
664 boolean connectStillInProgress = false;
665 try {
666 boolean wasActive = isActive();
667 if (!doFinishConnect()) {
668 connectStillInProgress = true;
669 return;
670 }
671 fulfillConnectPromise(connectPromise, wasActive);
672 } catch (Throwable t) {
673 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
674 } finally {
675 if (!connectStillInProgress) {
676
677
678 if (connectTimeoutFuture != null) {
679 connectTimeoutFuture.cancel(false);
680 }
681 connectPromise = null;
682 }
683 }
684 }
685
686 private boolean doFinishConnect() throws Exception {
687 if (socket.finishConnect()) {
688 writeFilter(false);
689 if (requestedRemoteAddress instanceof InetSocketAddress) {
690 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
691 }
692 requestedRemoteAddress = null;
693 return true;
694 }
695 writeFilter(true);
696 return false;
697 }
698 }
699
700 @Override
701 protected void doBind(SocketAddress local) throws Exception {
702 if (local instanceof InetSocketAddress) {
703 checkResolvable((InetSocketAddress) local);
704 }
705 socket.bind(local);
706 this.local = socket.localAddress();
707 }
708
709
710
711
712 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
713 if (localAddress instanceof InetSocketAddress) {
714 checkResolvable((InetSocketAddress) localAddress);
715 }
716
717 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
718 ? (InetSocketAddress) remoteAddress : null;
719 if (remoteSocketAddr != null) {
720 checkResolvable(remoteSocketAddr);
721 }
722
723 if (remote != null) {
724
725
726
727 throw new AlreadyConnectedException();
728 }
729
730 if (localAddress != null) {
731 socket.bind(localAddress);
732 }
733
734 boolean connected = doConnect0(remoteAddress, localAddress);
735 if (connected) {
736 remote = remoteSocketAddr == null?
737 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
738 }
739
740
741
742 local = socket.localAddress();
743 return connected;
744 }
745
746 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
747 boolean success = false;
748 try {
749 boolean connected = socket.connect(remoteAddress);
750 if (!connected) {
751 writeFilter(true);
752 }
753 success = true;
754 return connected;
755 } finally {
756 if (!success) {
757 doClose();
758 }
759 }
760 }
761
762 @Override
763 protected SocketAddress localAddress0() {
764 return local;
765 }
766
767 @Override
768 protected SocketAddress remoteAddress0() {
769 return remote;
770 }
771 }