View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.unix.DomainSocketAddress;
23  import io.netty.channel.unix.DomainSocketChannel;
24  import io.netty.channel.unix.FileDescriptor;
25  import io.netty.channel.unix.PeerCredentials;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.io.IOException;
29  import java.net.SocketAddress;
30  
31  import static io.netty.channel.kqueue.BsdSocket.newSocketDomain;
32  
33  public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel implements DomainSocketChannel {
34      private final KQueueDomainSocketChannelConfig config = new KQueueDomainSocketChannelConfig(this);
35  
36      private volatile DomainSocketAddress local;
37      private volatile DomainSocketAddress remote;
38  
39      public KQueueDomainSocketChannel() {
40          super(null, newSocketDomain(), false);
41      }
42  
43      public KQueueDomainSocketChannel(int fd) {
44          this(null, new BsdSocket(fd));
45      }
46  
47      KQueueDomainSocketChannel(Channel parent, BsdSocket fd) {
48          super(parent, fd, true);
49          local = fd.localDomainSocketAddress();
50          remote = fd.remoteDomainSocketAddress();
51      }
52  
53      @Override
54      protected AbstractKQueueUnsafe newUnsafe() {
55          return new KQueueDomainUnsafe();
56      }
57  
58      @Override
59      protected DomainSocketAddress localAddress0() {
60          return local;
61      }
62  
63      @Override
64      protected DomainSocketAddress remoteAddress0() {
65          return remote;
66      }
67  
68      @Override
69      protected void doBind(SocketAddress localAddress) throws Exception {
70          socket.bind(localAddress);
71          local = (DomainSocketAddress) localAddress;
72      }
73  
74      @Override
75      public KQueueDomainSocketChannelConfig config() {
76          return config;
77      }
78  
79      @Override
80      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
81          if (super.doConnect(remoteAddress, localAddress)) {
82              local = localAddress != null ? (DomainSocketAddress) localAddress : socket.localDomainSocketAddress();
83              remote = (DomainSocketAddress) remoteAddress;
84              return true;
85          }
86          return false;
87      }
88  
89      @Override
90      public DomainSocketAddress remoteAddress() {
91          return (DomainSocketAddress) super.remoteAddress();
92      }
93  
94      @Override
95      public DomainSocketAddress localAddress() {
96          return (DomainSocketAddress) super.localAddress();
97      }
98  
99      @Override
100     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
101         Object msg = in.current();
102         if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
103             // File descriptor was written, so remove it.
104             in.remove();
105             return 1;
106         }
107         return super.doWriteSingle(in);
108     }
109 
110     @Override
111     protected Object filterOutboundMessage(Object msg) {
112         if (msg instanceof FileDescriptor) {
113             return msg;
114         }
115         return super.filterOutboundMessage(msg);
116     }
117 
118     /**
119      * Returns the unix credentials (uid, gid, pid) of the peer
120      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
121      */
122     @UnstableApi
123     public PeerCredentials peerCredentials() throws IOException {
124         return socket.getPeerCredentials();
125     }
126 
127     private final class KQueueDomainUnsafe extends KQueueStreamUnsafe {
128         @Override
129         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
130             switch (config().getReadMode()) {
131                 case BYTES:
132                     super.readReady(allocHandle);
133                     break;
134                 case FILE_DESCRIPTORS:
135                     readReadyFd();
136                     break;
137                 default:
138                     throw new Error();
139             }
140         }
141 
142         private void readReadyFd() {
143             if (socket.isInputShutdown()) {
144                 super.clearReadFilter0();
145                 return;
146             }
147             final ChannelConfig config = config();
148             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
149 
150             final ChannelPipeline pipeline = pipeline();
151             allocHandle.reset(config);
152             readReadyBefore();
153 
154             try {
155                 readLoop: do {
156                     // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
157                     // KQueueRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
158                     // enabled.
159                     int recvFd = socket.recvFd();
160                     switch(recvFd) {
161                         case 0:
162                             allocHandle.lastBytesRead(0);
163                             break readLoop;
164                         case -1:
165                             allocHandle.lastBytesRead(-1);
166                             close(voidPromise());
167                             return;
168                         default:
169                             allocHandle.lastBytesRead(1);
170                             allocHandle.incMessagesRead(1);
171                             readPending = false;
172                             pipeline.fireChannelRead(new FileDescriptor(recvFd));
173                             break;
174                     }
175                 } while (allocHandle.continueReading());
176 
177                 allocHandle.readComplete();
178                 pipeline.fireChannelReadComplete();
179             } catch (Throwable t) {
180                 allocHandle.readComplete();
181                 pipeline.fireChannelReadComplete();
182                 pipeline.fireExceptionCaught(t);
183             } finally {
184                 readReadyFinally(config);
185             }
186         }
187     }
188 }