1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ServerChannel;
24
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27
28 public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
29 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
30
31 AbstractKQueueServerChannel(BsdSocket fd) {
32 this(fd, isSoErrorZero(fd));
33 }
34
35 AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
36 super(null, fd, active);
37 }
38
39 @Override
40 public ChannelMetadata metadata() {
41 return METADATA;
42 }
43
44 @Override
45 protected InetSocketAddress remoteAddress0() {
46 return null;
47 }
48
49 @Override
50 protected AbstractKQueueUnsafe newUnsafe() {
51 return new KQueueServerSocketUnsafe();
52 }
53
54 @Override
55 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
56 throw new UnsupportedOperationException();
57 }
58
59 @Override
60 protected Object filterOutboundMessage(Object msg) throws Exception {
61 throw new UnsupportedOperationException();
62 }
63
64 abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
65
66 @Override
67 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
68 throw new UnsupportedOperationException();
69 }
70
71 final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
72
73
74
75 private final byte[] acceptedAddress = new byte[26];
76
77 @Override
78 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
79 assert eventLoop().inEventLoop();
80 final ChannelConfig config = config();
81 if (shouldBreakReadReady(config)) {
82 clearReadFilter0();
83 return;
84 }
85 final ChannelPipeline pipeline = pipeline();
86 allocHandle.reset(config);
87 allocHandle.attemptedBytesRead(1);
88
89 Throwable exception = null;
90 try {
91 try {
92 do {
93 int acceptFd = socket.accept(acceptedAddress);
94 if (acceptFd == -1) {
95
96 allocHandle.lastBytesRead(-1);
97 break;
98 }
99 allocHandle.lastBytesRead(1);
100 allocHandle.incMessagesRead(1);
101
102 readPending = false;
103 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
104 acceptedAddress[0]));
105 } while (allocHandle.continueReading());
106 } catch (Throwable t) {
107 exception = t;
108 }
109 allocHandle.readComplete();
110 pipeline.fireChannelReadComplete();
111
112 if (exception != null) {
113 pipeline.fireExceptionCaught(exception);
114 }
115 } finally {
116 if (shouldStopReading(config)) {
117 clearReadFilter0();
118 }
119 }
120 }
121 }
122 }