1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.UnixChannel;
42 import io.netty.util.ReferenceCountUtil;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.internal.UnstableApi;
45
46 import java.io.IOException;
47 import java.net.ConnectException;
48 import java.net.InetSocketAddress;
49 import java.net.SocketAddress;
50 import java.nio.ByteBuffer;
51 import java.nio.channels.AlreadyConnectedException;
52 import java.nio.channels.ConnectionPendingException;
53 import java.nio.channels.NotYetConnectedException;
54 import java.nio.channels.UnresolvedAddressException;
55 import java.util.concurrent.TimeUnit;
56
57 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59 import static io.netty.util.internal.ObjectUtil.checkNotNull;
60
61 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
62
63 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64
65
66
67
68 private ChannelPromise connectPromise;
69 private Future<?> connectTimeoutFuture;
70 private SocketAddress requestedRemoteAddress;
71
72 final BsdSocket socket;
73 private IoRegistration registration;
74 private boolean readFilterEnabled;
75 private boolean writeFilterEnabled;
76
77 boolean readReadyRunnablePending;
78 boolean inputClosedSeenErrorOnRead;
79 protected volatile boolean active;
80 private volatile SocketAddress local;
81 private volatile SocketAddress remote;
82
/**
 * Creates a channel around {@code fd}.
 *
 * @param parent the parent channel, passed through to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param active the initial active state; when {@code true} both addresses are cached from {@code fd}
 */
AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
    super(parent);
    this.socket = checkNotNull(fd, "fd");
    this.active = active;
    if (!active) {
        return;
    }
    // The socket is already in use, so remember its addresses right away.
    this.local = fd.localAddress();
    this.remote = fd.remoteAddress();
}
94
/**
 * Creates an already-active channel around {@code fd} whose remote address is supplied by the caller.
 *
 * @param parent the parent channel, passed through to the super constructor
 * @param fd     the socket to wrap; must not be {@code null}
 * @param remote the remote address to cache as-is
 */
AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
    super(parent);
    this.socket = checkNotNull(fd, "fd");
    this.active = true;
    // The remote address is taken from the caller; only the local one is queried from the socket.
    this.remote = remote;
    this.local = fd.localAddress();
}
104
105 @Override
106 protected boolean isCompatible(EventLoop loop) {
107 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
108 }
109
110 static boolean isSoErrorZero(BsdSocket fd) {
111 try {
112 return fd.getSoError() == 0;
113 } catch (IOException e) {
114 throw new ChannelException(e);
115 }
116 }
117
118 protected final IoRegistration registration() {
119 assert registration != null;
120 return registration;
121 }
122
123 @Override
124 public final FileDescriptor fd() {
125 return socket;
126 }
127
128 @Override
129 public boolean isActive() {
130 return active;
131 }
132
133 @Override
134 public ChannelMetadata metadata() {
135 return METADATA;
136 }
137
138 @Override
139 protected void doClose() throws Exception {
140 active = false;
141
142
143 inputClosedSeenErrorOnRead = true;
144 socket.close();
145 }
146
147 @Override
148 protected void doDisconnect() throws Exception {
149 doClose();
150 }
151
152 void resetCachedAddresses() {
153 local = socket.localAddress();
154 remote = socket.remoteAddress();
155 }
156
157 @Override
158 public boolean isOpen() {
159 return socket.isOpen();
160 }
161
162 @Override
163 protected void doDeregister() throws Exception {
164
165
166
167 readFilter(false);
168 writeFilter(false);
169 clearRdHup0();
170
171 IoRegistration registration = this.registration;
172 if (registration != null) {
173 registration.cancel();
174 }
175 }
176
177 private void clearRdHup0() {
178 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
179 }
180
181 private void submit(KQueueIoOps ops) {
182 try {
183 registration.submit(ops);
184 } catch (Exception e) {
185 throw new ChannelException(e);
186 }
187 }
188
189 @Override
190 protected final void doBeginRead() throws Exception {
191
192 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
193 unsafe.readPending = true;
194
195
196
197
198 readFilter(true);
199 }
200
201 @Override
202 protected void doRegister(ChannelPromise promise) {
203 ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
204 if (f.isSuccess()) {
205 this.registration = (IoRegistration) f.getNow();
206
207
208
209 readReadyRunnablePending = false;
210
211 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
212
213
214 if (writeFilterEnabled) {
215 submit(Native.WRITE_ENABLED_OPS);
216 }
217 if (readFilterEnabled) {
218 submit(Native.READ_ENABLED_OPS);
219 }
220 promise.setSuccess();
221 } else {
222 promise.setFailure(f.cause());
223 }
224 });
225 }
226
227 @Override
228 protected abstract AbstractKQueueUnsafe newUnsafe();
229
230 @Override
231 public abstract KQueueChannelConfig config();
232
233
234
235
236 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
237 return newDirectBuffer(buf, buf);
238 }
239
240
241
242
243
244
245 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
246 final int readableBytes = buf.readableBytes();
247 if (readableBytes == 0) {
248 ReferenceCountUtil.release(holder);
249 return Unpooled.EMPTY_BUFFER;
250 }
251
252 final ByteBufAllocator alloc = alloc();
253 if (alloc.isDirectBufferPooled()) {
254 return newDirectBuffer0(holder, buf, alloc, readableBytes);
255 }
256
257 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
258 if (directBuf == null) {
259 return newDirectBuffer0(holder, buf, alloc, readableBytes);
260 }
261
262 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
263 ReferenceCountUtil.safeRelease(holder);
264 return directBuf;
265 }
266
267 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
268 final ByteBuf directBuf = alloc.directBuffer(capacity);
269 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
270 ReferenceCountUtil.safeRelease(holder);
271 return directBuf;
272 }
273
274 protected static void checkResolvable(InetSocketAddress addr) {
275 if (addr.isUnresolved()) {
276 throw new UnresolvedAddressException();
277 }
278 }
279
280
281
282
283 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
284 int writerIndex = byteBuf.writerIndex();
285 int localReadAmount;
286 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
287 if (byteBuf.hasMemoryAddress()) {
288 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
289 } else {
290 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
291 localReadAmount = socket.read(buf, buf.position(), buf.limit());
292 }
293 if (localReadAmount > 0) {
294 byteBuf.writerIndex(writerIndex + localReadAmount);
295 }
296 return localReadAmount;
297 }
298
299 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
300 if (buf.hasMemoryAddress()) {
301 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
302 if (localFlushedAmount > 0) {
303 in.removeBytes(localFlushedAmount);
304 return 1;
305 }
306 } else {
307 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
308 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
309 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
310 if (localFlushedAmount > 0) {
311 nioBuf.position(nioBuf.position() + localFlushedAmount);
312 in.removeBytes(localFlushedAmount);
313 return 1;
314 }
315 }
316 return WRITE_STATUS_SNDBUF_FULL;
317 }
318
319 final boolean shouldBreakReadReady(ChannelConfig config) {
320 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
321 }
322
323 private static boolean isAllowHalfClosure(ChannelConfig config) {
324 if (config instanceof KQueueDomainSocketChannelConfig) {
325 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
326 }
327
328 return config instanceof SocketChannelConfig &&
329 ((SocketChannelConfig) config).isAllowHalfClosure();
330 }
331
332 final void clearReadFilter() {
333
334 if (isRegistered()) {
335 final EventLoop loop = eventLoop();
336 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
337 if (loop.inEventLoop()) {
338 unsafe.clearReadFilter0();
339 } else {
340
341 loop.execute(new Runnable() {
342 @Override
343 public void run() {
344 if (!unsafe.readPending && !config().isAutoRead()) {
345
346 unsafe.clearReadFilter0();
347 }
348 }
349 });
350 }
351 } else {
352
353
354 readFilterEnabled = false;
355 }
356 }
357
358 void readFilter(boolean readFilterEnabled) throws IOException {
359 if (this.readFilterEnabled != readFilterEnabled) {
360 this.readFilterEnabled = readFilterEnabled;
361 submit(readFilterEnabled ? Native.READ_ENABLED_OPS : Native.READ_DISABLED_OPS);
362 }
363 }
364
365 void writeFilter(boolean writeFilterEnabled) throws IOException {
366 if (this.writeFilterEnabled != writeFilterEnabled) {
367 this.writeFilterEnabled = writeFilterEnabled;
368 submit(writeFilterEnabled ? Native.WRITE_ENABLED_OPS : Native.WRITE_DISABLED_OPS);
369 }
370 }
371
372 @UnstableApi
373 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
374 boolean readPending;
375 private KQueueRecvByteAllocatorHandle allocHandle;
376
377 Channel channel() {
378 return AbstractKQueueChannel.this;
379 }
380
381 @Override
382 public int ident() {
383 return fd().intValue();
384 }
385
386 @Override
387 public void close() {
388 close(voidPromise());
389 }
390
391 @Override
392 public void handle(IoRegistration registration, IoEvent event) {
393 KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
394 final short filter = kqueueEvent.filter();
395 final short flags = kqueueEvent.flags();
396 final int fflags = kqueueEvent.fflags();
397 final long data = kqueueEvent.data();
398
399
400
401 if (filter == Native.EVFILT_WRITE) {
402 writeReady();
403 } else if (filter == Native.EVFILT_READ) {
404
405 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
406 readReady(allocHandle);
407 } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
408 readEOF();
409 return;
410 }
411
412
413
414
415 if ((flags & Native.EV_EOF) != 0) {
416 readEOF();
417 }
418 }
419
420 abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
421
422 final boolean shouldStopReading(ChannelConfig config) {
423
424
425
426
427
428
429 return !readPending && !config.isAutoRead();
430 }
431
432 final boolean failConnectPromise(Throwable cause) {
433 if (connectPromise != null) {
434
435
436
437 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
438 AbstractKQueueChannel.this.connectPromise = null;
439 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
440 : new ConnectException("failed to connect").initCause(cause))) {
441 closeIfClosed();
442 return true;
443 }
444 }
445 return false;
446 }
447
448 private void writeReady() {
449 if (connectPromise != null) {
450
451 finishConnect();
452 } else if (!socket.isOutputShutdown()) {
453
454 super.flush0();
455 }
456 }
457
458
459
460
461 void shutdownInput(boolean readEOF) {
462
463
464
465
466
467 if (readEOF && connectPromise != null) {
468 finishConnect();
469 }
470 if (!socket.isInputShutdown()) {
471 if (isAllowHalfClosure(config())) {
472 try {
473 socket.shutdown(true, false);
474 } catch (IOException ignored) {
475
476
477 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
478 return;
479 } catch (NotYetConnectedException ignore) {
480
481
482 }
483 if (shouldStopReading(config())) {
484 clearReadFilter0();
485 }
486 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
487 } else {
488 close(voidPromise());
489 return;
490 }
491 }
492 if (!readEOF && !inputClosedSeenErrorOnRead) {
493 inputClosedSeenErrorOnRead = true;
494 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
495 }
496 }
497
498 private void readEOF() {
499
500 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
501 allocHandle.readEOF();
502
503 if (isActive()) {
504
505
506
507 readReady(allocHandle);
508 } else {
509
510 shutdownInput(true);
511 }
512
513
514 clearRdHup0();
515 }
516
517 @Override
518 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
519 if (allocHandle == null) {
520 allocHandle = new KQueueRecvByteAllocatorHandle(
521 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
522 }
523 return allocHandle;
524 }
525
526 @Override
527 protected final void flush0() {
528
529
530
531 if (!writeFilterEnabled) {
532 super.flush0();
533 }
534 }
535
536 protected final void clearReadFilter0() {
537 assert eventLoop().inEventLoop();
538 try {
539 readPending = false;
540 readFilter(false);
541 } catch (IOException e) {
542
543
544 pipeline().fireExceptionCaught(e);
545 unsafe().close(unsafe().voidPromise());
546 }
547 }
548
549 private void fireEventAndClose(Object evt) {
550 pipeline().fireUserEventTriggered(evt);
551 close(voidPromise());
552 }
553
554 @Override
555 public void connect(
556 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
557
558
559 if (promise.isDone() || !ensureOpen(promise)) {
560 return;
561 }
562
563 try {
564 if (connectPromise != null) {
565 throw new ConnectionPendingException();
566 }
567
568 boolean wasActive = isActive();
569 if (doConnect(remoteAddress, localAddress)) {
570 fulfillConnectPromise(promise, wasActive);
571 } else {
572 connectPromise = promise;
573 requestedRemoteAddress = remoteAddress;
574
575
576 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
577 if (connectTimeoutMillis > 0) {
578 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
579 @Override
580 public void run() {
581 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
582 if (connectPromise != null && !connectPromise.isDone()
583 && connectPromise.tryFailure(new ConnectTimeoutException(
584 "connection timed out after " + connectTimeoutMillis + " ms: " +
585 remoteAddress))) {
586 close(voidPromise());
587 }
588 }
589 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
590 }
591
592 promise.addListener(new ChannelFutureListener() {
593 @Override
594 public void operationComplete(ChannelFuture future) {
595
596
597 if (future.isCancelled()) {
598 if (connectTimeoutFuture != null) {
599 connectTimeoutFuture.cancel(false);
600 }
601 connectPromise = null;
602 close(voidPromise());
603 }
604 }
605 });
606 }
607 } catch (Throwable t) {
608 closeIfClosed();
609 promise.tryFailure(annotateConnectException(t, remoteAddress));
610 }
611 }
612
613 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
614 if (promise == null) {
615
616 return;
617 }
618 active = true;
619
620
621
622 boolean active = isActive();
623
624
625 boolean promiseSet = promise.trySuccess();
626
627
628
629 if (!wasActive && active) {
630 pipeline().fireChannelActive();
631 }
632
633
634 if (!promiseSet) {
635 close(voidPromise());
636 }
637 }
638
639 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
640 if (promise == null) {
641
642 return;
643 }
644
645
646 promise.tryFailure(cause);
647 closeIfClosed();
648 }
649
650 private void finishConnect() {
651
652
653
654 assert eventLoop().inEventLoop();
655
656 boolean connectStillInProgress = false;
657 try {
658 boolean wasActive = isActive();
659 if (!doFinishConnect()) {
660 connectStillInProgress = true;
661 return;
662 }
663 fulfillConnectPromise(connectPromise, wasActive);
664 } catch (Throwable t) {
665 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
666 } finally {
667 if (!connectStillInProgress) {
668
669
670 if (connectTimeoutFuture != null) {
671 connectTimeoutFuture.cancel(false);
672 }
673 connectPromise = null;
674 }
675 }
676 }
677
678 private boolean doFinishConnect() throws Exception {
679 if (socket.finishConnect()) {
680 writeFilter(false);
681 if (requestedRemoteAddress instanceof InetSocketAddress) {
682 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
683 }
684 requestedRemoteAddress = null;
685 return true;
686 }
687 writeFilter(true);
688 return false;
689 }
690 }
691
692 @Override
693 protected void doBind(SocketAddress local) throws Exception {
694 if (local instanceof InetSocketAddress) {
695 checkResolvable((InetSocketAddress) local);
696 }
697 socket.bind(local);
698 this.local = socket.localAddress();
699 }
700
701
702
703
704 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
705 if (localAddress instanceof InetSocketAddress) {
706 checkResolvable((InetSocketAddress) localAddress);
707 }
708
709 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
710 ? (InetSocketAddress) remoteAddress : null;
711 if (remoteSocketAddr != null) {
712 checkResolvable(remoteSocketAddr);
713 }
714
715 if (remote != null) {
716
717
718
719 throw new AlreadyConnectedException();
720 }
721
722 if (localAddress != null) {
723 socket.bind(localAddress);
724 }
725
726 boolean connected = doConnect0(remoteAddress, localAddress);
727 if (connected) {
728 remote = remoteSocketAddr == null?
729 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
730 }
731
732
733
734 local = socket.localAddress();
735 return connected;
736 }
737
738 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
739 boolean success = false;
740 try {
741 boolean connected = socket.connect(remoteAddress);
742 if (!connected) {
743 writeFilter(true);
744 }
745 success = true;
746 return connected;
747 } finally {
748 if (!success) {
749 doClose();
750 }
751 }
752 }
753
754 @Override
755 protected SocketAddress localAddress0() {
756 return local;
757 }
758
759 @Override
760 protected SocketAddress remoteAddress0() {
761 return remote;
762 }
763 }