View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultAddressedEnvelope;
26  import io.netty.channel.unix.DomainDatagramChannel;
27  import io.netty.channel.unix.DomainDatagramChannelConfig;
28  import io.netty.channel.unix.DomainDatagramPacket;
29  import io.netty.channel.unix.DomainDatagramSocketAddress;
30  import io.netty.channel.unix.DomainSocketAddress;
31  import io.netty.channel.unix.IovArray;
32  import io.netty.channel.unix.PeerCredentials;
33  import io.netty.channel.unix.UnixChannelUtil;
34  import io.netty.util.CharsetUtil;
35  import io.netty.util.UncheckedBooleanSupplier;
36  import io.netty.util.internal.StringUtil;
37  
38  import java.io.IOException;
39  import java.net.SocketAddress;
40  import java.nio.ByteBuffer;
41  
42  import static io.netty.channel.epoll.LinuxSocket.newSocketDomainDgram;
43  
44  public final class EpollDomainDatagramChannel extends AbstractEpollChannel implements DomainDatagramChannel {
45  
46      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
47  
48      private static final String EXPECTED_TYPES =
49              " (expected: " +
50                      StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
51                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
52                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
53                      StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
54                      StringUtil.simpleClassName(ByteBuf.class) + ')';
55  
56      private volatile boolean connected;
57      private volatile DomainSocketAddress local;
58      private volatile DomainSocketAddress remote;
59  
60      private final EpollDomainDatagramChannelConfig config;
61  
62      public EpollDomainDatagramChannel() {
63          this(newSocketDomainDgram(), false);
64      }
65  
66      public EpollDomainDatagramChannel(int fd) {
67          this(new LinuxSocket(fd), true);
68      }
69  
70      private EpollDomainDatagramChannel(LinuxSocket socket, boolean active) {
71          super(null, socket, active, EpollIoOps.valueOf(0));
72          config = new EpollDomainDatagramChannelConfig(this);
73      }
74  
75      @Override
76      public EpollDomainDatagramChannelConfig config() {
77          return config;
78      }
79  
80      @Override
81      protected void doBind(SocketAddress localAddress) throws Exception {
82          super.doBind(localAddress);
83          local = (DomainSocketAddress) localAddress;
84          active = true;
85      }
86  
87      @Override
88      protected void doClose() throws Exception {
89          super.doClose();
90          connected = active = false;
91          local = null;
92          remote = null;
93      }
94  
95      @Override
96      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
97          if (super.doConnect(remoteAddress, localAddress)) {
98              if (localAddress != null) {
99                  local = (DomainSocketAddress) localAddress;
100             }
101             remote = (DomainSocketAddress) remoteAddress;
102             connected = true;
103             return true;
104         }
105         return false;
106     }
107 
108     @Override
109     protected void doDisconnect() throws Exception {
110         doClose();
111     }
112 
113     @Override
114     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
115         int maxMessagesPerWrite = maxMessagesPerWrite();
116         while (maxMessagesPerWrite > 0) {
117             Object msg = in.current();
118             if (msg == null) {
119                 break;
120             }
121 
122             try {
123                 boolean done = false;
124                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
125                     if (doWriteMessage(msg)) {
126                         done = true;
127                         break;
128                     }
129                 }
130 
131                 if (done) {
132                     in.remove();
133                     maxMessagesPerWrite--;
134                 } else {
135                     break;
136                 }
137             } catch (IOException e) {
138                 maxMessagesPerWrite--;
139 
140                 // Continue on write error as a DatagramChannel can write to multiple remote peers
141                 //
142                 // See https://github.com/netty/netty/issues/2665
143                 in.remove(e);
144             }
145         }
146 
147         if (in.isEmpty()) {
148             // Did write all messages.
149             clearFlag(Native.EPOLLOUT);
150         } else {
151             // Did not write all messages.
152             setFlag(Native.EPOLLOUT);
153         }
154     }
155 
156     private boolean doWriteMessage(Object msg) throws Exception {
157         final ByteBuf data;
158         DomainSocketAddress remoteAddress;
159         if (msg instanceof AddressedEnvelope) {
160             @SuppressWarnings("unchecked")
161             AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
162                     (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
163             data = envelope.content();
164             remoteAddress = envelope.recipient();
165         } else {
166             data = (ByteBuf) msg;
167             remoteAddress = null;
168         }
169 
170         final int dataLen = data.readableBytes();
171         if (dataLen == 0) {
172             return true;
173         }
174 
175         final long writtenBytes;
176         if (data.hasMemoryAddress()) {
177             long memoryAddress = data.memoryAddress();
178             if (remoteAddress == null) {
179                 writtenBytes = socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
180             } else {
181                 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
182                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
183             }
184         } else if (data.nioBufferCount() > 1) {
185             IovArray array =  ((NativeArrays) registration().attachment()).cleanIovArray();
186             array.add(data, data.readerIndex(), data.readableBytes());
187             int cnt = array.count();
188             assert cnt != 0;
189 
190             if (remoteAddress == null) {
191                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
192             } else {
193                 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
194                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
195             }
196         } else {
197             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
198             if (remoteAddress == null) {
199                 writtenBytes = socket.send(nioData, nioData.position(), nioData.limit());
200             } else {
201                 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
202                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
203             }
204         }
205 
206         return writtenBytes > 0;
207     }
208 
209     @Override
210     protected Object filterOutboundMessage(Object msg) {
211         if (msg instanceof DomainDatagramPacket) {
212             DomainDatagramPacket packet = (DomainDatagramPacket) msg;
213             ByteBuf content = packet.content();
214             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
215                     new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
216         }
217 
218         if (msg instanceof ByteBuf) {
219             ByteBuf buf = (ByteBuf) msg;
220             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
221         }
222 
223         if (msg instanceof AddressedEnvelope) {
224             @SuppressWarnings("unchecked")
225             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
226             if (e.content() instanceof ByteBuf &&
227                     (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
228 
229                 ByteBuf content = (ByteBuf) e.content();
230                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
231                         new DefaultAddressedEnvelope<>(
232                                 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
233             }
234         }
235 
236         throw new UnsupportedOperationException(
237                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
238     }
239 
240     @Override
241     public boolean isActive() {
242         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
243     }
244 
245     @Override
246     public boolean isConnected() {
247         return connected;
248     }
249 
250     @Override
251     public DomainSocketAddress localAddress() {
252         return (DomainSocketAddress) super.localAddress();
253     }
254 
255     @Override
256     protected DomainSocketAddress localAddress0() {
257         return local;
258     }
259 
260     @Override
261     public ChannelMetadata metadata() {
262         return METADATA;
263     }
264 
265     @Override
266     protected AbstractEpollUnsafe newUnsafe() {
267         return new EpollDomainDatagramChannelUnsafe();
268     }
269 
270     @Override
271     protected void doRegister(ChannelPromise promise) {
272         super.doRegister(promise);
273         promise.addListener(f -> {
274             if (f.isSuccess() && isRegistered()) {
275                 // As Datagram is connection-less we can submit the current ops once the registration itself was
276                 // successful.
277                 submitCurrentOps();
278             }
279         });
280     }
281 
282     /**
283      * Returns the unix credentials (uid, gid, pid) of the peer
284      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
285      */
286     public PeerCredentials peerCredentials() throws IOException {
287         return socket.getPeerCredentials();
288     }
289 
290     @Override
291     public DomainSocketAddress remoteAddress() {
292         return (DomainSocketAddress) super.remoteAddress();
293     }
294 
295     @Override
296     protected DomainSocketAddress remoteAddress0() {
297         return remote;
298     }
299 
300     final class EpollDomainDatagramChannelUnsafe extends AbstractEpollUnsafe {
301 
302         @Override
303         void epollInReady() {
304             assert eventLoop().inEventLoop();
305             final DomainDatagramChannelConfig config = config();
306             if (shouldBreakEpollInReady(config)) {
307                 clearEpollIn0();
308                 return;
309             }
310             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
311             final ChannelPipeline pipeline = pipeline();
312             final ByteBufAllocator allocator = config.getAllocator();
313             allocHandle.reset(config);
314 
315             Throwable exception = null;
316             try {
317                 ByteBuf byteBuf = null;
318                 try {
319                     boolean connected = isConnected();
320                     do {
321                         byteBuf = allocHandle.allocate(allocator);
322                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
323 
324                         final DomainDatagramPacket packet;
325                         if (connected) {
326                             allocHandle.lastBytesRead(doReadBytes(byteBuf));
327                             if (allocHandle.lastBytesRead() <= 0) {
328                                 // nothing was read, release the buffer.
329                                 byteBuf.release();
330                                 break;
331                             }
332                             packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
333                                     (DomainSocketAddress) remoteAddress());
334                         } else {
335                             final DomainDatagramSocketAddress remoteAddress;
336                             if (byteBuf.hasMemoryAddress()) {
337                                 // has a memory address so use optimized call
338                                 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
339                                         byteBuf.writerIndex(), byteBuf.capacity());
340                             } else {
341                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
342                                         byteBuf.writerIndex(), byteBuf.writableBytes());
343                                 remoteAddress =
344                                         socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
345                             }
346 
347                             if (remoteAddress == null) {
348                                 allocHandle.lastBytesRead(-1);
349                                 byteBuf.release();
350                                 break;
351                             }
352                             DomainSocketAddress localAddress = remoteAddress.localAddress();
353                             if (localAddress == null) {
354                                 localAddress = (DomainSocketAddress) localAddress();
355                             }
356                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
357                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
358 
359                             packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
360                         }
361 
362                         allocHandle.incMessagesRead(1);
363 
364                         readPending = false;
365                         pipeline.fireChannelRead(packet);
366 
367                         byteBuf = null;
368 
369                         // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
370                         // as we read anything).
371                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
372                 } catch (Throwable t) {
373                     if (byteBuf != null) {
374                         byteBuf.release();
375                     }
376                     exception = t;
377                 }
378 
379                 allocHandle.readComplete();
380                 pipeline.fireChannelReadComplete();
381 
382                 if (exception != null) {
383                     pipeline.fireExceptionCaught(exception);
384                 }
385             } finally {
386                 if (shouldStopReading(config)) {
387                     clearEpollIn();
388                 }
389             }
390         }
391     }
392 }