1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.DefaultAddressedEnvelope;
23 import io.netty.channel.unix.DomainDatagramChannel;
24 import io.netty.channel.unix.DomainDatagramChannelConfig;
25 import io.netty.channel.unix.DomainDatagramPacket;
26 import io.netty.channel.unix.DomainDatagramSocketAddress;
27 import io.netty.channel.unix.DomainSocketAddress;
28 import io.netty.channel.unix.IovArray;
29 import io.netty.channel.unix.PeerCredentials;
30 import io.netty.channel.unix.UnixChannelUtil;
31 import io.netty.util.CharsetUtil;
32 import io.netty.util.UncheckedBooleanSupplier;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.UnstableApi;
35
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.nio.ByteBuffer;
39
40 import static io.netty.channel.kqueue.BsdSocket.newSocketDomainDgram;
41
42 @UnstableApi
43 public final class KQueueDomainDatagramChannel extends AbstractKQueueDatagramChannel implements DomainDatagramChannel {
44
45 private static final String EXPECTED_TYPES =
46 " (expected: " +
47 StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
48 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
49 StringUtil.simpleClassName(ByteBuf.class) + ", " +
50 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
51 StringUtil.simpleClassName(ByteBuf.class) + ')';
52
// Set to true by doConnect(...) when the connect succeeds; reset to false by doClose().
private volatile boolean connected;
// Local address recorded by doBind(...) or doConnect(...); cleared by doClose().
private volatile DomainSocketAddress local;
// Peer address recorded by doConnect(...); cleared by doClose().
private volatile DomainSocketAddress remote;

// Configuration object created once in the constructor and handed out by config().
private final KQueueDomainDatagramChannelConfig config;
58
/**
 * Creates a channel backed by a socket obtained from {@code BsdSocket.newSocketDomainDgram()}.
 * The channel is constructed with {@code active == false}.
 */
public KQueueDomainDatagramChannel() {
    this(newSocketDomainDgram(), false);
}
62
/**
 * Creates a channel that wraps the given file descriptor in a {@link BsdSocket}.
 * The channel is constructed with {@code active == true}.
 *
 * @param fd the file descriptor to wrap
 */
public KQueueDomainDatagramChannel(int fd) {
    this(new BsdSocket(fd), true);
}
66
/**
 * Shared constructor used by the public constructors.
 *
 * @param socket the socket backing this channel
 * @param active the initial value forwarded to the super constructor
 */
private KQueueDomainDatagramChannel(BsdSocket socket, boolean active) {
    // NOTE(review): the first super argument is null - presumably "no parent channel"; confirm against
    // AbstractKQueueDatagramChannel.
    super(null, socket, active);
    config = new KQueueDomainDatagramChannelConfig(this);
}
71
72 @Override
73 public KQueueDomainDatagramChannelConfig config() {
74 return config;
75 }
76
77 @Override
78 protected void doBind(SocketAddress localAddress) throws Exception {
79 super.doBind(localAddress);
80 local = (DomainSocketAddress) localAddress;
81 active = true;
82 }
83
84 @Override
85 protected void doClose() throws Exception {
86 super.doClose();
87 connected = active = false;
88 local = null;
89 remote = null;
90 }
91
92 @Override
93 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
94 if (super.doConnect(remoteAddress, localAddress)) {
95 if (localAddress != null) {
96 local = (DomainSocketAddress) localAddress;
97 }
98 remote = (DomainSocketAddress) remoteAddress;
99 connected = true;
100 return true;
101 }
102 return false;
103 }
104
105 @Override
106 protected void doDisconnect() throws Exception {
107 doClose();
108 }
109
110 @Override
111 protected boolean doWriteMessage(Object msg) throws Exception {
112 final ByteBuf data;
113 DomainSocketAddress remoteAddress;
114 if (msg instanceof AddressedEnvelope) {
115 @SuppressWarnings("unchecked")
116 AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
117 (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
118 data = envelope.content();
119 remoteAddress = envelope.recipient();
120 } else {
121 data = (ByteBuf) msg;
122 remoteAddress = null;
123 }
124
125 final int dataLen = data.readableBytes();
126 if (dataLen == 0) {
127 return true;
128 }
129
130 final long writtenBytes;
131 if (data.hasMemoryAddress()) {
132 long memoryAddress = data.memoryAddress();
133 if (remoteAddress == null) {
134 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
135 } else {
136 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
137 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
138 }
139 } else if (data.nioBufferCount() > 1) {
140 IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
141 array.add(data, data.readerIndex(), data.readableBytes());
142 int cnt = array.count();
143 assert cnt != 0;
144
145 if (remoteAddress == null) {
146 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
147 } else {
148 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
149 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
150 }
151 } else {
152 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
153 if (remoteAddress == null) {
154 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
155 } else {
156 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
157 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
158 }
159 }
160
161 return writtenBytes > 0;
162 }
163
164 @Override
165 protected Object filterOutboundMessage(Object msg) {
166 if (msg instanceof DomainDatagramPacket) {
167 DomainDatagramPacket packet = (DomainDatagramPacket) msg;
168 ByteBuf content = packet.content();
169 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
170 new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
171 }
172
173 if (msg instanceof ByteBuf) {
174 ByteBuf buf = (ByteBuf) msg;
175 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
176 }
177
178 if (msg instanceof AddressedEnvelope) {
179 @SuppressWarnings("unchecked")
180 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
181 if (e.content() instanceof ByteBuf &&
182 (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
183
184 ByteBuf content = (ByteBuf) e.content();
185 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
186 new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
187 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
188 }
189 }
190
191 throw new UnsupportedOperationException(
192 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
193 }
194
195 @Override
196 public boolean isActive() {
197 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
198 }
199
200 @Override
201 public boolean isConnected() {
202 return connected;
203 }
204
205 @Override
206 public DomainSocketAddress localAddress() {
207 return (DomainSocketAddress) super.localAddress();
208 }
209
210 @Override
211 protected DomainSocketAddress localAddress0() {
212 return local;
213 }
214
215 @Override
216 protected AbstractKQueueUnsafe newUnsafe() {
217 return new KQueueDomainDatagramChannelUnsafe();
218 }
219
220
221
222
223
224 public PeerCredentials peerCredentials() throws IOException {
225 return socket.getPeerCredentials();
226 }
227
228 @Override
229 public DomainSocketAddress remoteAddress() {
230 return (DomainSocketAddress) super.remoteAddress();
231 }
232
233 @Override
234 protected DomainSocketAddress remoteAddress0() {
235 return remote;
236 }
237
/**
 * Unsafe implementation for this channel; it turns readable events into {@link DomainDatagramPacket}s
 * that are fired through the pipeline.
 */
final class KQueueDomainDatagramChannelUnsafe extends AbstractKQueueUnsafe {

    /**
     * Reads datagrams until the allocator handle says to stop or a read yields nothing.
     *
     * <p>Each iteration allocates a buffer, reads one datagram into it and fires it as a
     * {@link DomainDatagramPacket}. Any {@link Throwable} raised inside the loop is captured and
     * reported through {@code fireExceptionCaught} after {@code fireChannelReadComplete}.
     *
     * @param allocHandle the handle used to allocate buffers and track read statistics
     */
    @Override
    void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
        assert eventLoop().inEventLoop();
        final DomainDatagramChannelConfig config = config();
        // When shouldBreakReadReady(config) reports true, clear the read filter and read nothing.
        if (shouldBreakReadReady(config)) {
            clearReadFilter0();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        allocHandle.reset(config);
        readReadyBefore();

        Throwable exception = null;
        try {
            // Tracks the buffer still owned by this method so the catch block can release it.
            ByteBuf byteBuf = null;
            try {
                // Sampled once, before the loop; all iterations use the same read path.
                boolean connected = isConnected();
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.attemptedBytesRead(byteBuf.writableBytes());

                    final DomainDatagramPacket packet;
                    if (connected) {
                        // Connected path: plain read; sender/recipient come from the channel's own addresses.
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // Nothing was read: release the buffer and stop reading.
                            byteBuf.release();
                            break;
                        }
                        packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
                                (DomainSocketAddress) remoteAddress());
                    } else {
                        // Unconnected path: the recvFrom call also returns the sender's address.
                        final DomainDatagramSocketAddress remoteAddress;
                        if (byteBuf.hasMemoryAddress()) {
                            // The buffer exposes a memory address, so use the address-based receive call.
                            remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
                                    byteBuf.writerIndex(), byteBuf.capacity());
                        } else {
                            ByteBuffer nioData = byteBuf.internalNioBuffer(
                                    byteBuf.writerIndex(), byteBuf.writableBytes());
                            remoteAddress =
                                    socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
                        }

                        // A null result means nothing was received: record -1, release the buffer and stop.
                        if (remoteAddress == null) {
                            allocHandle.lastBytesRead(-1);
                            byteBuf.release();
                            break;
                        }
                        // Prefer the local address carried by the received address; otherwise fall back
                        // to the local address reported by localAddress().
                        DomainSocketAddress localAddress = remoteAddress.localAddress();
                        if (localAddress == null) {
                            localAddress = (DomainSocketAddress) localAddress();
                        }
                        allocHandle.lastBytesRead(remoteAddress.receivedAmount());
                        // Advance the writer index by the received byte count recorded just above.
                        byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());

                        packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
                    }

                    allocHandle.incMessagesRead(1);

                    readPending = false;
                    pipeline.fireChannelRead(packet);

                    // The buffer now belongs to the fired packet; null it so the catch block below
                    // does not release it a second time.
                    byteBuf = null;

                    // continueReading is called with a supplier that always answers true.
                } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
            } catch (Throwable t) {
                // Release a buffer that was allocated but never handed to the pipeline.
                if (byteBuf != null) {
                    byteBuf.release();
                }
                exception = t;
            }

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            // Report the captured failure only after read-complete has been fired.
            if (exception != null) {
                pipeline.fireExceptionCaught(exception);
            }
        } finally {
            readReadyFinally(config);
        }
    }
}
328 }