1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.DefaultAddressedEnvelope;
23 import io.netty.channel.unix.DomainDatagramChannel;
24 import io.netty.channel.unix.DomainDatagramChannelConfig;
25 import io.netty.channel.unix.DomainDatagramPacket;
26 import io.netty.channel.unix.DomainDatagramSocketAddress;
27 import io.netty.channel.unix.DomainSocketAddress;
28 import io.netty.channel.unix.IovArray;
29 import io.netty.channel.unix.PeerCredentials;
30 import io.netty.channel.unix.UnixChannelUtil;
31 import io.netty.util.CharsetUtil;
32 import io.netty.util.UncheckedBooleanSupplier;
33 import io.netty.util.internal.StringUtil;
34
35 import java.io.IOException;
36 import java.net.SocketAddress;
37 import java.nio.ByteBuffer;
38
39 import static io.netty.channel.kqueue.BsdSocket.newSocketDomainDgram;
40
41 public final class KQueueDomainDatagramChannel extends AbstractKQueueDatagramChannel implements DomainDatagramChannel {
42
43 private static final String EXPECTED_TYPES =
44 " (expected: " +
45 StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
46 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
47 StringUtil.simpleClassName(ByteBuf.class) + ", " +
48 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
49 StringUtil.simpleClassName(ByteBuf.class) + ')';
50
51 private volatile boolean connected;
52 private volatile DomainSocketAddress local;
53 private volatile DomainSocketAddress remote;
54
55 private final KQueueDomainDatagramChannelConfig config;
56
57 public KQueueDomainDatagramChannel() {
58 this(newSocketDomainDgram(), false);
59 }
60
61 public KQueueDomainDatagramChannel(int fd) {
62 this(new BsdSocket(fd), true);
63 }
64
65 private KQueueDomainDatagramChannel(BsdSocket socket, boolean active) {
66 super(null, socket, active);
67 config = new KQueueDomainDatagramChannelConfig(this);
68 }
69
70 @Override
71 public KQueueDomainDatagramChannelConfig config() {
72 return config;
73 }
74
75 @Override
76 protected void doBind(SocketAddress localAddress) throws Exception {
77 super.doBind(localAddress);
78 local = (DomainSocketAddress) localAddress;
79 active = true;
80 }
81
82 @Override
83 protected void doClose() throws Exception {
84 try {
85 super.doClose();
86 } finally {
87 connected = active = false;
88 local = null;
89 remote = null;
90 }
91 }
92
93 @Override
94 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
95 if (super.doConnect(remoteAddress, localAddress)) {
96 if (localAddress != null) {
97 local = (DomainSocketAddress) localAddress;
98 }
99 remote = (DomainSocketAddress) remoteAddress;
100 connected = true;
101 return true;
102 }
103 return false;
104 }
105
106 @Override
107 protected void doDisconnect() throws Exception {
108 doClose();
109 }
110
111 @Override
112 protected boolean doWriteMessage(Object msg) throws Exception {
113 final ByteBuf data;
114 DomainSocketAddress remoteAddress;
115 if (msg instanceof AddressedEnvelope) {
116 @SuppressWarnings("unchecked")
117 AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
118 (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
119 data = envelope.content();
120 remoteAddress = envelope.recipient();
121 } else {
122 data = (ByteBuf) msg;
123 remoteAddress = null;
124 }
125
126 final int dataLen = data.readableBytes();
127 if (dataLen == 0) {
128 return true;
129 }
130
131 final long writtenBytes;
132 if (data.hasMemoryAddress()) {
133 long memoryAddress = data.memoryAddress();
134 if (remoteAddress == null) {
135 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
136 } else {
137 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
138 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
139 }
140 } else if (data.nioBufferCount() > 1) {
141 IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
142 array.add(data, data.readerIndex(), data.readableBytes());
143 int cnt = array.count();
144 assert cnt != 0;
145
146 if (remoteAddress == null) {
147 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
148 } else {
149 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
150 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
151 }
152 } else {
153 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
154 if (remoteAddress == null) {
155 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
156 } else {
157 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
158 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
159 }
160 }
161
162 return writtenBytes > 0;
163 }
164
165 @Override
166 protected Object filterOutboundMessage(Object msg) {
167 if (msg instanceof DomainDatagramPacket) {
168 DomainDatagramPacket packet = (DomainDatagramPacket) msg;
169 ByteBuf content = packet.content();
170 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
171 new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
172 }
173
174 if (msg instanceof ByteBuf) {
175 ByteBuf buf = (ByteBuf) msg;
176 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
177 }
178
179 if (msg instanceof AddressedEnvelope) {
180 @SuppressWarnings("unchecked")
181 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
182 if (e.content() instanceof ByteBuf &&
183 (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
184
185 ByteBuf content = (ByteBuf) e.content();
186 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
187 new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
188 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
189 }
190 }
191
192 throw new UnsupportedOperationException(
193 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
194 }
195
196 @Override
197 public boolean isActive() {
198 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
199 }
200
201 @Override
202 public boolean isConnected() {
203 return connected;
204 }
205
206 @Override
207 public DomainSocketAddress localAddress() {
208 return (DomainSocketAddress) super.localAddress();
209 }
210
211 @Override
212 protected DomainSocketAddress localAddress0() {
213 return local;
214 }
215
216 @Override
217 protected AbstractKQueueUnsafe newUnsafe() {
218 return new KQueueDomainDatagramChannelUnsafe();
219 }
220
221
222
223
224
225 public PeerCredentials peerCredentials() throws IOException {
226 return socket.getPeerCredentials();
227 }
228
229 @Override
230 public DomainSocketAddress remoteAddress() {
231 return (DomainSocketAddress) super.remoteAddress();
232 }
233
234 @Override
235 protected DomainSocketAddress remoteAddress0() {
236 return remote;
237 }
238
239 final class KQueueDomainDatagramChannelUnsafe extends AbstractKQueueUnsafe {
240
241 @Override
242 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
243 assert eventLoop().inEventLoop();
244 final DomainDatagramChannelConfig config = config();
245 if (shouldBreakReadReady(config)) {
246 clearReadFilter0();
247 return;
248 }
249 final ChannelPipeline pipeline = pipeline();
250 final ByteBufAllocator allocator = config.getAllocator();
251 allocHandle.reset(config);
252
253 Throwable exception = null;
254 try {
255 ByteBuf byteBuf = null;
256 try {
257 boolean connected = isConnected();
258 do {
259 byteBuf = allocHandle.allocate(allocator);
260 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
261
262 final DomainDatagramPacket packet;
263 if (connected) {
264 allocHandle.lastBytesRead(doReadBytes(byteBuf));
265 if (allocHandle.lastBytesRead() <= 0) {
266
267 byteBuf.release();
268 break;
269 }
270 packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
271 (DomainSocketAddress) remoteAddress());
272 } else {
273 final DomainDatagramSocketAddress remoteAddress;
274 if (byteBuf.hasMemoryAddress()) {
275
276 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
277 byteBuf.writerIndex(), byteBuf.capacity());
278 } else {
279 ByteBuffer nioData = byteBuf.internalNioBuffer(
280 byteBuf.writerIndex(), byteBuf.writableBytes());
281 remoteAddress =
282 socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
283 }
284
285 if (remoteAddress == null) {
286 allocHandle.lastBytesRead(-1);
287 byteBuf.release();
288 break;
289 }
290 DomainSocketAddress localAddress = remoteAddress.localAddress();
291 if (localAddress == null) {
292 localAddress = (DomainSocketAddress) localAddress();
293 }
294 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
295 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
296
297 packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
298 }
299
300 allocHandle.incMessagesRead(1);
301
302 readPending = false;
303 pipeline.fireChannelRead(packet);
304
305 byteBuf = null;
306
307
308
309 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
310 } catch (Throwable t) {
311 if (byteBuf != null) {
312 byteBuf.release();
313 }
314 exception = t;
315 }
316
317 allocHandle.readComplete();
318 pipeline.fireChannelReadComplete();
319
320 if (exception != null) {
321 pipeline.fireExceptionCaught(exception);
322 }
323 } finally {
324 if (shouldStopReading(config)) {
325 clearReadFilter0();
326 }
327 }
328 }
329 }
330 }