1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ServerChannel;
24
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27
28 public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
29 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
30
31 AbstractKQueueServerChannel(BsdSocket fd) {
32 this(fd, isSoErrorZero(fd));
33 }
34
35 AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
36 super(null, fd, active);
37 }
38
39 @Override
40 public ChannelMetadata metadata() {
41 return METADATA;
42 }
43
44 @Override
45 protected InetSocketAddress remoteAddress0() {
46 return null;
47 }
48
49 @Override
50 protected AbstractKQueueUnsafe newUnsafe() {
51 return new KQueueServerSocketUnsafe();
52 }
53
54 @Override
55 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
56 throw new UnsupportedOperationException();
57 }
58
59 @Override
60 protected Object filterOutboundMessage(Object msg) throws Exception {
61 throw new UnsupportedOperationException();
62 }
63
64 abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
65
66 @Override
67 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
68 throw new UnsupportedOperationException();
69 }
70
71 final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
72
73
74 private final byte[] acceptedAddress = new byte[25];
75
76 @Override
77 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
78 assert eventLoop().inEventLoop();
79 final ChannelConfig config = config();
80 if (shouldBreakReadReady(config)) {
81 clearReadFilter0();
82 return;
83 }
84 final ChannelPipeline pipeline = pipeline();
85 allocHandle.reset(config);
86 allocHandle.attemptedBytesRead(1);
87
88 Throwable exception = null;
89 try {
90 try {
91 do {
92 int acceptFd = socket.accept(acceptedAddress);
93 if (acceptFd == -1) {
94
95 allocHandle.lastBytesRead(-1);
96 break;
97 }
98 allocHandle.lastBytesRead(1);
99 allocHandle.incMessagesRead(1);
100
101 readPending = false;
102 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
103 acceptedAddress[0]));
104 } while (allocHandle.continueReading());
105 } catch (Throwable t) {
106 exception = t;
107 }
108 allocHandle.readComplete();
109 pipeline.fireChannelReadComplete();
110
111 if (exception != null) {
112 pipeline.fireExceptionCaught(exception);
113 }
114 } finally {
115 if (shouldStopReading(config)) {
116 clearReadFilter0();
117 }
118 }
119 }
120 }
121 }