1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.unix.DomainSocketAddress;
23 import io.netty.channel.unix.DomainSocketChannel;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.PeerCredentials;
26 import io.netty.util.internal.UnstableApi;
27
28 import java.io.IOException;
29 import java.net.SocketAddress;
30
31 import static io.netty.channel.kqueue.BsdSocket.newSocketDomain;
32
33 public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel implements DomainSocketChannel {
34 private final KQueueDomainSocketChannelConfig config = new KQueueDomainSocketChannelConfig(this);
35
36 private volatile DomainSocketAddress local;
37 private volatile DomainSocketAddress remote;
38
39 public KQueueDomainSocketChannel() {
40 super(null, newSocketDomain(), false);
41 }
42
43 public KQueueDomainSocketChannel(int fd) {
44 this(null, new BsdSocket(fd));
45 }
46
47 KQueueDomainSocketChannel(Channel parent, BsdSocket fd) {
48 super(parent, fd, true);
49 local = fd.localDomainSocketAddress();
50 remote = fd.remoteDomainSocketAddress();
51 }
52
53 @Override
54 protected AbstractKQueueUnsafe newUnsafe() {
55 return new KQueueDomainUnsafe();
56 }
57
58 @Override
59 protected DomainSocketAddress localAddress0() {
60 return local;
61 }
62
63 @Override
64 protected DomainSocketAddress remoteAddress0() {
65 return remote;
66 }
67
68 @Override
69 protected void doBind(SocketAddress localAddress) throws Exception {
70 socket.bind(localAddress);
71 local = (DomainSocketAddress) localAddress;
72 }
73
74 @Override
75 public KQueueDomainSocketChannelConfig config() {
76 return config;
77 }
78
79 @Override
80 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
81 if (super.doConnect(remoteAddress, localAddress)) {
82 local = localAddress != null ? (DomainSocketAddress) localAddress : socket.localDomainSocketAddress();
83 remote = (DomainSocketAddress) remoteAddress;
84 return true;
85 }
86 return false;
87 }
88
89 @Override
90 public DomainSocketAddress remoteAddress() {
91 return (DomainSocketAddress) super.remoteAddress();
92 }
93
94 @Override
95 public DomainSocketAddress localAddress() {
96 return (DomainSocketAddress) super.localAddress();
97 }
98
99 @Override
100 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
101 Object msg = in.current();
102 if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
103
104 in.remove();
105 return 1;
106 }
107 return super.doWriteSingle(in);
108 }
109
110 @Override
111 protected Object filterOutboundMessage(Object msg) {
112 if (msg instanceof FileDescriptor) {
113 return msg;
114 }
115 return super.filterOutboundMessage(msg);
116 }
117
118
119
120
121
122 @UnstableApi
123 public PeerCredentials peerCredentials() throws IOException {
124 return socket.getPeerCredentials();
125 }
126
127 private final class KQueueDomainUnsafe extends KQueueStreamUnsafe {
128 @Override
129 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
130 switch (config().getReadMode()) {
131 case BYTES:
132 super.readReady(allocHandle);
133 break;
134 case FILE_DESCRIPTORS:
135 readReadyFd();
136 break;
137 default:
138 throw new Error();
139 }
140 }
141
142 private void readReadyFd() {
143 if (socket.isInputShutdown()) {
144 super.clearReadFilter0();
145 return;
146 }
147 final ChannelConfig config = config();
148 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
149
150 final ChannelPipeline pipeline = pipeline();
151 allocHandle.reset(config);
152 readReadyBefore();
153
154 try {
155 readLoop: do {
156
157
158
159 int recvFd = socket.recvFd();
160 switch(recvFd) {
161 case 0:
162 allocHandle.lastBytesRead(0);
163 break readLoop;
164 case -1:
165 allocHandle.lastBytesRead(-1);
166 close(voidPromise());
167 return;
168 default:
169 allocHandle.lastBytesRead(1);
170 allocHandle.incMessagesRead(1);
171 readPending = false;
172 pipeline.fireChannelRead(new FileDescriptor(recvFd));
173 break;
174 }
175 } while (allocHandle.continueReading());
176
177 allocHandle.readComplete();
178 pipeline.fireChannelReadComplete();
179 } catch (Throwable t) {
180 allocHandle.readComplete();
181 pipeline.fireChannelReadComplete();
182 pipeline.fireExceptionCaught(t);
183 } finally {
184 readReadyFinally(config);
185 }
186 }
187 }
188 }