1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.RecvByteBufAllocator;
34 import io.netty.channel.socket.ChannelInputShutdownEvent;
35 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36 import io.netty.channel.socket.SocketChannelConfig;
37 import io.netty.channel.unix.FileDescriptor;
38 import io.netty.channel.unix.UnixChannel;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.Future;
41 import io.netty.util.internal.UnstableApi;
42
43 import java.io.IOException;
44 import java.net.ConnectException;
45 import java.net.InetSocketAddress;
46 import java.net.SocketAddress;
47 import java.nio.ByteBuffer;
48 import java.nio.channels.AlreadyConnectedException;
49 import java.nio.channels.ConnectionPendingException;
50 import java.nio.channels.NotYetConnectedException;
51 import java.nio.channels.UnresolvedAddressException;
52 import java.util.concurrent.TimeUnit;
53
54 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
56 import static io.netty.util.internal.ObjectUtil.checkNotNull;
57
58 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
59 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60
61
62
63
64 private ChannelPromise connectPromise;
65 private Future<?> connectTimeoutFuture;
66 private SocketAddress requestedRemoteAddress;
67
68 final BsdSocket socket;
69 private boolean readFilterEnabled;
70 private boolean writeFilterEnabled;
71 boolean readReadyRunnablePending;
72 boolean inputClosedSeenErrorOnRead;
73 protected volatile boolean active;
74 private volatile SocketAddress local;
75 private volatile SocketAddress remote;
76
77 AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
78 super(parent);
79 socket = checkNotNull(fd, "fd");
80 this.active = active;
81 if (active) {
82
83
84 local = fd.localAddress();
85 remote = fd.remoteAddress();
86 }
87 }
88
89 AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
90 super(parent);
91 socket = checkNotNull(fd, "fd");
92 active = true;
93
94
95 this.remote = remote;
96 local = fd.localAddress();
97 }
98
99 static boolean isSoErrorZero(BsdSocket fd) {
100 try {
101 return fd.getSoError() == 0;
102 } catch (IOException e) {
103 throw new ChannelException(e);
104 }
105 }
106
107 @Override
108 public final FileDescriptor fd() {
109 return socket;
110 }
111
112 @Override
113 public boolean isActive() {
114 return active;
115 }
116
117 @Override
118 public ChannelMetadata metadata() {
119 return METADATA;
120 }
121
122 @Override
123 protected void doClose() throws Exception {
124 active = false;
125
126
127 inputClosedSeenErrorOnRead = true;
128 socket.close();
129 }
130
131 @Override
132 protected void doDisconnect() throws Exception {
133 doClose();
134 }
135
136 void resetCachedAddresses() {
137 local = socket.localAddress();
138 remote = socket.remoteAddress();
139 }
140
141 @Override
142 protected boolean isCompatible(EventLoop loop) {
143 return loop instanceof KQueueEventLoop;
144 }
145
146 @Override
147 public boolean isOpen() {
148 return socket.isOpen();
149 }
150
151 @Override
152 protected void doDeregister() throws Exception {
153 ((KQueueEventLoop) eventLoop()).remove(this);
154
155
156
157 readFilterEnabled = false;
158 writeFilterEnabled = false;
159 }
160
161 void unregisterFilters() throws Exception {
162
163 readFilter(false);
164 writeFilter(false);
165 evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
166 }
167
168 @Override
169 protected final void doBeginRead() throws Exception {
170
171 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
172 unsafe.readPending = true;
173
174
175
176
177 readFilter(true);
178
179
180
181 if (unsafe.maybeMoreDataToRead) {
182 unsafe.executeReadReadyRunnable(config());
183 }
184 }
185
186 @Override
187 protected void doRegister() throws Exception {
188
189
190
191 readReadyRunnablePending = false;
192
193 ((KQueueEventLoop) eventLoop()).add(this);
194
195
196 if (writeFilterEnabled) {
197 evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
198 }
199 if (readFilterEnabled) {
200 evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
201 }
202 evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
203 }
204
205 @Override
206 protected abstract AbstractKQueueUnsafe newUnsafe();
207
208 @Override
209 public abstract KQueueChannelConfig config();
210
211
212
213
214 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
215 return newDirectBuffer(buf, buf);
216 }
217
218
219
220
221
222
223 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
224 final int readableBytes = buf.readableBytes();
225 if (readableBytes == 0) {
226 ReferenceCountUtil.release(holder);
227 return Unpooled.EMPTY_BUFFER;
228 }
229
230 final ByteBufAllocator alloc = alloc();
231 if (alloc.isDirectBufferPooled()) {
232 return newDirectBuffer0(holder, buf, alloc, readableBytes);
233 }
234
235 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
236 if (directBuf == null) {
237 return newDirectBuffer0(holder, buf, alloc, readableBytes);
238 }
239
240 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
241 ReferenceCountUtil.safeRelease(holder);
242 return directBuf;
243 }
244
245 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
246 final ByteBuf directBuf = alloc.directBuffer(capacity);
247 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
248 ReferenceCountUtil.safeRelease(holder);
249 return directBuf;
250 }
251
252 protected static void checkResolvable(InetSocketAddress addr) {
253 if (addr.isUnresolved()) {
254 throw new UnresolvedAddressException();
255 }
256 }
257
258
259
260
261 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
262 int writerIndex = byteBuf.writerIndex();
263 int localReadAmount;
264 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
265 if (byteBuf.hasMemoryAddress()) {
266 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
267 } else {
268 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
269 localReadAmount = socket.read(buf, buf.position(), buf.limit());
270 }
271 if (localReadAmount > 0) {
272 byteBuf.writerIndex(writerIndex + localReadAmount);
273 }
274 return localReadAmount;
275 }
276
277 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
278 if (buf.hasMemoryAddress()) {
279 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
280 if (localFlushedAmount > 0) {
281 in.removeBytes(localFlushedAmount);
282 return 1;
283 }
284 } else {
285 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
286 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
287 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
288 if (localFlushedAmount > 0) {
289 nioBuf.position(nioBuf.position() + localFlushedAmount);
290 in.removeBytes(localFlushedAmount);
291 return 1;
292 }
293 }
294 return WRITE_STATUS_SNDBUF_FULL;
295 }
296
297 final boolean shouldBreakReadReady(ChannelConfig config) {
298 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
299 }
300
301 private static boolean isAllowHalfClosure(ChannelConfig config) {
302 if (config instanceof KQueueDomainSocketChannelConfig) {
303 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
304 }
305
306 return config instanceof SocketChannelConfig &&
307 ((SocketChannelConfig) config).isAllowHalfClosure();
308 }
309
310 final void clearReadFilter() {
311
312 if (isRegistered()) {
313 final EventLoop loop = eventLoop();
314 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
315 if (loop.inEventLoop()) {
316 unsafe.clearReadFilter0();
317 } else {
318
319 loop.execute(new Runnable() {
320 @Override
321 public void run() {
322 if (!unsafe.readPending && !config().isAutoRead()) {
323
324 unsafe.clearReadFilter0();
325 }
326 }
327 });
328 }
329 } else {
330
331
332 readFilterEnabled = false;
333 }
334 }
335
336 void readFilter(boolean readFilterEnabled) throws IOException {
337 if (this.readFilterEnabled != readFilterEnabled) {
338 this.readFilterEnabled = readFilterEnabled;
339 evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
340 }
341 }
342
343 void writeFilter(boolean writeFilterEnabled) throws IOException {
344 if (this.writeFilterEnabled != writeFilterEnabled) {
345 this.writeFilterEnabled = writeFilterEnabled;
346 evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
347 }
348 }
349
350 private void evSet(short filter, short flags) {
351 if (isRegistered()) {
352 evSet0(filter, flags);
353 }
354 }
355
356 private void evSet0(short filter, short flags) {
357 evSet0(filter, flags, 0);
358 }
359
360 private void evSet0(short filter, short flags, int fflags) {
361
362 if (isOpen()) {
363 ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
364 }
365 }
366
367 @UnstableApi
368 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
369 boolean readPending;
370 boolean maybeMoreDataToRead;
371 private KQueueRecvByteAllocatorHandle allocHandle;
372 private final Runnable readReadyRunnable = new Runnable() {
373 @Override
374 public void run() {
375 readReadyRunnablePending = false;
376 readReady(recvBufAllocHandle());
377 }
378 };
379
380 final void readReady(long numberBytesPending) {
381 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
382 allocHandle.numberBytesPending(numberBytesPending);
383 readReady(allocHandle);
384 }
385
386 abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
387
388 final void readReadyBefore() {
389 maybeMoreDataToRead = false;
390 }
391
392 final void readReadyFinally(ChannelConfig config) {
393 maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
394
395 if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
396
397
398
399
400
401
402
403 executeReadReadyRunnable(config);
404 } else if (!readPending && !config.isAutoRead()) {
405
406
407
408
409
410
411 clearReadFilter0();
412 }
413 }
414
415 final boolean failConnectPromise(Throwable cause) {
416 if (connectPromise != null) {
417
418
419
420 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
421 AbstractKQueueChannel.this.connectPromise = null;
422 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
423 : new ConnectException("failed to connect").initCause(cause))) {
424 closeIfClosed();
425 return true;
426 }
427 }
428 return false;
429 }
430
431 final void writeReady() {
432 if (connectPromise != null) {
433
434 finishConnect();
435 } else if (!socket.isOutputShutdown()) {
436
437 super.flush0();
438 }
439 }
440
441
442
443
444 void shutdownInput(boolean readEOF) {
445
446
447
448
449
450 if (readEOF && connectPromise != null) {
451 finishConnect();
452 }
453 if (!socket.isInputShutdown()) {
454 if (isAllowHalfClosure(config())) {
455 try {
456 socket.shutdown(true, false);
457 } catch (IOException ignored) {
458
459
460 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
461 return;
462 } catch (NotYetConnectedException ignore) {
463
464
465 }
466 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
467 } else {
468 close(voidPromise());
469 }
470 } else if (!readEOF) {
471 inputClosedSeenErrorOnRead = true;
472 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
473 }
474 }
475
476 final void readEOF() {
477
478 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
479 allocHandle.readEOF();
480
481 if (isActive()) {
482
483
484
485 readReady(allocHandle);
486 } else {
487
488 shutdownInput(true);
489 }
490 }
491
492 @Override
493 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
494 if (allocHandle == null) {
495 allocHandle = new KQueueRecvByteAllocatorHandle(
496 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
497 }
498 return allocHandle;
499 }
500
501 @Override
502 protected final void flush0() {
503
504
505
506 if (!writeFilterEnabled) {
507 super.flush0();
508 }
509 }
510
511 final void executeReadReadyRunnable(ChannelConfig config) {
512 if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
513 return;
514 }
515 readReadyRunnablePending = true;
516 eventLoop().execute(readReadyRunnable);
517 }
518
519 protected final void clearReadFilter0() {
520 assert eventLoop().inEventLoop();
521 try {
522 readPending = false;
523 readFilter(false);
524 } catch (IOException e) {
525
526
527 pipeline().fireExceptionCaught(e);
528 unsafe().close(unsafe().voidPromise());
529 }
530 }
531
532 private void fireEventAndClose(Object evt) {
533 pipeline().fireUserEventTriggered(evt);
534 close(voidPromise());
535 }
536
537 @Override
538 public void connect(
539 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
540 if (!promise.setUncancellable() || !ensureOpen(promise)) {
541 return;
542 }
543
544 try {
545 if (connectPromise != null) {
546 throw new ConnectionPendingException();
547 }
548
549 boolean wasActive = isActive();
550 if (doConnect(remoteAddress, localAddress)) {
551 fulfillConnectPromise(promise, wasActive);
552 } else {
553 connectPromise = promise;
554 requestedRemoteAddress = remoteAddress;
555
556
557 int connectTimeoutMillis = config().getConnectTimeoutMillis();
558 if (connectTimeoutMillis > 0) {
559 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
560 @Override
561 public void run() {
562 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
563 if (connectPromise != null && !connectPromise.isDone()
564 && connectPromise.tryFailure(new ConnectTimeoutException(
565 "connection timed out: " + remoteAddress))) {
566 close(voidPromise());
567 }
568 }
569 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
570 }
571
572 promise.addListener(new ChannelFutureListener() {
573 @Override
574 public void operationComplete(ChannelFuture future) throws Exception {
575 if (future.isCancelled()) {
576 if (connectTimeoutFuture != null) {
577 connectTimeoutFuture.cancel(false);
578 }
579 connectPromise = null;
580 close(voidPromise());
581 }
582 }
583 });
584 }
585 } catch (Throwable t) {
586 closeIfClosed();
587 promise.tryFailure(annotateConnectException(t, remoteAddress));
588 }
589 }
590
591 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
592 if (promise == null) {
593
594 return;
595 }
596 active = true;
597
598
599
600 boolean active = isActive();
601
602
603 boolean promiseSet = promise.trySuccess();
604
605
606
607 if (!wasActive && active) {
608 pipeline().fireChannelActive();
609 }
610
611
612 if (!promiseSet) {
613 close(voidPromise());
614 }
615 }
616
617 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
618 if (promise == null) {
619
620 return;
621 }
622
623
624 promise.tryFailure(cause);
625 closeIfClosed();
626 }
627
628 private void finishConnect() {
629
630
631
632 assert eventLoop().inEventLoop();
633
634 boolean connectStillInProgress = false;
635 try {
636 boolean wasActive = isActive();
637 if (!doFinishConnect()) {
638 connectStillInProgress = true;
639 return;
640 }
641 fulfillConnectPromise(connectPromise, wasActive);
642 } catch (Throwable t) {
643 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
644 } finally {
645 if (!connectStillInProgress) {
646
647
648 if (connectTimeoutFuture != null) {
649 connectTimeoutFuture.cancel(false);
650 }
651 connectPromise = null;
652 }
653 }
654 }
655
656 private boolean doFinishConnect() throws Exception {
657 if (socket.finishConnect()) {
658 writeFilter(false);
659 if (requestedRemoteAddress instanceof InetSocketAddress) {
660 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
661 }
662 requestedRemoteAddress = null;
663 return true;
664 }
665 writeFilter(true);
666 return false;
667 }
668 }
669
670 @Override
671 protected void doBind(SocketAddress local) throws Exception {
672 if (local instanceof InetSocketAddress) {
673 checkResolvable((InetSocketAddress) local);
674 }
675 socket.bind(local);
676 this.local = socket.localAddress();
677 }
678
679
680
681
682 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
683 if (localAddress instanceof InetSocketAddress) {
684 checkResolvable((InetSocketAddress) localAddress);
685 }
686
687 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
688 ? (InetSocketAddress) remoteAddress : null;
689 if (remoteSocketAddr != null) {
690 checkResolvable(remoteSocketAddr);
691 }
692
693 if (remote != null) {
694
695
696
697 throw new AlreadyConnectedException();
698 }
699
700 if (localAddress != null) {
701 socket.bind(localAddress);
702 }
703
704 boolean connected = doConnect0(remoteAddress, localAddress);
705 if (connected) {
706 remote = remoteSocketAddr == null?
707 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
708 }
709
710
711
712 local = socket.localAddress();
713 return connected;
714 }
715
716 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
717 boolean success = false;
718 try {
719 boolean connected = socket.connect(remoteAddress);
720 if (!connected) {
721 writeFilter(true);
722 }
723 success = true;
724 return connected;
725 } finally {
726 if (!success) {
727 doClose();
728 }
729 }
730 }
731
732 @Override
733 protected SocketAddress localAddress0() {
734 return local;
735 }
736
737 @Override
738 protected SocketAddress remoteAddress0() {
739 return remote;
740 }
741 }