1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.EventLoop;
24 import io.netty.channel.ServerChannel;
25
26 import java.net.InetSocketAddress;
27 import java.net.SocketAddress;
28
29 public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
30 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
31
32 AbstractKQueueServerChannel(BsdSocket fd) {
33 this(fd, isSoErrorZero(fd));
34 }
35
36 AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
37 super(null, fd, active);
38 }
39
40 @Override
41 public ChannelMetadata metadata() {
42 return METADATA;
43 }
44
45 @Override
46 protected boolean isCompatible(EventLoop loop) {
47 return loop instanceof KQueueEventLoop;
48 }
49
50 @Override
51 protected InetSocketAddress remoteAddress0() {
52 return null;
53 }
54
55 @Override
56 protected AbstractKQueueUnsafe newUnsafe() {
57 return new KQueueServerSocketUnsafe();
58 }
59
60 @Override
61 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
62 throw new UnsupportedOperationException();
63 }
64
65 @Override
66 protected Object filterOutboundMessage(Object msg) throws Exception {
67 throw new UnsupportedOperationException();
68 }
69
70 abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
71
72 @Override
73 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
74 throw new UnsupportedOperationException();
75 }
76
77 final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
78
79
80
81 private final byte[] acceptedAddress = new byte[26];
82
83 @Override
84 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
85 assert eventLoop().inEventLoop();
86 final ChannelConfig config = config();
87 if (shouldBreakReadReady(config)) {
88 clearReadFilter0();
89 return;
90 }
91 final ChannelPipeline pipeline = pipeline();
92 allocHandle.reset(config);
93 allocHandle.attemptedBytesRead(1);
94 readReadyBefore();
95
96 Throwable exception = null;
97 try {
98 try {
99 do {
100 int acceptFd = socket.accept(acceptedAddress);
101 if (acceptFd == -1) {
102
103 allocHandle.lastBytesRead(-1);
104 break;
105 }
106 allocHandle.lastBytesRead(1);
107 allocHandle.incMessagesRead(1);
108
109 readPending = false;
110 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
111 acceptedAddress[0]));
112 } while (allocHandle.continueReading());
113 } catch (Throwable t) {
114 exception = t;
115 }
116 allocHandle.readComplete();
117 pipeline.fireChannelReadComplete();
118
119 if (exception != null) {
120 pipeline.fireExceptionCaught(exception);
121 }
122 } finally {
123 readReadyFinally(config);
124 }
125 }
126 }
127 }