1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultAddressedEnvelope;
26 import io.netty.channel.unix.DomainDatagramChannel;
27 import io.netty.channel.unix.DomainDatagramChannelConfig;
28 import io.netty.channel.unix.DomainDatagramPacket;
29 import io.netty.channel.unix.DomainDatagramSocketAddress;
30 import io.netty.channel.unix.DomainSocketAddress;
31 import io.netty.channel.unix.IovArray;
32 import io.netty.channel.unix.PeerCredentials;
33 import io.netty.channel.unix.UnixChannelUtil;
34 import io.netty.util.CharsetUtil;
35 import io.netty.util.UncheckedBooleanSupplier;
36 import io.netty.util.internal.StringUtil;
37
38 import java.io.IOException;
39 import java.net.SocketAddress;
40 import java.nio.ByteBuffer;
41
42 import static io.netty.channel.epoll.LinuxSocket.newSocketDomainDgram;
43
44 public final class EpollDomainDatagramChannel extends AbstractEpollChannel implements DomainDatagramChannel {
45
46 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
47
48 private static final String EXPECTED_TYPES =
49 " (expected: " +
50 StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
51 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
52 StringUtil.simpleClassName(ByteBuf.class) + ", " +
53 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
54 StringUtil.simpleClassName(ByteBuf.class) + ')';
55
56 private volatile boolean connected;
57 private volatile DomainSocketAddress local;
58 private volatile DomainSocketAddress remote;
59
60 private final EpollDomainDatagramChannelConfig config;
61
62 public EpollDomainDatagramChannel() {
63 this(newSocketDomainDgram(), false);
64 }
65
66 public EpollDomainDatagramChannel(int fd) {
67 this(new LinuxSocket(fd), true);
68 }
69
70 private EpollDomainDatagramChannel(LinuxSocket socket, boolean active) {
71 super(null, socket, active, EpollIoOps.valueOf(0));
72 config = new EpollDomainDatagramChannelConfig(this);
73 }
74
75 @Override
76 public EpollDomainDatagramChannelConfig config() {
77 return config;
78 }
79
80 @Override
81 protected void doBind(SocketAddress localAddress) throws Exception {
82 super.doBind(localAddress);
83 local = (DomainSocketAddress) localAddress;
84 active = true;
85 }
86
87 @Override
88 protected void doClose() throws Exception {
89 super.doClose();
90 connected = active = false;
91 local = null;
92 remote = null;
93 }
94
95 @Override
96 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
97 if (super.doConnect(remoteAddress, localAddress)) {
98 if (localAddress != null) {
99 local = (DomainSocketAddress) localAddress;
100 }
101 remote = (DomainSocketAddress) remoteAddress;
102 connected = true;
103 return true;
104 }
105 return false;
106 }
107
108 @Override
109 protected void doDisconnect() throws Exception {
110 doClose();
111 }
112
113 @Override
114 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
115 int maxMessagesPerWrite = maxMessagesPerWrite();
116 while (maxMessagesPerWrite > 0) {
117 Object msg = in.current();
118 if (msg == null) {
119 break;
120 }
121
122 try {
123 boolean done = false;
124 for (int i = config().getWriteSpinCount(); i > 0; --i) {
125 if (doWriteMessage(msg)) {
126 done = true;
127 break;
128 }
129 }
130
131 if (done) {
132 in.remove();
133 maxMessagesPerWrite--;
134 } else {
135 break;
136 }
137 } catch (IOException e) {
138 maxMessagesPerWrite--;
139
140
141
142
143 in.remove(e);
144 }
145 }
146
147 if (in.isEmpty()) {
148
149 clearFlag(Native.EPOLLOUT);
150 } else {
151
152 setFlag(Native.EPOLLOUT);
153 }
154 }
155
156 private boolean doWriteMessage(Object msg) throws Exception {
157 final ByteBuf data;
158 DomainSocketAddress remoteAddress;
159 if (msg instanceof AddressedEnvelope) {
160 @SuppressWarnings("unchecked")
161 AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
162 (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
163 data = envelope.content();
164 remoteAddress = envelope.recipient();
165 } else {
166 data = (ByteBuf) msg;
167 remoteAddress = null;
168 }
169
170 final int dataLen = data.readableBytes();
171 if (dataLen == 0) {
172 return true;
173 }
174
175 final long writtenBytes;
176 if (data.hasMemoryAddress()) {
177 long memoryAddress = data.memoryAddress();
178 if (remoteAddress == null) {
179 writtenBytes = socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
180 } else {
181 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
182 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
183 }
184 } else if (data.nioBufferCount() > 1) {
185 IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
186 array.add(data, data.readerIndex(), data.readableBytes());
187 int cnt = array.count();
188 assert cnt != 0;
189
190 if (remoteAddress == null) {
191 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
192 } else {
193 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
194 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
195 }
196 } else {
197 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
198 if (remoteAddress == null) {
199 writtenBytes = socket.send(nioData, nioData.position(), nioData.limit());
200 } else {
201 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
202 remoteAddress.path().getBytes(CharsetUtil.UTF_8));
203 }
204 }
205
206 return writtenBytes > 0;
207 }
208
209 @Override
210 protected Object filterOutboundMessage(Object msg) {
211 if (msg instanceof DomainDatagramPacket) {
212 DomainDatagramPacket packet = (DomainDatagramPacket) msg;
213 ByteBuf content = packet.content();
214 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
215 new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
216 }
217
218 if (msg instanceof ByteBuf) {
219 ByteBuf buf = (ByteBuf) msg;
220 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
221 }
222
223 if (msg instanceof AddressedEnvelope) {
224 @SuppressWarnings("unchecked")
225 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
226 if (e.content() instanceof ByteBuf &&
227 (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
228
229 ByteBuf content = (ByteBuf) e.content();
230 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
231 new DefaultAddressedEnvelope<>(
232 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
233 }
234 }
235
236 throw new UnsupportedOperationException(
237 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
238 }
239
240 @Override
241 public boolean isActive() {
242 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
243 }
244
245 @Override
246 public boolean isConnected() {
247 return connected;
248 }
249
250 @Override
251 public DomainSocketAddress localAddress() {
252 return (DomainSocketAddress) super.localAddress();
253 }
254
255 @Override
256 protected DomainSocketAddress localAddress0() {
257 return local;
258 }
259
260 @Override
261 public ChannelMetadata metadata() {
262 return METADATA;
263 }
264
265 @Override
266 protected AbstractEpollUnsafe newUnsafe() {
267 return new EpollDomainDatagramChannelUnsafe();
268 }
269
270 @Override
271 protected void doRegister(ChannelPromise promise) {
272 super.doRegister(promise);
273 promise.addListener(f -> {
274 if (f.isSuccess() && isRegistered()) {
275
276
277 submitCurrentOps();
278 }
279 });
280 }
281
282
283
284
285
286 public PeerCredentials peerCredentials() throws IOException {
287 return socket.getPeerCredentials();
288 }
289
290 @Override
291 public DomainSocketAddress remoteAddress() {
292 return (DomainSocketAddress) super.remoteAddress();
293 }
294
295 @Override
296 protected DomainSocketAddress remoteAddress0() {
297 return remote;
298 }
299
300 final class EpollDomainDatagramChannelUnsafe extends AbstractEpollUnsafe {
301
302 @Override
303 void epollInReady() {
304 assert eventLoop().inEventLoop();
305 final DomainDatagramChannelConfig config = config();
306 if (shouldBreakEpollInReady(config)) {
307 clearEpollIn0();
308 return;
309 }
310 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
311 final ChannelPipeline pipeline = pipeline();
312 final ByteBufAllocator allocator = config.getAllocator();
313 allocHandle.reset(config);
314
315 Throwable exception = null;
316 try {
317 ByteBuf byteBuf = null;
318 try {
319 boolean connected = isConnected();
320 do {
321 byteBuf = allocHandle.allocate(allocator);
322 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
323
324 final DomainDatagramPacket packet;
325 if (connected) {
326 allocHandle.lastBytesRead(doReadBytes(byteBuf));
327 if (allocHandle.lastBytesRead() <= 0) {
328
329 byteBuf.release();
330 break;
331 }
332 packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
333 (DomainSocketAddress) remoteAddress());
334 } else {
335 final DomainDatagramSocketAddress remoteAddress;
336 if (byteBuf.hasMemoryAddress()) {
337
338 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
339 byteBuf.writerIndex(), byteBuf.capacity());
340 } else {
341 ByteBuffer nioData = byteBuf.internalNioBuffer(
342 byteBuf.writerIndex(), byteBuf.writableBytes());
343 remoteAddress =
344 socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
345 }
346
347 if (remoteAddress == null) {
348 allocHandle.lastBytesRead(-1);
349 byteBuf.release();
350 break;
351 }
352 DomainSocketAddress localAddress = remoteAddress.localAddress();
353 if (localAddress == null) {
354 localAddress = (DomainSocketAddress) localAddress();
355 }
356 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
357 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
358
359 packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
360 }
361
362 allocHandle.incMessagesRead(1);
363
364 readPending = false;
365 pipeline.fireChannelRead(packet);
366
367 byteBuf = null;
368
369
370
371 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
372 } catch (Throwable t) {
373 if (byteBuf != null) {
374 byteBuf.release();
375 }
376 exception = t;
377 }
378
379 allocHandle.readComplete();
380 pipeline.fireChannelReadComplete();
381
382 if (exception != null) {
383 pipeline.fireExceptionCaught(exception);
384 }
385 } finally {
386 if (shouldStopReading(config)) {
387 clearEpollIn();
388 }
389 }
390 }
391 }
392 }