1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.unix.DomainSocketAddress;
23 import io.netty.channel.unix.DomainSocketChannel;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.PeerCredentials;
26 import io.netty.util.internal.UnstableApi;
27
28 import java.io.IOException;
29 import java.net.SocketAddress;
30
31 import static io.netty.channel.kqueue.BsdSocket.newSocketDomain;
32
33 @UnstableApi
34 public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel implements DomainSocketChannel {
35 private final KQueueDomainSocketChannelConfig config = new KQueueDomainSocketChannelConfig(this);
36
37 private volatile DomainSocketAddress local;
38 private volatile DomainSocketAddress remote;
39
40 public KQueueDomainSocketChannel() {
41 super(null, newSocketDomain(), false);
42 }
43
44 public KQueueDomainSocketChannel(int fd) {
45 this(null, new BsdSocket(fd));
46 }
47
48 KQueueDomainSocketChannel(Channel parent, BsdSocket fd) {
49 super(parent, fd, true);
50 }
51
52 @Override
53 protected AbstractKQueueUnsafe newUnsafe() {
54 return new KQueueDomainUnsafe();
55 }
56
57 @Override
58 protected DomainSocketAddress localAddress0() {
59 return local;
60 }
61
62 @Override
63 protected DomainSocketAddress remoteAddress0() {
64 return remote;
65 }
66
67 @Override
68 protected void doBind(SocketAddress localAddress) throws Exception {
69 socket.bind(localAddress);
70 local = (DomainSocketAddress) localAddress;
71 }
72
73 @Override
74 public KQueueDomainSocketChannelConfig config() {
75 return config;
76 }
77
78 @Override
79 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
80 if (super.doConnect(remoteAddress, localAddress)) {
81 local = (DomainSocketAddress) localAddress;
82 remote = (DomainSocketAddress) remoteAddress;
83 return true;
84 }
85 return false;
86 }
87
88 @Override
89 public DomainSocketAddress remoteAddress() {
90 return (DomainSocketAddress) super.remoteAddress();
91 }
92
93 @Override
94 public DomainSocketAddress localAddress() {
95 return (DomainSocketAddress) super.localAddress();
96 }
97
98 @Override
99 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
100 Object msg = in.current();
101 if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
102
103 in.remove();
104 return 1;
105 }
106 return super.doWriteSingle(in);
107 }
108
109 @Override
110 protected Object filterOutboundMessage(Object msg) {
111 if (msg instanceof FileDescriptor) {
112 return msg;
113 }
114 return super.filterOutboundMessage(msg);
115 }
116
117
118
119
120
121 @UnstableApi
122 public PeerCredentials peerCredentials() throws IOException {
123 return socket.getPeerCredentials();
124 }
125
126 private final class KQueueDomainUnsafe extends KQueueStreamUnsafe {
127 @Override
128 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
129 switch (config().getReadMode()) {
130 case BYTES:
131 super.readReady(allocHandle);
132 break;
133 case FILE_DESCRIPTORS:
134 readReadyFd();
135 break;
136 default:
137 throw new Error();
138 }
139 }
140
141 private void readReadyFd() {
142 if (socket.isInputShutdown()) {
143 super.clearReadFilter0();
144 return;
145 }
146 final ChannelConfig config = config();
147 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
148
149 final ChannelPipeline pipeline = pipeline();
150 allocHandle.reset(config);
151 readReadyBefore();
152
153 try {
154 readLoop: do {
155
156
157
158 int recvFd = socket.recvFd();
159 switch(recvFd) {
160 case 0:
161 allocHandle.lastBytesRead(0);
162 break readLoop;
163 case -1:
164 allocHandle.lastBytesRead(-1);
165 close(voidPromise());
166 return;
167 default:
168 allocHandle.lastBytesRead(1);
169 allocHandle.incMessagesRead(1);
170 readPending = false;
171 pipeline.fireChannelRead(new FileDescriptor(recvFd));
172 break;
173 }
174 } while (allocHandle.continueReading());
175
176 allocHandle.readComplete();
177 pipeline.fireChannelReadComplete();
178 } catch (Throwable t) {
179 allocHandle.readComplete();
180 pipeline.fireChannelReadComplete();
181 pipeline.fireExceptionCaught(t);
182 } finally {
183 readReadyFinally(config);
184 }
185 }
186 }
187 }