1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.EventLoop;
24 import io.netty.channel.ServerChannel;
25
26 import java.net.InetSocketAddress;
27 import java.net.SocketAddress;
28
29 public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
30 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
31
32 AbstractKQueueServerChannel(BsdSocket fd) {
33 this(fd, isSoErrorZero(fd));
34 }
35
36 AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
37 super(null, fd, active);
38 }
39
40 @Override
41 public ChannelMetadata metadata() {
42 return METADATA;
43 }
44
45 @Override
46 protected boolean isCompatible(EventLoop loop) {
47 return loop instanceof KQueueEventLoop;
48 }
49
50 @Override
51 protected InetSocketAddress remoteAddress0() {
52 return null;
53 }
54
55 @Override
56 protected AbstractKQueueUnsafe newUnsafe() {
57 return new KQueueServerSocketUnsafe();
58 }
59
60 @Override
61 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
62 throw new UnsupportedOperationException();
63 }
64
65 @Override
66 protected Object filterOutboundMessage(Object msg) throws Exception {
67 throw new UnsupportedOperationException();
68 }
69
70 abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
71
72 @Override
73 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
74 throw new UnsupportedOperationException();
75 }
76
77 final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
78
79
80 private final byte[] acceptedAddress = new byte[25];
81
82 @Override
83 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
84 assert eventLoop().inEventLoop();
85 final ChannelConfig config = config();
86 if (shouldBreakReadReady(config)) {
87 clearReadFilter0();
88 return;
89 }
90 final ChannelPipeline pipeline = pipeline();
91 allocHandle.reset(config);
92 allocHandle.attemptedBytesRead(1);
93 readReadyBefore();
94
95 Throwable exception = null;
96 try {
97 try {
98 do {
99 int acceptFd = socket.accept(acceptedAddress);
100 if (acceptFd == -1) {
101
102 allocHandle.lastBytesRead(-1);
103 break;
104 }
105 allocHandle.lastBytesRead(1);
106 allocHandle.incMessagesRead(1);
107
108 readPending = false;
109 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
110 acceptedAddress[0]));
111 } while (allocHandle.continueReading());
112 } catch (Throwable t) {
113 exception = t;
114 }
115 allocHandle.readComplete();
116 pipeline.fireChannelReadComplete();
117
118 if (exception != null) {
119 pipeline.fireExceptionCaught(exception);
120 }
121 } finally {
122 readReadyFinally(config);
123 }
124 }
125 }
126 }