1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.DefaultAddressedEnvelope;
23 import io.netty.channel.unix.DomainDatagramChannel;
24 import io.netty.channel.unix.DomainDatagramChannelConfig;
25 import io.netty.channel.unix.DomainDatagramPacket;
26 import io.netty.channel.unix.DomainDatagramSocketAddress;
27 import io.netty.channel.unix.DomainSocketAddress;
28 import io.netty.channel.unix.IovArray;
29 import io.netty.channel.unix.PeerCredentials;
30 import io.netty.channel.unix.UnixChannelUtil;
31 import io.netty.util.CharsetUtil;
32 import io.netty.util.UncheckedBooleanSupplier;
33 import io.netty.util.internal.StringUtil;
34
35 import java.io.IOException;
36 import java.net.SocketAddress;
37 import java.nio.ByteBuffer;
38
39 import static io.netty.channel.kqueue.BsdSocket.newSocketDomainDgram;
40
41 public final class KQueueDomainDatagramChannel extends AbstractKQueueDatagramChannel implements DomainDatagramChannel {
42
43 private static final String EXPECTED_TYPES =
44 " (expected: " +
45 StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
46 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
47 StringUtil.simpleClassName(ByteBuf.class) + ", " +
48 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
49 StringUtil.simpleClassName(ByteBuf.class) + ')';
50
51 private volatile boolean connected;
52 private volatile DomainSocketAddress local;
53 private volatile DomainSocketAddress remote;
54
55 private final KQueueDomainDatagramChannelConfig config;
56
57 public KQueueDomainDatagramChannel() {
58 this(newSocketDomainDgram(), false);
59 }
60
61 public KQueueDomainDatagramChannel(int fd) {
62 this(new BsdSocket(fd), true);
63 }
64
65 private KQueueDomainDatagramChannel(BsdSocket socket, boolean active) {
66 super(null, socket, active);
67 config = new KQueueDomainDatagramChannelConfig(this);
68 }
69
70 @Override
71 public KQueueDomainDatagramChannelConfig config() {
72 return config;
73 }
74
75 @Override
76 protected void doBind(SocketAddress localAddress) throws Exception {
77 super.doBind(localAddress);
78 local = (DomainSocketAddress) localAddress;
79 active = true;
80 }
81
82 @Override
83 protected void doClose() throws Exception {
84 super.doClose();
85 connected = active = false;
86 local = null;
87 remote = null;
88 }
89
90 @Override
91 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
92 if (super.doConnect(remoteAddress, localAddress)) {
93 if (localAddress != null) {
94 local = (DomainSocketAddress) localAddress;
95 }
96 remote = (DomainSocketAddress) remoteAddress;
97 connected = true;
98 return true;
99 }
100 return false;
101 }
102
103 @Override
104 protected void doDisconnect() throws Exception {
105 doClose();
106 }
107
108 @Override
109 protected boolean doWriteMessage(Object msg) throws Exception {
110 final ByteBuf data;
111 DomainSocketAddress remoteAddress;
112 if (msg instanceof AddressedEnvelope) {
113 @SuppressWarnings("unchecked")
114 AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
115 (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
116 data = envelope.content();
117 remoteAddress = envelope.recipient();
118 } else {
119 data = (ByteBuf) msg;
120 remoteAddress = null;
121 }
122
123 final int dataLen = data.readableBytes();
124 if (dataLen == 0) {
125 return true;
126 }
127
128 final long writtenBytes;
129 if (data.hasMemoryAddress()) {
130 long memoryAddress = data.memoryAddress();
131 if (remoteAddress == null) {
132 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
133 } else {
134 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
135 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
136 }
137 } else if (data.nioBufferCount() > 1) {
138 IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
139 array.add(data, data.readerIndex(), data.readableBytes());
140 int cnt = array.count();
141 assert cnt != 0;
142
143 if (remoteAddress == null) {
144 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
145 } else {
146 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
147 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
148 }
149 } else {
150 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
151 if (remoteAddress == null) {
152 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
153 } else {
154 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
155 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
156 }
157 }
158
159 return writtenBytes > 0;
160 }
161
162 @Override
163 protected Object filterOutboundMessage(Object msg) {
164 if (msg instanceof DomainDatagramPacket) {
165 DomainDatagramPacket packet = (DomainDatagramPacket) msg;
166 ByteBuf content = packet.content();
167 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
168 new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
169 }
170
171 if (msg instanceof ByteBuf) {
172 ByteBuf buf = (ByteBuf) msg;
173 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
174 }
175
176 if (msg instanceof AddressedEnvelope) {
177 @SuppressWarnings("unchecked")
178 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
179 if (e.content() instanceof ByteBuf &&
180 (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
181
182 ByteBuf content = (ByteBuf) e.content();
183 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
184 new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
185 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
186 }
187 }
188
189 throw new UnsupportedOperationException(
190 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
191 }
192
193 @Override
194 public boolean isActive() {
195 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
196 }
197
198 @Override
199 public boolean isConnected() {
200 return connected;
201 }
202
203 @Override
204 public DomainSocketAddress localAddress() {
205 return (DomainSocketAddress) super.localAddress();
206 }
207
208 @Override
209 protected DomainSocketAddress localAddress0() {
210 return local;
211 }
212
213 @Override
214 protected AbstractKQueueUnsafe newUnsafe() {
215 return new KQueueDomainDatagramChannelUnsafe();
216 }
217
218
219
220
221
222 public PeerCredentials peerCredentials() throws IOException {
223 return socket.getPeerCredentials();
224 }
225
226 @Override
227 public DomainSocketAddress remoteAddress() {
228 return (DomainSocketAddress) super.remoteAddress();
229 }
230
231 @Override
232 protected DomainSocketAddress remoteAddress0() {
233 return remote;
234 }
235
236 final class KQueueDomainDatagramChannelUnsafe extends AbstractKQueueUnsafe {
237
238 @Override
239 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
240 assert eventLoop().inEventLoop();
241 final DomainDatagramChannelConfig config = config();
242 if (shouldBreakReadReady(config)) {
243 clearReadFilter0();
244 return;
245 }
246 final ChannelPipeline pipeline = pipeline();
247 final ByteBufAllocator allocator = config.getAllocator();
248 allocHandle.reset(config);
249
250 Throwable exception = null;
251 try {
252 ByteBuf byteBuf = null;
253 try {
254 boolean connected = isConnected();
255 do {
256 byteBuf = allocHandle.allocate(allocator);
257 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
258
259 final DomainDatagramPacket packet;
260 if (connected) {
261 allocHandle.lastBytesRead(doReadBytes(byteBuf));
262 if (allocHandle.lastBytesRead() <= 0) {
263
264 byteBuf.release();
265 break;
266 }
267 packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
268 (DomainSocketAddress) remoteAddress());
269 } else {
270 final DomainDatagramSocketAddress remoteAddress;
271 if (byteBuf.hasMemoryAddress()) {
272
273 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
274 byteBuf.writerIndex(), byteBuf.capacity());
275 } else {
276 ByteBuffer nioData = byteBuf.internalNioBuffer(
277 byteBuf.writerIndex(), byteBuf.writableBytes());
278 remoteAddress =
279 socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
280 }
281
282 if (remoteAddress == null) {
283 allocHandle.lastBytesRead(-1);
284 byteBuf.release();
285 break;
286 }
287 DomainSocketAddress localAddress = remoteAddress.localAddress();
288 if (localAddress == null) {
289 localAddress = (DomainSocketAddress) localAddress();
290 }
291 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
292 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
293
294 packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
295 }
296
297 allocHandle.incMessagesRead(1);
298
299 readPending = false;
300 pipeline.fireChannelRead(packet);
301
302 byteBuf = null;
303
304
305
306 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
307 } catch (Throwable t) {
308 if (byteBuf != null) {
309 byteBuf.release();
310 }
311 exception = t;
312 }
313
314 allocHandle.readComplete();
315 pipeline.fireChannelReadComplete();
316
317 if (exception != null) {
318 pipeline.fireExceptionCaught(exception);
319 }
320 } finally {
321 if (shouldStopReading(config)) {
322 clearReadFilter0();
323 }
324 }
325 }
326 }
327 }