1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.UnixChannel;
42 import io.netty.util.ReferenceCountUtil;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.internal.UnstableApi;
45
46 import java.io.IOException;
47 import java.net.ConnectException;
48 import java.net.InetSocketAddress;
49 import java.net.SocketAddress;
50 import java.nio.ByteBuffer;
51 import java.nio.channels.AlreadyConnectedException;
52 import java.nio.channels.ConnectionPendingException;
53 import java.nio.channels.NotYetConnectedException;
54 import java.nio.channels.UnresolvedAddressException;
55 import java.util.concurrent.TimeUnit;
56
57 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59 import static io.netty.util.internal.ObjectUtil.checkNotNull;
60
61 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
62
63 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64
65
66
67
68 private ChannelPromise connectPromise;
69 private Future<?> connectTimeoutFuture;
70 private SocketAddress requestedRemoteAddress;
71
72 final BsdSocket socket;
73 private IoRegistration registration;
74 private boolean readFilterEnabled;
75 private boolean writeFilterEnabled;
76
77 boolean readReadyRunnablePending;
78 boolean inputClosedSeenErrorOnRead;
79 protected volatile boolean active;
80 private volatile SocketAddress local;
81 private volatile SocketAddress remote;
82
83 AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
84 super(parent);
85 socket = checkNotNull(fd, "fd");
86 this.active = active;
87 if (active) {
88
89
90 local = fd.localAddress();
91 remote = fd.remoteAddress();
92 }
93 }
94
95 AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
96 super(parent);
97 socket = checkNotNull(fd, "fd");
98 active = true;
99
100
101 this.remote = remote;
102 local = fd.localAddress();
103 }
104
105 @Override
106 protected boolean isCompatible(EventLoop loop) {
107 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
108 }
109
110 static boolean isSoErrorZero(BsdSocket fd) {
111 try {
112 return fd.getSoError() == 0;
113 } catch (IOException e) {
114 throw new ChannelException(e);
115 }
116 }
117
118 protected final IoRegistration registration() {
119 assert registration != null;
120 return registration;
121 }
122
123 @Override
124 public final FileDescriptor fd() {
125 return socket;
126 }
127
128 @Override
129 public boolean isActive() {
130 return active;
131 }
132
133 @Override
134 public ChannelMetadata metadata() {
135 return METADATA;
136 }
137
138 @Override
139 protected void doClose() throws Exception {
140 active = false;
141
142
143 inputClosedSeenErrorOnRead = true;
144 socket.close();
145 }
146
147 @Override
148 protected void doDisconnect() throws Exception {
149 doClose();
150 }
151
152 void resetCachedAddresses() {
153 local = socket.localAddress();
154 remote = socket.remoteAddress();
155 }
156
157 @Override
158 public boolean isOpen() {
159 return socket.isOpen();
160 }
161
162 @Override
163 protected void doDeregister() throws Exception {
164
165
166
167 readFilter(false);
168 writeFilter(false);
169 clearRdHup0();
170
171 IoRegistration registration = this.registration;
172 if (registration != null) {
173 registration.cancel();
174 }
175 }
176
177 private void clearRdHup0() {
178 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
179 }
180
181 private void submit(KQueueIoOps ops) {
182 if (!isOpen()) {
183
184
185 return;
186 }
187 try {
188 registration.submit(ops);
189 } catch (Exception e) {
190 throw new ChannelException(e);
191 }
192 }
193
194 @Override
195 protected final void doBeginRead() throws Exception {
196
197 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
198 unsafe.readPending = true;
199
200
201
202
203 readFilter(true);
204 }
205
206 @Override
207 protected void doRegister(ChannelPromise promise) {
208 ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
209 if (f.isSuccess()) {
210 this.registration = (IoRegistration) f.getNow();
211
212
213
214 readReadyRunnablePending = false;
215
216 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
217
218
219 if (writeFilterEnabled) {
220 submit(Native.WRITE_ENABLED_OPS);
221 }
222 if (readFilterEnabled) {
223 submit(Native.READ_ENABLED_OPS);
224 }
225 promise.setSuccess();
226 } else {
227 promise.setFailure(f.cause());
228 }
229 });
230 }
231
232 @Override
233 protected abstract AbstractKQueueUnsafe newUnsafe();
234
235 @Override
236 public abstract KQueueChannelConfig config();
237
238
239
240
241 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
242 return newDirectBuffer(buf, buf);
243 }
244
245
246
247
248
249
250 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
251 final int readableBytes = buf.readableBytes();
252 if (readableBytes == 0) {
253 ReferenceCountUtil.release(holder);
254 return Unpooled.EMPTY_BUFFER;
255 }
256
257 final ByteBufAllocator alloc = alloc();
258 if (alloc.isDirectBufferPooled()) {
259 return newDirectBuffer0(holder, buf, alloc, readableBytes);
260 }
261
262 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
263 if (directBuf == null) {
264 return newDirectBuffer0(holder, buf, alloc, readableBytes);
265 }
266
267 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
268 ReferenceCountUtil.safeRelease(holder);
269 return directBuf;
270 }
271
272 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
273 final ByteBuf directBuf = alloc.directBuffer(capacity);
274 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
275 ReferenceCountUtil.safeRelease(holder);
276 return directBuf;
277 }
278
279 protected static void checkResolvable(InetSocketAddress addr) {
280 if (addr.isUnresolved()) {
281 throw new UnresolvedAddressException();
282 }
283 }
284
285
286
287
288 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
289 int writerIndex = byteBuf.writerIndex();
290 int localReadAmount;
291 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
292 if (byteBuf.hasMemoryAddress()) {
293 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
294 } else {
295 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
296 localReadAmount = socket.read(buf, buf.position(), buf.limit());
297 }
298 if (localReadAmount > 0) {
299 byteBuf.writerIndex(writerIndex + localReadAmount);
300 }
301 return localReadAmount;
302 }
303
304 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
305 if (buf.hasMemoryAddress()) {
306 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
307 if (localFlushedAmount > 0) {
308 in.removeBytes(localFlushedAmount);
309 return 1;
310 }
311 } else {
312 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
313 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
314 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
315 if (localFlushedAmount > 0) {
316 nioBuf.position(nioBuf.position() + localFlushedAmount);
317 in.removeBytes(localFlushedAmount);
318 return 1;
319 }
320 }
321 return WRITE_STATUS_SNDBUF_FULL;
322 }
323
324 final boolean shouldBreakReadReady(ChannelConfig config) {
325 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
326 }
327
328 private static boolean isAllowHalfClosure(ChannelConfig config) {
329 if (config instanceof KQueueDomainSocketChannelConfig) {
330 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
331 }
332
333 return config instanceof SocketChannelConfig &&
334 ((SocketChannelConfig) config).isAllowHalfClosure();
335 }
336
337 final void clearReadFilter() {
338
339 if (isRegistered()) {
340 final EventLoop loop = eventLoop();
341 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
342 if (loop.inEventLoop()) {
343 unsafe.clearReadFilter0();
344 } else {
345
346 loop.execute(new Runnable() {
347 @Override
348 public void run() {
349 if (!unsafe.readPending && !config().isAutoRead()) {
350
351 unsafe.clearReadFilter0();
352 }
353 }
354 });
355 }
356 } else {
357
358
359 readFilterEnabled = false;
360 }
361 }
362
363 void readFilter(boolean readFilterEnabled) throws IOException {
364 if (this.readFilterEnabled != readFilterEnabled) {
365 this.readFilterEnabled = readFilterEnabled;
366 submit(readFilterEnabled ? Native.READ_ENABLED_OPS : Native.READ_DISABLED_OPS);
367 }
368 }
369
370 void writeFilter(boolean writeFilterEnabled) throws IOException {
371 if (this.writeFilterEnabled != writeFilterEnabled) {
372 this.writeFilterEnabled = writeFilterEnabled;
373 submit(writeFilterEnabled ? Native.WRITE_ENABLED_OPS : Native.WRITE_DISABLED_OPS);
374 }
375 }
376
377 @UnstableApi
378 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
379 boolean readPending;
380 private KQueueRecvByteAllocatorHandle allocHandle;
381
382 Channel channel() {
383 return AbstractKQueueChannel.this;
384 }
385
386 @Override
387 public int ident() {
388 return fd().intValue();
389 }
390
391 @Override
392 public void close() {
393 close(voidPromise());
394 }
395
396 @Override
397 public void handle(IoRegistration registration, IoEvent event) {
398 KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
399 final short filter = kqueueEvent.filter();
400 final short flags = kqueueEvent.flags();
401 final int fflags = kqueueEvent.fflags();
402 final long data = kqueueEvent.data();
403
404
405
406 if (filter == Native.EVFILT_WRITE) {
407 writeReady();
408 } else if (filter == Native.EVFILT_READ) {
409
410 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
411 readReady(allocHandle);
412 } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
413 readEOF();
414 return;
415 }
416
417
418
419
420 if ((flags & Native.EV_EOF) != 0) {
421 readEOF();
422 }
423 }
424
425 abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
426
427 final boolean shouldStopReading(ChannelConfig config) {
428
429
430
431
432
433
434 return !readPending && !config.isAutoRead();
435 }
436
437 final boolean failConnectPromise(Throwable cause) {
438 if (connectPromise != null) {
439
440
441
442 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
443 AbstractKQueueChannel.this.connectPromise = null;
444 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
445 : new ConnectException("failed to connect").initCause(cause))) {
446 closeIfClosed();
447 return true;
448 }
449 }
450 return false;
451 }
452
453 private void writeReady() {
454 if (connectPromise != null) {
455
456 finishConnect();
457 } else if (!socket.isOutputShutdown()) {
458
459 super.flush0();
460 }
461 }
462
463
464
465
466 void shutdownInput(boolean readEOF) {
467
468
469
470
471
472 if (readEOF && connectPromise != null) {
473 finishConnect();
474 }
475 if (!socket.isInputShutdown()) {
476 if (isAllowHalfClosure(config())) {
477 try {
478 socket.shutdown(true, false);
479 } catch (IOException ignored) {
480
481
482 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
483 return;
484 } catch (NotYetConnectedException ignore) {
485
486
487 }
488 if (shouldStopReading(config())) {
489 clearReadFilter0();
490 }
491 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
492 } else {
493 close(voidPromise());
494 return;
495 }
496 }
497 if (!readEOF && !inputClosedSeenErrorOnRead) {
498 inputClosedSeenErrorOnRead = true;
499 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
500 }
501 }
502
503 private void readEOF() {
504
505 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
506 allocHandle.readEOF();
507
508 if (isActive()) {
509
510
511
512 readReady(allocHandle);
513 } else {
514
515 shutdownInput(true);
516 }
517
518
519 clearRdHup0();
520 }
521
522 @Override
523 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
524 if (allocHandle == null) {
525 allocHandle = new KQueueRecvByteAllocatorHandle(
526 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
527 }
528 return allocHandle;
529 }
530
531 @Override
532 protected final void flush0() {
533
534
535
536 if (!writeFilterEnabled) {
537 super.flush0();
538 }
539 }
540
541 protected final void clearReadFilter0() {
542 assert eventLoop().inEventLoop();
543 try {
544 readPending = false;
545 readFilter(false);
546 } catch (IOException e) {
547
548
549 pipeline().fireExceptionCaught(e);
550 unsafe().close(unsafe().voidPromise());
551 }
552 }
553
554 private void fireEventAndClose(Object evt) {
555 pipeline().fireUserEventTriggered(evt);
556 close(voidPromise());
557 }
558
559 @Override
560 public void connect(
561 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
562
563
564 if (promise.isDone() || !ensureOpen(promise)) {
565 return;
566 }
567
568 try {
569 if (connectPromise != null) {
570 throw new ConnectionPendingException();
571 }
572
573 boolean wasActive = isActive();
574 if (doConnect(remoteAddress, localAddress)) {
575 fulfillConnectPromise(promise, wasActive);
576 } else {
577 connectPromise = promise;
578 requestedRemoteAddress = remoteAddress;
579
580
581 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
582 if (connectTimeoutMillis > 0) {
583 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
584 @Override
585 public void run() {
586 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
587 if (connectPromise != null && !connectPromise.isDone()
588 && connectPromise.tryFailure(new ConnectTimeoutException(
589 "connection timed out after " + connectTimeoutMillis + " ms: " +
590 remoteAddress))) {
591 close(voidPromise());
592 }
593 }
594 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
595 }
596
597 promise.addListener(new ChannelFutureListener() {
598 @Override
599 public void operationComplete(ChannelFuture future) {
600
601
602 if (future.isCancelled()) {
603 if (connectTimeoutFuture != null) {
604 connectTimeoutFuture.cancel(false);
605 }
606 connectPromise = null;
607 close(voidPromise());
608 }
609 }
610 });
611 }
612 } catch (Throwable t) {
613 closeIfClosed();
614 promise.tryFailure(annotateConnectException(t, remoteAddress));
615 }
616 }
617
618 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
619 if (promise == null) {
620
621 return;
622 }
623 active = true;
624
625
626
627 boolean active = isActive();
628
629
630 boolean promiseSet = promise.trySuccess();
631
632
633
634 if (!wasActive && active) {
635 pipeline().fireChannelActive();
636 }
637
638
639 if (!promiseSet) {
640 close(voidPromise());
641 }
642 }
643
644 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
645 if (promise == null) {
646
647 return;
648 }
649
650
651 promise.tryFailure(cause);
652 closeIfClosed();
653 }
654
655 private void finishConnect() {
656
657
658
659 assert eventLoop().inEventLoop();
660
661 boolean connectStillInProgress = false;
662 try {
663 boolean wasActive = isActive();
664 if (!doFinishConnect()) {
665 connectStillInProgress = true;
666 return;
667 }
668 fulfillConnectPromise(connectPromise, wasActive);
669 } catch (Throwable t) {
670 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
671 } finally {
672 if (!connectStillInProgress) {
673
674
675 if (connectTimeoutFuture != null) {
676 connectTimeoutFuture.cancel(false);
677 }
678 connectPromise = null;
679 }
680 }
681 }
682
683 private boolean doFinishConnect() throws Exception {
684 if (socket.finishConnect()) {
685 writeFilter(false);
686 if (requestedRemoteAddress instanceof InetSocketAddress) {
687 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
688 }
689 requestedRemoteAddress = null;
690 return true;
691 }
692 writeFilter(true);
693 return false;
694 }
695 }
696
697 @Override
698 protected void doBind(SocketAddress local) throws Exception {
699 if (local instanceof InetSocketAddress) {
700 checkResolvable((InetSocketAddress) local);
701 }
702 socket.bind(local);
703 this.local = socket.localAddress();
704 }
705
706
707
708
709 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
710 if (localAddress instanceof InetSocketAddress) {
711 checkResolvable((InetSocketAddress) localAddress);
712 }
713
714 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
715 ? (InetSocketAddress) remoteAddress : null;
716 if (remoteSocketAddr != null) {
717 checkResolvable(remoteSocketAddr);
718 }
719
720 if (remote != null) {
721
722
723
724 throw new AlreadyConnectedException();
725 }
726
727 if (localAddress != null) {
728 socket.bind(localAddress);
729 }
730
731 boolean connected = doConnect0(remoteAddress, localAddress);
732 if (connected) {
733 remote = remoteSocketAddr == null?
734 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
735 }
736
737
738
739 local = socket.localAddress();
740 return connected;
741 }
742
743 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
744 boolean success = false;
745 try {
746 boolean connected = socket.connect(remoteAddress);
747 if (!connected) {
748 writeFilter(true);
749 }
750 success = true;
751 return connected;
752 } finally {
753 if (!success) {
754 doClose();
755 }
756 }
757 }
758
759 @Override
760 protected SocketAddress localAddress0() {
761 return local;
762 }
763
764 @Override
765 protected SocketAddress remoteAddress0() {
766 return remote;
767 }
768 }