View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.DefaultAddressedEnvelope;
23  import io.netty.channel.unix.DomainDatagramChannel;
24  import io.netty.channel.unix.DomainDatagramChannelConfig;
25  import io.netty.channel.unix.DomainDatagramPacket;
26  import io.netty.channel.unix.DomainDatagramSocketAddress;
27  import io.netty.channel.unix.DomainSocketAddress;
28  import io.netty.channel.unix.IovArray;
29  import io.netty.channel.unix.PeerCredentials;
30  import io.netty.channel.unix.UnixChannelUtil;
31  import io.netty.util.CharsetUtil;
32  import io.netty.util.UncheckedBooleanSupplier;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.UnstableApi;
35  
36  import java.io.IOException;
37  import java.net.SocketAddress;
38  import java.nio.ByteBuffer;
39  
40  import static io.netty.channel.kqueue.BsdSocket.newSocketDomainDgram;
41  
42  @UnstableApi
43  public final class KQueueDomainDatagramChannel extends AbstractKQueueDatagramChannel implements DomainDatagramChannel {
44  
45      private static final String EXPECTED_TYPES =
46              " (expected: " +
47                      StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
48                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
49                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
50                      StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
51                      StringUtil.simpleClassName(ByteBuf.class) + ')';
52  
53      private volatile boolean connected;
54      private volatile DomainSocketAddress local;
55      private volatile DomainSocketAddress remote;
56  
57      private final KQueueDomainDatagramChannelConfig config;
58  
59      public KQueueDomainDatagramChannel() {
60          this(newSocketDomainDgram(), false);
61      }
62  
63      public KQueueDomainDatagramChannel(int fd) {
64          this(new BsdSocket(fd), true);
65      }
66  
67      private KQueueDomainDatagramChannel(BsdSocket socket, boolean active) {
68          super(null, socket, active);
69          config = new KQueueDomainDatagramChannelConfig(this);
70      }
71  
72      @Override
73      public KQueueDomainDatagramChannelConfig config() {
74          return config;
75      }
76  
77      @Override
78      protected void doBind(SocketAddress localAddress) throws Exception {
79          super.doBind(localAddress);
80          local = (DomainSocketAddress) localAddress;
81          active = true;
82      }
83  
84      @Override
85      protected void doClose() throws Exception {
86          super.doClose();
87          connected = active = false;
88          local = null;
89          remote = null;
90      }
91  
92      @Override
93      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
94          if (super.doConnect(remoteAddress, localAddress)) {
95              if (localAddress != null) {
96                  local = (DomainSocketAddress) localAddress;
97              }
98              remote = (DomainSocketAddress) remoteAddress;
99              connected = true;
100             return true;
101         }
102         return false;
103     }
104 
105     @Override
106     protected void doDisconnect() throws Exception {
107         doClose();
108     }
109 
110     @Override
111     protected boolean doWriteMessage(Object msg) throws Exception {
112         final ByteBuf data;
113         DomainSocketAddress remoteAddress;
114         if (msg instanceof AddressedEnvelope) {
115             @SuppressWarnings("unchecked")
116             AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
117                     (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
118             data = envelope.content();
119             remoteAddress = envelope.recipient();
120         } else {
121             data = (ByteBuf) msg;
122             remoteAddress = null;
123         }
124 
125         final int dataLen = data.readableBytes();
126         if (dataLen == 0) {
127             return true;
128         }
129 
130         final long writtenBytes;
131         if (data.hasMemoryAddress()) {
132             long memoryAddress = data.memoryAddress();
133             if (remoteAddress == null) {
134                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
135             } else {
136                 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
137                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
138             }
139         } else if (data.nioBufferCount() > 1) {
140             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
141             array.add(data, data.readerIndex(), data.readableBytes());
142             int cnt = array.count();
143             assert cnt != 0;
144 
145             if (remoteAddress == null) {
146                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
147             } else {
148                 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
149                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
150             }
151         } else {
152             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
153             if (remoteAddress == null) {
154                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
155             } else {
156                 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
157                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
158             }
159         }
160 
161         return writtenBytes > 0;
162     }
163 
164     @Override
165     protected Object filterOutboundMessage(Object msg) {
166         if (msg instanceof DomainDatagramPacket) {
167             DomainDatagramPacket packet = (DomainDatagramPacket) msg;
168             ByteBuf content = packet.content();
169             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
170                     new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
171         }
172 
173         if (msg instanceof ByteBuf) {
174             ByteBuf buf = (ByteBuf) msg;
175             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
176         }
177 
178         if (msg instanceof AddressedEnvelope) {
179             @SuppressWarnings("unchecked")
180             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
181             if (e.content() instanceof ByteBuf &&
182                     (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
183 
184                 ByteBuf content = (ByteBuf) e.content();
185                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
186                         new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
187                                 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
188             }
189         }
190 
191         throw new UnsupportedOperationException(
192                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
193     }
194 
195     @Override
196     public boolean isActive() {
197         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
198     }
199 
200     @Override
201     public boolean isConnected() {
202         return connected;
203     }
204 
205     @Override
206     public DomainSocketAddress localAddress() {
207         return (DomainSocketAddress) super.localAddress();
208     }
209 
210     @Override
211     protected DomainSocketAddress localAddress0() {
212         return local;
213     }
214 
215     @Override
216     protected AbstractKQueueUnsafe newUnsafe() {
217         return new KQueueDomainDatagramChannelUnsafe();
218     }
219 
220     /**
221      * Returns the unix credentials (uid, gid, pid) of the peer
222      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
223      */
224     public PeerCredentials peerCredentials() throws IOException {
225         return socket.getPeerCredentials();
226     }
227 
228     @Override
229     public DomainSocketAddress remoteAddress() {
230         return (DomainSocketAddress) super.remoteAddress();
231     }
232 
233     @Override
234     protected DomainSocketAddress remoteAddress0() {
235         return remote;
236     }
237 
238     final class KQueueDomainDatagramChannelUnsafe extends AbstractKQueueUnsafe {
239 
240         @Override
241         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
242             assert eventLoop().inEventLoop();
243             final DomainDatagramChannelConfig config = config();
244             if (shouldBreakReadReady(config)) {
245                 clearReadFilter0();
246                 return;
247             }
248             final ChannelPipeline pipeline = pipeline();
249             final ByteBufAllocator allocator = config.getAllocator();
250             allocHandle.reset(config);
251             readReadyBefore();
252 
253             Throwable exception = null;
254             try {
255                 ByteBuf byteBuf = null;
256                 try {
257                     boolean connected = isConnected();
258                     do {
259                         byteBuf = allocHandle.allocate(allocator);
260                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
261 
262                         final DomainDatagramPacket packet;
263                         if (connected) {
264                             allocHandle.lastBytesRead(doReadBytes(byteBuf));
265                             if (allocHandle.lastBytesRead() <= 0) {
266                                 // nothing was read, release the buffer.
267                                 byteBuf.release();
268                                 break;
269                             }
270                             packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
271                                     (DomainSocketAddress) remoteAddress());
272                         } else {
273                             final DomainDatagramSocketAddress remoteAddress;
274                             if (byteBuf.hasMemoryAddress()) {
275                                 // has a memory address so use optimized call
276                                 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
277                                         byteBuf.writerIndex(), byteBuf.capacity());
278                             } else {
279                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
280                                         byteBuf.writerIndex(), byteBuf.writableBytes());
281                                 remoteAddress =
282                                         socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
283                             }
284 
285                             if (remoteAddress == null) {
286                                 allocHandle.lastBytesRead(-1);
287                                 byteBuf.release();
288                                 break;
289                             }
290                             DomainSocketAddress localAddress = remoteAddress.localAddress();
291                             if (localAddress == null) {
292                                 localAddress = (DomainSocketAddress) localAddress();
293                             }
294                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
295                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
296 
297                             packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
298                         }
299 
300                         allocHandle.incMessagesRead(1);
301 
302                         readPending = false;
303                         pipeline.fireChannelRead(packet);
304 
305                         byteBuf = null;
306 
307                         // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
308                         // as we read anything).
309                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
310                 } catch (Throwable t) {
311                     if (byteBuf != null) {
312                         byteBuf.release();
313                     }
314                     exception = t;
315                 }
316 
317                 allocHandle.readComplete();
318                 pipeline.fireChannelReadComplete();
319 
320                 if (exception != null) {
321                     pipeline.fireExceptionCaught(exception);
322                 }
323             } finally {
324                 readReadyFinally(config);
325             }
326         }
327     }
328 }