View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.DefaultAddressedEnvelope;
23  import io.netty.channel.unix.DomainDatagramChannel;
24  import io.netty.channel.unix.DomainDatagramChannelConfig;
25  import io.netty.channel.unix.DomainDatagramPacket;
26  import io.netty.channel.unix.DomainDatagramSocketAddress;
27  import io.netty.channel.unix.DomainSocketAddress;
28  import io.netty.channel.unix.IovArray;
29  import io.netty.channel.unix.PeerCredentials;
30  import io.netty.channel.unix.UnixChannelUtil;
31  import io.netty.util.CharsetUtil;
32  import io.netty.util.UncheckedBooleanSupplier;
33  import io.netty.util.internal.StringUtil;
34  
35  import java.io.IOException;
36  import java.net.SocketAddress;
37  import java.nio.ByteBuffer;
38  
39  import static io.netty.channel.kqueue.BsdSocket.newSocketDomainDgram;
40  
41  public final class KQueueDomainDatagramChannel extends AbstractKQueueDatagramChannel implements DomainDatagramChannel {
42  
43      private static final String EXPECTED_TYPES =
44              " (expected: " +
45                      StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
46                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
47                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
48                      StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
49                      StringUtil.simpleClassName(ByteBuf.class) + ')';
50  
51      private volatile boolean connected;
52      private volatile DomainSocketAddress local;
53      private volatile DomainSocketAddress remote;
54  
55      private final KQueueDomainDatagramChannelConfig config;
56  
57      public KQueueDomainDatagramChannel() {
58          this(newSocketDomainDgram(), false);
59      }
60  
61      public KQueueDomainDatagramChannel(int fd) {
62          this(new BsdSocket(fd), true);
63      }
64  
65      private KQueueDomainDatagramChannel(BsdSocket socket, boolean active) {
66          super(null, socket, active);
67          config = new KQueueDomainDatagramChannelConfig(this);
68      }
69  
70      @Override
71      public KQueueDomainDatagramChannelConfig config() {
72          return config;
73      }
74  
75      @Override
76      protected void doBind(SocketAddress localAddress) throws Exception {
77          super.doBind(localAddress);
78          local = (DomainSocketAddress) localAddress;
79          active = true;
80      }
81  
82      @Override
83      protected void doClose() throws Exception {
84          super.doClose();
85          connected = active = false;
86          local = null;
87          remote = null;
88      }
89  
90      @Override
91      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
92          if (super.doConnect(remoteAddress, localAddress)) {
93              if (localAddress != null) {
94                  local = (DomainSocketAddress) localAddress;
95              }
96              remote = (DomainSocketAddress) remoteAddress;
97              connected = true;
98              return true;
99          }
100         return false;
101     }
102 
103     @Override
104     protected void doDisconnect() throws Exception {
105         doClose();
106     }
107 
108     @Override
109     protected boolean doWriteMessage(Object msg) throws Exception {
110         final ByteBuf data;
111         DomainSocketAddress remoteAddress;
112         if (msg instanceof AddressedEnvelope) {
113             @SuppressWarnings("unchecked")
114             AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
115                     (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
116             data = envelope.content();
117             remoteAddress = envelope.recipient();
118         } else {
119             data = (ByteBuf) msg;
120             remoteAddress = null;
121         }
122 
123         final int dataLen = data.readableBytes();
124         if (dataLen == 0) {
125             return true;
126         }
127 
128         final long writtenBytes;
129         if (data.hasMemoryAddress()) {
130             long memoryAddress = data.memoryAddress();
131             if (remoteAddress == null) {
132                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
133             } else {
134                 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
135                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
136             }
137         } else if (data.nioBufferCount() > 1) {
138             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
139             array.add(data, data.readerIndex(), data.readableBytes());
140             int cnt = array.count();
141             assert cnt != 0;
142 
143             if (remoteAddress == null) {
144                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
145             } else {
146                 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
147                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
148             }
149         } else {
150             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
151             if (remoteAddress == null) {
152                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
153             } else {
154                 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
155                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
156             }
157         }
158 
159         return writtenBytes > 0;
160     }
161 
162     @Override
163     protected Object filterOutboundMessage(Object msg) {
164         if (msg instanceof DomainDatagramPacket) {
165             DomainDatagramPacket packet = (DomainDatagramPacket) msg;
166             ByteBuf content = packet.content();
167             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
168                     new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
169         }
170 
171         if (msg instanceof ByteBuf) {
172             ByteBuf buf = (ByteBuf) msg;
173             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
174         }
175 
176         if (msg instanceof AddressedEnvelope) {
177             @SuppressWarnings("unchecked")
178             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
179             if (e.content() instanceof ByteBuf &&
180                     (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
181 
182                 ByteBuf content = (ByteBuf) e.content();
183                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
184                         new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
185                                 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
186             }
187         }
188 
189         throw new UnsupportedOperationException(
190                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
191     }
192 
193     @Override
194     public boolean isActive() {
195         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
196     }
197 
198     @Override
199     public boolean isConnected() {
200         return connected;
201     }
202 
203     @Override
204     public DomainSocketAddress localAddress() {
205         return (DomainSocketAddress) super.localAddress();
206     }
207 
208     @Override
209     protected DomainSocketAddress localAddress0() {
210         return local;
211     }
212 
213     @Override
214     protected AbstractKQueueUnsafe newUnsafe() {
215         return new KQueueDomainDatagramChannelUnsafe();
216     }
217 
218     /**
219      * Returns the unix credentials (uid, gid, pid) of the peer
220      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
221      */
222     public PeerCredentials peerCredentials() throws IOException {
223         return socket.getPeerCredentials();
224     }
225 
226     @Override
227     public DomainSocketAddress remoteAddress() {
228         return (DomainSocketAddress) super.remoteAddress();
229     }
230 
231     @Override
232     protected DomainSocketAddress remoteAddress0() {
233         return remote;
234     }
235 
236     final class KQueueDomainDatagramChannelUnsafe extends AbstractKQueueUnsafe {
237 
238         @Override
239         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
240             assert eventLoop().inEventLoop();
241             final DomainDatagramChannelConfig config = config();
242             if (shouldBreakReadReady(config)) {
243                 clearReadFilter0();
244                 return;
245             }
246             final ChannelPipeline pipeline = pipeline();
247             final ByteBufAllocator allocator = config.getAllocator();
248             allocHandle.reset(config);
249             readReadyBefore();
250 
251             Throwable exception = null;
252             try {
253                 ByteBuf byteBuf = null;
254                 try {
255                     boolean connected = isConnected();
256                     do {
257                         byteBuf = allocHandle.allocate(allocator);
258                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
259 
260                         final DomainDatagramPacket packet;
261                         if (connected) {
262                             allocHandle.lastBytesRead(doReadBytes(byteBuf));
263                             if (allocHandle.lastBytesRead() <= 0) {
264                                 // nothing was read, release the buffer.
265                                 byteBuf.release();
266                                 break;
267                             }
268                             packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
269                                     (DomainSocketAddress) remoteAddress());
270                         } else {
271                             final DomainDatagramSocketAddress remoteAddress;
272                             if (byteBuf.hasMemoryAddress()) {
273                                 // has a memory address so use optimized call
274                                 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
275                                         byteBuf.writerIndex(), byteBuf.capacity());
276                             } else {
277                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
278                                         byteBuf.writerIndex(), byteBuf.writableBytes());
279                                 remoteAddress =
280                                         socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
281                             }
282 
283                             if (remoteAddress == null) {
284                                 allocHandle.lastBytesRead(-1);
285                                 byteBuf.release();
286                                 break;
287                             }
288                             DomainSocketAddress localAddress = remoteAddress.localAddress();
289                             if (localAddress == null) {
290                                 localAddress = (DomainSocketAddress) localAddress();
291                             }
292                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
293                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
294 
295                             packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
296                         }
297 
298                         allocHandle.incMessagesRead(1);
299 
300                         readPending = false;
301                         pipeline.fireChannelRead(packet);
302 
303                         byteBuf = null;
304 
305                         // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
306                         // as we read anything).
307                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
308                 } catch (Throwable t) {
309                     if (byteBuf != null) {
310                         byteBuf.release();
311                     }
312                     exception = t;
313                 }
314 
315                 allocHandle.readComplete();
316                 pipeline.fireChannelReadComplete();
317 
318                 if (exception != null) {
319                     pipeline.fireExceptionCaught(exception);
320                 }
321             } finally {
322                 readReadyFinally(config);
323             }
324         }
325     }
326 }