View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.unix.DomainDatagramChannel;
26  import io.netty.channel.unix.DomainDatagramChannelConfig;
27  import io.netty.channel.unix.DomainDatagramPacket;
28  import io.netty.channel.unix.DomainDatagramSocketAddress;
29  import io.netty.channel.unix.DomainSocketAddress;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.channel.unix.PeerCredentials;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.CharsetUtil;
34  import io.netty.util.UncheckedBooleanSupplier;
35  import io.netty.util.internal.StringUtil;
36  
37  import java.io.IOException;
38  import java.net.SocketAddress;
39  import java.nio.ByteBuffer;
40  
41  import static io.netty.channel.epoll.LinuxSocket.newSocketDomainDgram;
42  
43  public final class EpollDomainDatagramChannel extends AbstractEpollChannel implements DomainDatagramChannel {
44  
45      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
46  
47      private static final String EXPECTED_TYPES =
48              " (expected: " +
49                      StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
50                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
51                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
52                      StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
53                      StringUtil.simpleClassName(ByteBuf.class) + ')';
54  
55      private volatile boolean connected;
56      private volatile DomainSocketAddress local;
57      private volatile DomainSocketAddress remote;
58  
59      private final EpollDomainDatagramChannelConfig config;
60  
61      public EpollDomainDatagramChannel() {
62          this(newSocketDomainDgram(), false);
63      }
64  
65      public EpollDomainDatagramChannel(int fd) {
66          this(new LinuxSocket(fd), true);
67      }
68  
69      private EpollDomainDatagramChannel(LinuxSocket socket, boolean active) {
70          super(null, socket, active, EpollIoOps.valueOf(0));
71          config = new EpollDomainDatagramChannelConfig(this);
72      }
73  
74      @Override
75      public EpollDomainDatagramChannelConfig config() {
76          return config;
77      }
78  
79      @Override
80      protected void doBind(SocketAddress localAddress) throws Exception {
81          super.doBind(localAddress);
82          local = (DomainSocketAddress) localAddress;
83          active = true;
84      }
85  
86      @Override
87      protected void doClose() throws Exception {
88          super.doClose();
89          connected = active = false;
90          local = null;
91          remote = null;
92      }
93  
94      @Override
95      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
96          if (super.doConnect(remoteAddress, localAddress)) {
97              if (localAddress != null) {
98                  local = (DomainSocketAddress) localAddress;
99              }
100             remote = (DomainSocketAddress) remoteAddress;
101             connected = true;
102             return true;
103         }
104         return false;
105     }
106 
107     @Override
108     protected void doDisconnect() throws Exception {
109         doClose();
110     }
111 
112     @Override
113     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
114         int maxMessagesPerWrite = maxMessagesPerWrite();
115         while (maxMessagesPerWrite > 0) {
116             Object msg = in.current();
117             if (msg == null) {
118                 break;
119             }
120 
121             try {
122                 boolean done = false;
123                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
124                     if (doWriteMessage(msg)) {
125                         done = true;
126                         break;
127                     }
128                 }
129 
130                 if (done) {
131                     in.remove();
132                     maxMessagesPerWrite--;
133                 } else {
134                     break;
135                 }
136             } catch (IOException e) {
137                 maxMessagesPerWrite--;
138 
139                 // Continue on write error as a DatagramChannel can write to multiple remote peers
140                 //
141                 // See https://github.com/netty/netty/issues/2665
142                 in.remove(e);
143             }
144         }
145 
146         if (in.isEmpty()) {
147             // Did write all messages.
148             clearFlag(Native.EPOLLOUT);
149         } else {
150             // Did not write all messages.
151             setFlag(Native.EPOLLOUT);
152         }
153     }
154 
155     private boolean doWriteMessage(Object msg) throws Exception {
156         final ByteBuf data;
157         DomainSocketAddress remoteAddress;
158         if (msg instanceof AddressedEnvelope) {
159             @SuppressWarnings("unchecked")
160             AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
161                     (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
162             data = envelope.content();
163             remoteAddress = envelope.recipient();
164         } else {
165             data = (ByteBuf) msg;
166             remoteAddress = null;
167         }
168 
169         final int dataLen = data.readableBytes();
170         if (dataLen == 0) {
171             return true;
172         }
173 
174         final long writtenBytes;
175         if (data.hasMemoryAddress()) {
176             long memoryAddress = data.memoryAddress();
177             if (remoteAddress == null) {
178                 writtenBytes = socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
179             } else {
180                 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
181                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
182             }
183         } else if (data.nioBufferCount() > 1) {
184             IovArray array = registration().ioHandler().cleanIovArray();
185             array.add(data, data.readerIndex(), data.readableBytes());
186             int cnt = array.count();
187             assert cnt != 0;
188 
189             if (remoteAddress == null) {
190                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
191             } else {
192                 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
193                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
194             }
195         } else {
196             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
197             if (remoteAddress == null) {
198                 writtenBytes = socket.send(nioData, nioData.position(), nioData.limit());
199             } else {
200                 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
201                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
202             }
203         }
204 
205         return writtenBytes > 0;
206     }
207 
208     @Override
209     protected Object filterOutboundMessage(Object msg) {
210         if (msg instanceof DomainDatagramPacket) {
211             DomainDatagramPacket packet = (DomainDatagramPacket) msg;
212             ByteBuf content = packet.content();
213             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
214                     new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
215         }
216 
217         if (msg instanceof ByteBuf) {
218             ByteBuf buf = (ByteBuf) msg;
219             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
220         }
221 
222         if (msg instanceof AddressedEnvelope) {
223             @SuppressWarnings("unchecked")
224             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
225             if (e.content() instanceof ByteBuf &&
226                     (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
227 
228                 ByteBuf content = (ByteBuf) e.content();
229                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
230                         new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
231                                 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
232             }
233         }
234 
235         throw new UnsupportedOperationException(
236                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
237     }
238 
239     @Override
240     public boolean isActive() {
241         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
242     }
243 
244     @Override
245     public boolean isConnected() {
246         return connected;
247     }
248 
249     @Override
250     public DomainSocketAddress localAddress() {
251         return (DomainSocketAddress) super.localAddress();
252     }
253 
254     @Override
255     protected DomainSocketAddress localAddress0() {
256         return local;
257     }
258 
259     @Override
260     public ChannelMetadata metadata() {
261         return METADATA;
262     }
263 
264     @Override
265     protected AbstractEpollUnsafe newUnsafe() {
266         return new EpollDomainDatagramChannelUnsafe();
267     }
268 
269     /**
270      * Returns the unix credentials (uid, gid, pid) of the peer
271      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
272      */
273     public PeerCredentials peerCredentials() throws IOException {
274         return socket.getPeerCredentials();
275     }
276 
277     @Override
278     public DomainSocketAddress remoteAddress() {
279         return (DomainSocketAddress) super.remoteAddress();
280     }
281 
282     @Override
283     protected DomainSocketAddress remoteAddress0() {
284         return remote;
285     }
286 
287     final class EpollDomainDatagramChannelUnsafe extends AbstractEpollUnsafe {
288 
289         @Override
290         void epollInReady() {
291             assert eventLoop().inEventLoop();
292             final DomainDatagramChannelConfig config = config();
293             if (shouldBreakEpollInReady(config)) {
294                 clearEpollIn0();
295                 return;
296             }
297             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
298             final ChannelPipeline pipeline = pipeline();
299             final ByteBufAllocator allocator = config.getAllocator();
300             allocHandle.reset(config);
301 
302             Throwable exception = null;
303             try {
304                 ByteBuf byteBuf = null;
305                 try {
306                     boolean connected = isConnected();
307                     do {
308                         byteBuf = allocHandle.allocate(allocator);
309                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
310 
311                         final DomainDatagramPacket packet;
312                         if (connected) {
313                             allocHandle.lastBytesRead(doReadBytes(byteBuf));
314                             if (allocHandle.lastBytesRead() <= 0) {
315                                 // nothing was read, release the buffer.
316                                 byteBuf.release();
317                                 break;
318                             }
319                             packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
320                                     (DomainSocketAddress) remoteAddress());
321                         } else {
322                             final DomainDatagramSocketAddress remoteAddress;
323                             if (byteBuf.hasMemoryAddress()) {
324                                 // has a memory address so use optimized call
325                                 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
326                                         byteBuf.writerIndex(), byteBuf.capacity());
327                             } else {
328                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
329                                         byteBuf.writerIndex(), byteBuf.writableBytes());
330                                 remoteAddress =
331                                         socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
332                             }
333 
334                             if (remoteAddress == null) {
335                                 allocHandle.lastBytesRead(-1);
336                                 byteBuf.release();
337                                 break;
338                             }
339                             DomainSocketAddress localAddress = remoteAddress.localAddress();
340                             if (localAddress == null) {
341                                 localAddress = (DomainSocketAddress) localAddress();
342                             }
343                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
344                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
345 
346                             packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
347                         }
348 
349                         allocHandle.incMessagesRead(1);
350 
351                         readPending = false;
352                         pipeline.fireChannelRead(packet);
353 
354                         byteBuf = null;
355 
356                         // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
357                         // as we read anything).
358                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
359                 } catch (Throwable t) {
360                     if (byteBuf != null) {
361                         byteBuf.release();
362                     }
363                     exception = t;
364                 }
365 
366                 allocHandle.readComplete();
367                 pipeline.fireChannelReadComplete();
368 
369                 if (exception != null) {
370                     pipeline.fireExceptionCaught(exception);
371                 }
372             } finally {
373                 if (shouldStopReading(config)) {
374                     clearEpollIn();
375                 }
376             }
377         }
378     }
379 }