View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.AddressedEnvelope;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.DefaultAddressedEnvelope;
23  import io.netty.channel.unix.DomainDatagramChannel;
24  import io.netty.channel.unix.DomainDatagramChannelConfig;
25  import io.netty.channel.unix.DomainDatagramPacket;
26  import io.netty.channel.unix.DomainDatagramSocketAddress;
27  import io.netty.channel.unix.DomainSocketAddress;
28  import io.netty.channel.unix.IovArray;
29  import io.netty.channel.unix.PeerCredentials;
30  import io.netty.channel.unix.UnixChannelUtil;
31  import io.netty.util.CharsetUtil;
32  import io.netty.util.UncheckedBooleanSupplier;
33  import io.netty.util.internal.StringUtil;
34  
35  import java.io.IOException;
36  import java.net.SocketAddress;
37  import java.nio.ByteBuffer;
38  
39  import static io.netty.channel.kqueue.BsdSocket.newSocketDomainDgram;
40  
41  public final class KQueueDomainDatagramChannel extends AbstractKQueueDatagramChannel implements DomainDatagramChannel {
42  
43      private static final String EXPECTED_TYPES =
44              " (expected: " +
45                      StringUtil.simpleClassName(DomainDatagramPacket.class) + ", " +
46                      StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
47                      StringUtil.simpleClassName(ByteBuf.class) + ", " +
48                      StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
49                      StringUtil.simpleClassName(ByteBuf.class) + ')';
50  
51      private volatile boolean connected;
52      private volatile DomainSocketAddress local;
53      private volatile DomainSocketAddress remote;
54  
55      private final KQueueDomainDatagramChannelConfig config;
56  
57      public KQueueDomainDatagramChannel() {
58          this(newSocketDomainDgram(), false);
59      }
60  
61      public KQueueDomainDatagramChannel(int fd) {
62          this(new BsdSocket(fd), true);
63      }
64  
65      private KQueueDomainDatagramChannel(BsdSocket socket, boolean active) {
66          super(null, socket, active);
67          config = new KQueueDomainDatagramChannelConfig(this);
68      }
69  
70      @Override
71      public KQueueDomainDatagramChannelConfig config() {
72          return config;
73      }
74  
75      @Override
76      protected void doBind(SocketAddress localAddress) throws Exception {
77          super.doBind(localAddress);
78          local = (DomainSocketAddress) localAddress;
79          active = true;
80      }
81  
82      @Override
83      protected void doClose() throws Exception {
84          try {
85              super.doClose();
86          } finally {
87              connected = active = false;
88              local = null;
89              remote = null;
90          }
91      }
92  
93      @Override
94      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
95          if (super.doConnect(remoteAddress, localAddress)) {
96              if (localAddress != null) {
97                  local = (DomainSocketAddress) localAddress;
98              }
99              remote = (DomainSocketAddress) remoteAddress;
100             connected = true;
101             return true;
102         }
103         return false;
104     }
105 
106     @Override
107     protected void doDisconnect() throws Exception {
108         doClose();
109     }
110 
111     @Override
112     protected boolean doWriteMessage(Object msg) throws Exception {
113         final ByteBuf data;
114         DomainSocketAddress remoteAddress;
115         if (msg instanceof AddressedEnvelope) {
116             @SuppressWarnings("unchecked")
117             AddressedEnvelope<ByteBuf, DomainSocketAddress> envelope =
118                     (AddressedEnvelope<ByteBuf, DomainSocketAddress>) msg;
119             data = envelope.content();
120             remoteAddress = envelope.recipient();
121         } else {
122             data = (ByteBuf) msg;
123             remoteAddress = null;
124         }
125 
126         final int dataLen = data.readableBytes();
127         if (dataLen == 0) {
128             return true;
129         }
130 
131         final long writtenBytes;
132         if (data.hasMemoryAddress()) {
133             long memoryAddress = data.memoryAddress();
134             if (remoteAddress == null) {
135                 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
136             } else {
137                 writtenBytes = socket.sendToAddressDomainSocket(memoryAddress, data.readerIndex(), data.writerIndex(),
138                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
139             }
140         } else if (data.nioBufferCount() > 1) {
141             IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
142             array.add(data, data.readerIndex(), data.readableBytes());
143             int cnt = array.count();
144             assert cnt != 0;
145 
146             if (remoteAddress == null) {
147                 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
148             } else {
149                 writtenBytes = socket.sendToAddressesDomainSocket(array.memoryAddress(0), cnt,
150                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
151             }
152         } else {
153             ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
154             if (remoteAddress == null) {
155                 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
156             } else {
157                 writtenBytes = socket.sendToDomainSocket(nioData, nioData.position(), nioData.limit(),
158                         remoteAddress.path().getBytes(CharsetUtil.UTF_8));
159             }
160         }
161 
162         return writtenBytes > 0;
163     }
164 
165     @Override
166     protected Object filterOutboundMessage(Object msg) {
167         if (msg instanceof DomainDatagramPacket) {
168             DomainDatagramPacket packet = (DomainDatagramPacket) msg;
169             ByteBuf content = packet.content();
170             return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
171                     new DomainDatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
172         }
173 
174         if (msg instanceof ByteBuf) {
175             ByteBuf buf = (ByteBuf) msg;
176             return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
177         }
178 
179         if (msg instanceof AddressedEnvelope) {
180             @SuppressWarnings("unchecked")
181             AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
182             if (e.content() instanceof ByteBuf &&
183                     (e.recipient() == null || e.recipient() instanceof DomainSocketAddress)) {
184 
185                 ByteBuf content = (ByteBuf) e.content();
186                 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
187                         new DefaultAddressedEnvelope<ByteBuf, DomainSocketAddress>(
188                                 newDirectBuffer(e, content), (DomainSocketAddress) e.recipient()) : e;
189             }
190         }
191 
192         throw new UnsupportedOperationException(
193                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
194     }
195 
196     @Override
197     public boolean isActive() {
198         return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
199     }
200 
201     @Override
202     public boolean isConnected() {
203         return connected;
204     }
205 
206     @Override
207     public DomainSocketAddress localAddress() {
208         return (DomainSocketAddress) super.localAddress();
209     }
210 
211     @Override
212     protected DomainSocketAddress localAddress0() {
213         return local;
214     }
215 
216     @Override
217     protected AbstractKQueueUnsafe newUnsafe() {
218         return new KQueueDomainDatagramChannelUnsafe();
219     }
220 
221     /**
222      * Returns the unix credentials (uid, gid, pid) of the peer
223      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
224      */
225     public PeerCredentials peerCredentials() throws IOException {
226         return socket.getPeerCredentials();
227     }
228 
229     @Override
230     public DomainSocketAddress remoteAddress() {
231         return (DomainSocketAddress) super.remoteAddress();
232     }
233 
234     @Override
235     protected DomainSocketAddress remoteAddress0() {
236         return remote;
237     }
238 
239     final class KQueueDomainDatagramChannelUnsafe extends AbstractKQueueUnsafe {
240 
241         @Override
242         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
243             assert eventLoop().inEventLoop();
244             final DomainDatagramChannelConfig config = config();
245             if (shouldBreakReadReady(config)) {
246                 clearReadFilter0();
247                 return;
248             }
249             final ChannelPipeline pipeline = pipeline();
250             final ByteBufAllocator allocator = config.getAllocator();
251             allocHandle.reset(config);
252 
253             Throwable exception = null;
254             try {
255                 ByteBuf byteBuf = null;
256                 try {
257                     boolean connected = isConnected();
258                     do {
259                         byteBuf = allocHandle.allocate(allocator);
260                         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
261 
262                         final DomainDatagramPacket packet;
263                         if (connected) {
264                             allocHandle.lastBytesRead(doReadBytes(byteBuf));
265                             if (allocHandle.lastBytesRead() <= 0) {
266                                 // nothing was read, release the buffer.
267                                 byteBuf.release();
268                                 break;
269                             }
270                             packet = new DomainDatagramPacket(byteBuf, (DomainSocketAddress) localAddress(),
271                                     (DomainSocketAddress) remoteAddress());
272                         } else {
273                             final DomainDatagramSocketAddress remoteAddress;
274                             if (byteBuf.hasMemoryAddress()) {
275                                 // has a memory address so use optimized call
276                                 remoteAddress = socket.recvFromAddressDomainSocket(byteBuf.memoryAddress(),
277                                         byteBuf.writerIndex(), byteBuf.capacity());
278                             } else {
279                                 ByteBuffer nioData = byteBuf.internalNioBuffer(
280                                         byteBuf.writerIndex(), byteBuf.writableBytes());
281                                 remoteAddress =
282                                         socket.recvFromDomainSocket(nioData, nioData.position(), nioData.limit());
283                             }
284 
285                             if (remoteAddress == null) {
286                                 allocHandle.lastBytesRead(-1);
287                                 byteBuf.release();
288                                 break;
289                             }
290                             DomainSocketAddress localAddress = remoteAddress.localAddress();
291                             if (localAddress == null) {
292                                 localAddress = (DomainSocketAddress) localAddress();
293                             }
294                             allocHandle.lastBytesRead(remoteAddress.receivedAmount());
295                             byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
296 
297                             packet = new DomainDatagramPacket(byteBuf, localAddress, remoteAddress);
298                         }
299 
300                         allocHandle.incMessagesRead(1);
301 
302                         readPending = false;
303                         pipeline.fireChannelRead(packet);
304 
305                         byteBuf = null;
306 
307                         // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
308                         // as we read anything).
309                     } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
310                 } catch (Throwable t) {
311                     if (byteBuf != null) {
312                         byteBuf.release();
313                     }
314                     exception = t;
315                 }
316 
317                 allocHandle.readComplete();
318                 pipeline.fireChannelReadComplete();
319 
320                 if (exception != null) {
321                     pipeline.fireExceptionCaught(exception);
322                 }
323             } finally {
324                 if (shouldStopReading(config)) {
325                     clearReadFilter0();
326                 }
327             }
328         }
329     }
330 }