View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelMetadata;
22  import io.netty.channel.ChannelOutboundBuffer;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.DefaultAddressedEnvelope;
25  import io.netty.channel.unix.DomainDatagramChannel;
26  import io.netty.channel.unix.DomainDatagramChannelConfig;
27  import io.netty.channel.unix.DomainDatagramPacket;
28  import io.netty.channel.unix.DomainDatagramSocketAddress;
29  import io.netty.channel.unix.DomainSocketAddress;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.channel.unix.PeerCredentials;
32  import io.netty.channel.unix.UnixChannelUtil;
33  import io.netty.util.CharsetUtil;
34  import io.netty.util.UncheckedBooleanSupplier;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.UnstableApi;
37  
38  import java.io.IOException;
39  import java.net.SocketAddress;
40  import java.nio.ByteBuffer;
41  
42  import static io.netty.channel.epoll.LinuxSocket.newSocketDomainDgram;
43  
44  @UnstableApi
45  public final class EpollDomainDatagramChannel extends AbstractEpollChannel implements DomainDatagramChannel {
46  
47      private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
48  
49      private static final String EXPECTED_TYPES =
50              " (expected: " +
51                      StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
52                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
54                      StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
55                      StringUtil.simpleClassName(ByteBuf.class) + ')';
56  
57      private volatile boolean connected;
58      private volatile DomainSocketAddress local;
59      private volatile DomainSocketAddress remote;
60  
61      private final EpollDomainDatagramChannelConfig config;
62  
63      public EpollDomainDatagramChannel() {
64          this(newSocketDomainDgram(), false);
65      }
66  
67      public EpollDomainDatagramChannel(int fd) {
68          this(new LinuxSocket(fd), true);
69      }
70  
71      private EpollDomainDatagramChannel(LinuxSocket socket, boolean active) {
72          super(null, socket, active);
73          config = new EpollDomainDatagramChannelConfig(this);
74      }
75  
76      @Override
77      public EpollDomainDatagramChannelConfig config() {
78          return config;
79      }
80  
81      @Override
82      protected void doBind(SocketAddress localAddress) throws Exception {
83          super.doBind(localAddress);
84          local = (DomainSocketAddress) localAddress;
85          active = true;
86      }
87  
88      @Override
89      protected void doClose() throws Exception {
90          super.doClose();
91          connected = active = false;
92          local = null;
93          remote = null;
94      }
95  
96      @Override
97      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
98          if (super.doConnect(remoteAddress, localAddress)) {
99              if (localAddress != null) {
100                 local = (DomainSocketAddress) localAddress;
101             }
102             remote = (DomainSocketAddress) remoteAddress;
103             connected = true;
104             return true;
105         }
106         return false;
107     }
108 
109     @Override
110     protected void doDisconnect() throws Exception {
111         doClose();
112     }
113 
114     @Override
115     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
116         int maxMessagesPerWrite = maxMessagesPerWrite();
117         while (maxMessagesPerWrite > 0) {
118             Object msg = in.current();
119             if (msg == null) {
120                 break;
121             }
122 
123             try {
124                 boolean done = false;
125                 for (int i = config().getWriteSpinCount(); i > 0; --i) {
126                     if (doWriteMessage(msg)) {
127                         done = true;
128                         break;
129                     }
130                 }
131 
132                 if (done) {
133                     in.remove();
134                     maxMessagesPerWrite--;
135                 } else {
136                     break;
137                 }
138             } catch (IOException e) {
139                 maxMessagesPerWrite--;
140 
141                 // Continue on write error as a DatagramChannel can write to multiple remote peers
142                 //
143                 // See https://github.com/netty/netty/issues/2665
144                 in.remove(e);
145             }
146         }
147 
148         if (in.isEmpty()) {
149             // Did write all messages.
150             clearFlag(Native.EPOLLOUT);
151         } else {
152             // Did not write all messages.
153             setFlag(Native.EPOLLOUT);
154         }
155     }
156 
157     private boolean doWriteMessage(Object msg) throws Exception {
158         final ByteBuf data;
159         DomainSocketAddress remoteAddress;
160         if (msg instanceof AddressedEnvelope) {
161             @SuppressWarnings("unchecked")
162             AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
163                     (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
164             data = envelope.content();
165             remoteAddress = envelope.recipient();
166         } else {
167             data = (ByteBuf) msg;
168             remoteAddress = null;
169         }
170 
171         final int dataLen = data.readableBytes();
172         if (dataLen == 0) {
173             return true;
174         }
175 
176         final long writtenBytes;
177         if (data.hasMemoryAddress()) {
178             long memoryAddress = data.memoryAddress();
179             if (remoteAddress == null) {
180                 writtenBytes = socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
181             } else {
182                 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
183                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
184             }
185         } else if (data.nioBufferCount() > 1) {
186             IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
187             array.add(data, data.readerIndex(), data.readableBytes());
188             int cnt = array.count();
189             assert cnt != 0;
190 
191             if (remoteAddress == null) {
192                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
193             } else {
194                 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
195                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
196             }
197         } else {
198             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
199             if (remoteAddress == null) {
200                 writtenBytes = socket.send(nioData, nioData.position(), nioData.limit());
201             } else {
202                 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
203                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
204             }
205         }
206 
207         return writtenBytes > 0;
208     }
209 
210     @Override
211     protected Object filterOutboundMessage(Object msg) {
212         if (msg instanceof DomainDatagramPacket) {
213             DomainDatagramPacket packet = (DomainDatagramPacket) msg;
214             ByteBuf content = packet.content();
215             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
216                     new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
217         }
218 
219         if (msg instanceof ByteBuf) {
220             ByteBuf buf = (ByteBuf) msg;
221             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
222         }
223 
224         if (msg instanceof AddressedEnvelope) {
225             @SuppressWarnings("unchecked")
226             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
227             if (e.content() instanceof ByteBuf &&
228                     (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
229 
230                 ByteBuf content = (ByteBuf) e.content();
231                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
232                         new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
233                                 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
234             }
235         }
236 
237         throw new UnsupportedOperationException(
238                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
239     }
240 
241     @Override
242     public boolean isActive() {
243         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
244     }
245 
246     @Override
247     public boolean isConnected() {
248         return connected;
249     }
250 
251     @Override
252     public DomainSocketAddress localAddress() {
253         return (DomainSocketAddress) super.localAddress();
254     }
255 
256     @Override
257     protected DomainSocketAddress localAddress0() {
258         return local;
259     }
260 
261     @Override
262     public ChannelMetadata metadata() {
263         return METADATA;
264     }
265 
266     @Override
267     protected AbstractEpollUnsafe newUnsafe() {
268         return new EpollDomainDatagramChannelUnsafe();
269     }
270 
271     /**
272      * Returns the unix credentials (uid, gid, pid) of the peer
273      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
274      */
275     public PeerCredentials peerCredentials() throws IOException {
276         return socket.getPeerCredentials();
277     }
278 
279     @Override
280     public DomainSocketAddress remoteAddress() {
281         return (DomainSocketAddress) super.remoteAddress();
282     }
283 
284     @Override
285     protected DomainSocketAddress remoteAddress0() {
286         return remote;
287     }
288 
289     final class EpollDomainDatagramChannelUnsafe extends AbstractEpollUnsafe {
290 
291         @Override
292         void epollInReady() {
293             assert eventLoop().inEventLoop();
294             final DomainDatagramChannelConfig config = config();
295             if (shouldBreakEpollInReady(config)) {
296                 clearEpollIn0();
297                 return;
298             }
299             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
300             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
301 
302             final ChannelPipeline pipeline = pipeline();
303             final ByteBufAllocator allocator = config.getAllocator();
304             allocHandle.reset(config);
305             epollInBefore();
306 
307             Throwable exception = null;
308             try {
309                 ByteBuf byteBuf = null;
310                 try {
311                     boolean connected = isConnected();
312                     do {
313                         byteBuf = allocHandle.allocate(allocator);
314                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
315 
316                         final DomainDatagramPacket packet;
317                         if (connected) {
318                             allocHandle.lastBytesRead(doReadBytes(byteBuf));
319                             if (allocHandle.lastBytesRead() <= 0) {
320                                 // nothing was read, release the buffer.
321                                 byteBuf.release();
322                                 break;
323                             }
324                             packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
325                                     (DomainSocketAddress) remoteAddress());
326                         } else {
327                             final DomainDatagramSocketAddress remoteAddress;
328                             if (byteBuf.hasMemoryAddress()) {
329                                 // has a memory address so use optimized call
330                                 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
331                                         byteBuf.writerIndex(), byteBuf.capacity());
332                             } else {
333                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
334                                         byteBuf.writerIndex(), byteBuf.writableBytes());
335                                 remoteAddress =
336                                         socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
337                             }
338 
339                             if (remoteAddress == null) {
340                                 allocHandle.lastBytesRead(-1);
341                                 byteBuf.release();
342                                 break;
343                             }
344                             DomainSocketAddress localAddress = remoteAddress.localAddress();
345                             if (localAddress == null) {
346                                 localAddress = (DomainSocketAddress) localAddress();
347                             }
348                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
349                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
350 
351                             packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
352                         }
353 
354                         allocHandle.incMessagesRead(1);
355 
356                         readPending = false;
357                         pipeline.fireChannelRead(packet);
358 
359                         byteBuf = null;
360 
361                         // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
362                         // as we read anything).
363                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
364                 } catch (Throwable t) {
365                     if (byteBuf != null) {
366                         byteBuf.release();
367                     }
368                     exception = t;
369                 }
370 
371                 allocHandle.readComplete();
372                 pipeline.fireChannelReadComplete();
373 
374                 if (exception != null) {
375                     pipeline.fireExceptionCaught(exception);
376                 }
377             } finally {
378                 epollInFinally(config);
379             }
380         }
381     }
382 }