1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ServerChannel;
24
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27
28 public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
29 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
30
31 AbstractKQueueServerChannel(BsdSocket fd) {
32 this(fd, isSoErrorZero(fd));
33 }
34
35 AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
36 super(null, fd, active);
37 }
38
39 @Override
40 public ChannelMetadata metadata() {
41 return METADATA;
42 }
43
44 @Override
45 protected InetSocketAddress remoteAddress0() {
46 return null;
47 }
48
49 @Override
50 protected AbstractKQueueUnsafe newUnsafe() {
51 return new KQueueServerSocketUnsafe();
52 }
53
54 @Override
55 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
56 throw new UnsupportedOperationException();
57 }
58
59 @Override
60 protected Object filterOutboundMessage(Object msg) throws Exception {
61 throw new UnsupportedOperationException();
62 }
63
64 abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
65
66 @Override
67 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
68 throw new UnsupportedOperationException();
69 }
70
71 final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
72
73
74
75 private final byte[] acceptedAddress = new byte[26];
76
77 @Override
78 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
79 assert eventLoop().inEventLoop();
80 final ChannelConfig config = config();
81 if (shouldBreakReadReady(config)) {
82 clearReadFilter0();
83 return;
84 }
85 final ChannelPipeline pipeline = pipeline();
86 allocHandle.reset(config);
87 allocHandle.attemptedBytesRead(1);
88 readReadyBefore();
89
90 Throwable exception = null;
91 try {
92 try {
93 do {
94 int acceptFd = socket.accept(acceptedAddress);
95 if (acceptFd == -1) {
96
97 allocHandle.lastBytesRead(-1);
98 break;
99 }
100 allocHandle.lastBytesRead(1);
101 allocHandle.incMessagesRead(1);
102
103 readPending = false;
104 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
105 acceptedAddress[0]));
106 } while (allocHandle.continueReading());
107 } catch (Throwable t) {
108 exception = t;
109 }
110 allocHandle.readComplete();
111 pipeline.fireChannelReadComplete();
112
113 if (exception != null) {
114 pipeline.fireExceptionCaught(exception);
115 }
116 } finally {
117 readReadyFinally(config);
118 }
119 }
120 }
121 }