1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.RecvByteBufAllocator;
34 import io.netty.channel.socket.ChannelInputShutdownEvent;
35 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36 import io.netty.channel.socket.SocketChannelConfig;
37 import io.netty.channel.unix.FileDescriptor;
38 import io.netty.channel.unix.UnixChannel;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.Future;
41 import io.netty.util.internal.UnstableApi;
42
43 import java.io.IOException;
44 import java.net.ConnectException;
45 import java.net.InetSocketAddress;
46 import java.net.SocketAddress;
47 import java.nio.ByteBuffer;
48 import java.nio.channels.AlreadyConnectedException;
49 import java.nio.channels.ConnectionPendingException;
50 import java.nio.channels.NotYetConnectedException;
51 import java.nio.channels.UnresolvedAddressException;
52 import java.util.concurrent.TimeUnit;
53
54 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
56 import static io.netty.util.internal.ObjectUtil.checkNotNull;
57
58 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
59 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60
61
62
63
64 private ChannelPromise connectPromise;
65 private Future<?> connectTimeoutFuture;
66 private SocketAddress requestedRemoteAddress;
67
68 final BsdSocket socket;
69 private boolean readFilterEnabled;
70 private boolean writeFilterEnabled;
71 KQueueEventLoop.KQueueRegistration registration;
72 boolean readReadyRunnablePending;
73 boolean inputClosedSeenErrorOnRead;
74 protected volatile boolean active;
75 private volatile SocketAddress local;
76 private volatile SocketAddress remote;
77
78 AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
79 super(parent);
80 socket = checkNotNull(fd, "fd");
81 this.active = active;
82 if (active) {
83
84
85 local = fd.localAddress();
86 remote = fd.remoteAddress();
87 }
88 }
89
90 AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
91 super(parent);
92 socket = checkNotNull(fd, "fd");
93 active = true;
94
95
96 this.remote = remote;
97 local = fd.localAddress();
98 }
99
100 static boolean isSoErrorZero(BsdSocket fd) {
101 try {
102 return fd.getSoError() == 0;
103 } catch (IOException e) {
104 throw new ChannelException(e);
105 }
106 }
107
108 @Override
109 public final FileDescriptor fd() {
110 return socket;
111 }
112
113 @Override
114 public boolean isActive() {
115 return active;
116 }
117
118 @Override
119 public ChannelMetadata metadata() {
120 return METADATA;
121 }
122
123 @Override
124 protected void doClose() throws Exception {
125 active = false;
126
127
128 inputClosedSeenErrorOnRead = true;
129 socket.close();
130 }
131
132 @Override
133 protected void doDisconnect() throws Exception {
134 doClose();
135 }
136
137 void resetCachedAddresses() {
138 local = socket.localAddress();
139 remote = socket.remoteAddress();
140 }
141
142 @Override
143 protected boolean isCompatible(EventLoop loop) {
144 return loop instanceof KQueueEventLoop;
145 }
146
147 @Override
148 public boolean isOpen() {
149 return socket.isOpen();
150 }
151
152 @Override
153 protected void doDeregister() throws Exception {
154 if (registration != null) {
155 registration.remove();
156 }
157
158
159
160 readFilterEnabled = false;
161 writeFilterEnabled = false;
162 }
163
164 void unregisterFilters() throws Exception {
165
166 readFilter(false);
167 writeFilter(false);
168 clearRdHup0();
169 }
170
171 private void clearRdHup0() {
172 evSet0(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP);
173 }
174
175 @Override
176 protected final void doBeginRead() throws Exception {
177
178 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
179 unsafe.readPending = true;
180
181
182
183
184 readFilter(true);
185
186
187
188 if (unsafe.maybeMoreDataToRead) {
189 unsafe.executeReadReadyRunnable(config());
190 }
191 }
192
193 @Override
194 protected void doRegister() throws Exception {
195
196
197
198 readReadyRunnablePending = false;
199
200 registration = ((KQueueEventLoop) eventLoop()).add(this);
201
202
203 if (writeFilterEnabled) {
204 evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
205 }
206 if (readFilterEnabled) {
207 evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
208 }
209 evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
210 }
211
212 @Override
213 protected abstract AbstractKQueueUnsafe newUnsafe();
214
215 @Override
216 public abstract KQueueChannelConfig config();
217
218
219
220
221 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
222 return newDirectBuffer(buf, buf);
223 }
224
225
226
227
228
229
230 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
231 final int readableBytes = buf.readableBytes();
232 if (readableBytes == 0) {
233 ReferenceCountUtil.release(holder);
234 return Unpooled.EMPTY_BUFFER;
235 }
236
237 final ByteBufAllocator alloc = alloc();
238 if (alloc.isDirectBufferPooled()) {
239 return newDirectBuffer0(holder, buf, alloc, readableBytes);
240 }
241
242 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
243 if (directBuf == null) {
244 return newDirectBuffer0(holder, buf, alloc, readableBytes);
245 }
246
247 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
248 ReferenceCountUtil.safeRelease(holder);
249 return directBuf;
250 }
251
252 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
253 final ByteBuf directBuf = alloc.directBuffer(capacity);
254 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
255 ReferenceCountUtil.safeRelease(holder);
256 return directBuf;
257 }
258
259 protected static void checkResolvable(InetSocketAddress addr) {
260 if (addr.isUnresolved()) {
261 throw new UnresolvedAddressException();
262 }
263 }
264
265
266
267
268 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
269 int writerIndex = byteBuf.writerIndex();
270 int localReadAmount;
271 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
272 if (byteBuf.hasMemoryAddress()) {
273 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
274 } else {
275 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
276 localReadAmount = socket.read(buf, buf.position(), buf.limit());
277 }
278 if (localReadAmount > 0) {
279 byteBuf.writerIndex(writerIndex + localReadAmount);
280 }
281 return localReadAmount;
282 }
283
284 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
285 if (buf.hasMemoryAddress()) {
286 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
287 if (localFlushedAmount > 0) {
288 in.removeBytes(localFlushedAmount);
289 return 1;
290 }
291 } else {
292 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
293 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
294 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
295 if (localFlushedAmount > 0) {
296 nioBuf.position(nioBuf.position() + localFlushedAmount);
297 in.removeBytes(localFlushedAmount);
298 return 1;
299 }
300 }
301 return WRITE_STATUS_SNDBUF_FULL;
302 }
303
304 final boolean shouldBreakReadReady(ChannelConfig config) {
305 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
306 }
307
308 private static boolean isAllowHalfClosure(ChannelConfig config) {
309 if (config instanceof KQueueDomainSocketChannelConfig) {
310 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
311 }
312
313 return config instanceof SocketChannelConfig &&
314 ((SocketChannelConfig) config).isAllowHalfClosure();
315 }
316
317 final void clearReadFilter() {
318
319 if (isRegistered()) {
320 final EventLoop loop = eventLoop();
321 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
322 if (loop.inEventLoop()) {
323 unsafe.clearReadFilter0();
324 } else {
325
326 loop.execute(new Runnable() {
327 @Override
328 public void run() {
329 if (!unsafe.readPending && !config().isAutoRead()) {
330
331 unsafe.clearReadFilter0();
332 }
333 }
334 });
335 }
336 } else {
337
338
339 readFilterEnabled = false;
340 }
341 }
342
343 void readFilter(boolean readFilterEnabled) throws IOException {
344 if (this.readFilterEnabled != readFilterEnabled) {
345 this.readFilterEnabled = readFilterEnabled;
346 evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
347 }
348 }
349
350 void writeFilter(boolean writeFilterEnabled) throws IOException {
351 if (this.writeFilterEnabled != writeFilterEnabled) {
352 this.writeFilterEnabled = writeFilterEnabled;
353 evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
354 }
355 }
356
357 private void evSet(short filter, short flags) {
358 if (isRegistered()) {
359 evSet0(filter, flags);
360 }
361 }
362
363 private void evSet0(short filter, short flags) {
364 evSet0(filter, flags, 0);
365 }
366
367 private void evSet0(short filter, short flags, int fflags) {
368
369 if (isOpen() && registration != null) {
370 registration.evSet(filter, flags, fflags, 0);
371 }
372 }
373
374 @UnstableApi
375 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
376 boolean readPending;
377 boolean maybeMoreDataToRead;
378 private KQueueRecvByteAllocatorHandle allocHandle;
379 private final Runnable readReadyRunnable = new Runnable() {
380 @Override
381 public void run() {
382 readReadyRunnablePending = false;
383 readReady(recvBufAllocHandle());
384 }
385 };
386
387 final void readReady(long numberBytesPending) {
388 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
389 allocHandle.numberBytesPending(numberBytesPending);
390 readReady(allocHandle);
391 }
392
393 abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
394
395 final void readReadyBefore() {
396 maybeMoreDataToRead = false;
397 }
398
399 final void readReadyFinally(ChannelConfig config) {
400 maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
401
402 if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
403
404
405
406
407
408
409
410 executeReadReadyRunnable(config);
411 } else if (!readPending && !config.isAutoRead()) {
412
413
414
415
416
417
418 clearReadFilter0();
419 }
420 }
421
422 final boolean failConnectPromise(Throwable cause) {
423 if (connectPromise != null) {
424
425
426
427 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
428 AbstractKQueueChannel.this.connectPromise = null;
429 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
430 : new ConnectException("failed to connect").initCause(cause))) {
431 closeIfClosed();
432 return true;
433 }
434 }
435 return false;
436 }
437
438 final void writeReady() {
439 if (connectPromise != null) {
440
441 finishConnect();
442 } else if (!socket.isOutputShutdown()) {
443
444 super.flush0();
445 }
446 }
447
448
449
450
451 void shutdownInput(boolean readEOF) {
452
453
454
455
456
457 if (readEOF && connectPromise != null) {
458 finishConnect();
459 }
460 if (!socket.isInputShutdown()) {
461 if (isAllowHalfClosure(config())) {
462 try {
463 socket.shutdown(true, false);
464 } catch (IOException ignored) {
465
466
467 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
468 return;
469 } catch (NotYetConnectedException ignore) {
470
471
472 }
473 clearReadFilter0();
474 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
475 } else {
476 close(voidPromise());
477 }
478 } else if (!readEOF && !inputClosedSeenErrorOnRead) {
479 inputClosedSeenErrorOnRead = true;
480 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
481 }
482 }
483
484 final void readEOF() {
485
486 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
487 allocHandle.readEOF();
488
489 if (isActive()) {
490
491
492
493 readReady(allocHandle);
494 } else {
495
496 shutdownInput(true);
497 }
498
499
500 clearRdHup0();
501 }
502
503 @Override
504 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
505 if (allocHandle == null) {
506 allocHandle = new KQueueRecvByteAllocatorHandle(
507 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
508 }
509 return allocHandle;
510 }
511
512 @Override
513 protected final void flush0() {
514
515
516
517 if (!writeFilterEnabled) {
518 super.flush0();
519 }
520 }
521
522 final void executeReadReadyRunnable(ChannelConfig config) {
523 if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
524 return;
525 }
526 readReadyRunnablePending = true;
527 eventLoop().execute(readReadyRunnable);
528 }
529
530 protected final void clearReadFilter0() {
531 assert eventLoop().inEventLoop();
532 try {
533 readPending = false;
534 readFilter(false);
535 } catch (IOException e) {
536
537
538 pipeline().fireExceptionCaught(e);
539 unsafe().close(unsafe().voidPromise());
540 }
541 }
542
543 private void fireEventAndClose(Object evt) {
544 pipeline().fireUserEventTriggered(evt);
545 close(voidPromise());
546 }
547
548 @Override
549 public void connect(
550 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
551
552
553 if (promise.isDone() || !ensureOpen(promise)) {
554 return;
555 }
556
557 try {
558 if (connectPromise != null) {
559 throw new ConnectionPendingException();
560 }
561
562 boolean wasActive = isActive();
563 if (doConnect(remoteAddress, localAddress)) {
564 fulfillConnectPromise(promise, wasActive);
565 } else {
566 connectPromise = promise;
567 requestedRemoteAddress = remoteAddress;
568
569
570 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
571 if (connectTimeoutMillis > 0) {
572 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
573 @Override
574 public void run() {
575 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
576 if (connectPromise != null && !connectPromise.isDone()
577 && connectPromise.tryFailure(new ConnectTimeoutException(
578 "connection timed out after " + connectTimeoutMillis + " ms: " +
579 remoteAddress))) {
580 close(voidPromise());
581 }
582 }
583 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
584 }
585
586 promise.addListener(new ChannelFutureListener() {
587 @Override
588 public void operationComplete(ChannelFuture future) {
589
590
591 if (future.isCancelled()) {
592 if (connectTimeoutFuture != null) {
593 connectTimeoutFuture.cancel(false);
594 }
595 connectPromise = null;
596 close(voidPromise());
597 }
598 }
599 });
600 }
601 } catch (Throwable t) {
602 closeIfClosed();
603 promise.tryFailure(annotateConnectException(t, remoteAddress));
604 }
605 }
606
607 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
608 if (promise == null) {
609
610 return;
611 }
612 active = true;
613
614
615
616 boolean active = isActive();
617
618
619 boolean promiseSet = promise.trySuccess();
620
621
622
623 if (!wasActive && active) {
624 pipeline().fireChannelActive();
625 }
626
627
628 if (!promiseSet) {
629 close(voidPromise());
630 }
631 }
632
633 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
634 if (promise == null) {
635
636 return;
637 }
638
639
640 promise.tryFailure(cause);
641 closeIfClosed();
642 }
643
644 private void finishConnect() {
645
646
647
648 assert eventLoop().inEventLoop();
649
650 boolean connectStillInProgress = false;
651 try {
652 boolean wasActive = isActive();
653 if (!doFinishConnect()) {
654 connectStillInProgress = true;
655 return;
656 }
657 fulfillConnectPromise(connectPromise, wasActive);
658 } catch (Throwable t) {
659 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
660 } finally {
661 if (!connectStillInProgress) {
662
663
664 if (connectTimeoutFuture != null) {
665 connectTimeoutFuture.cancel(false);
666 }
667 connectPromise = null;
668 }
669 }
670 }
671
672 private boolean doFinishConnect() throws Exception {
673 if (socket.finishConnect()) {
674 writeFilter(false);
675 if (requestedRemoteAddress instanceof InetSocketAddress) {
676 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
677 }
678 requestedRemoteAddress = null;
679 return true;
680 }
681 writeFilter(true);
682 return false;
683 }
684 }
685
686 @Override
687 protected void doBind(SocketAddress local) throws Exception {
688 if (local instanceof InetSocketAddress) {
689 checkResolvable((InetSocketAddress) local);
690 }
691 socket.bind(local);
692 this.local = socket.localAddress();
693 }
694
695
696
697
698 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
699 if (localAddress instanceof InetSocketAddress) {
700 checkResolvable((InetSocketAddress) localAddress);
701 }
702
703 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
704 ? (InetSocketAddress) remoteAddress : null;
705 if (remoteSocketAddr != null) {
706 checkResolvable(remoteSocketAddr);
707 }
708
709 if (remote != null) {
710
711
712
713 throw new AlreadyConnectedException();
714 }
715
716 if (localAddress != null) {
717 socket.bind(localAddress);
718 }
719
720 boolean connected = doConnect0(remoteAddress, localAddress);
721 if (connected) {
722 remote = remoteSocketAddr == null?
723 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
724 }
725
726
727
728 local = socket.localAddress();
729 return connected;
730 }
731
732 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
733 boolean success = false;
734 try {
735 boolean connected = socket.connect(remoteAddress);
736 if (!connected) {
737 writeFilter(true);
738 }
739 success = true;
740 return connected;
741 } finally {
742 if (!success) {
743 doClose();
744 }
745 }
746 }
747
748 @Override
749 protected SocketAddress localAddress0() {
750 return local;
751 }
752
753 @Override
754 protected SocketAddress remoteAddress0() {
755 return remote;
756 }
757 }