View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.unix.DomainSocketAddress;
23  import io.netty.channel.unix.DomainSocketChannel;
24  import io.netty.channel.unix.FileDescriptor;
25  import io.netty.channel.unix.PeerCredentials;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.io.IOException;
29  import java.net.SocketAddress;
30  
31  import static io.netty.channel.kqueue.BsdSocket.newSocketDomain;
32  
33  @UnstableApi
34  public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel implements DomainSocketChannel {
35      private final KQueueDomainSocketChannelConfig config = new KQueueDomainSocketChannelConfig(this);
36  
37      private volatile DomainSocketAddress local;
38      private volatile DomainSocketAddress remote;
39  
40      public KQueueDomainSocketChannel() {
41          super(null, newSocketDomain(), false);
42      }
43  
44      public KQueueDomainSocketChannel(int fd) {
45          this(null, new BsdSocket(fd));
46      }
47  
48      KQueueDomainSocketChannel(Channel parent, BsdSocket fd) {
49          super(parent, fd, true);
50      }
51  
52      @Override
53      protected AbstractKQueueUnsafe newUnsafe() {
54          return new KQueueDomainUnsafe();
55      }
56  
57      @Override
58      protected DomainSocketAddress localAddress0() {
59          return local;
60      }
61  
62      @Override
63      protected DomainSocketAddress remoteAddress0() {
64          return remote;
65      }
66  
67      @Override
68      protected void doBind(SocketAddress localAddress) throws Exception {
69          socket.bind(localAddress);
70          local = (DomainSocketAddress) localAddress;
71      }
72  
73      @Override
74      public KQueueDomainSocketChannelConfig config() {
75          return config;
76      }
77  
78      @Override
79      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
80          if (super.doConnect(remoteAddress, localAddress)) {
81              local = (DomainSocketAddress) localAddress;
82              remote = (DomainSocketAddress) remoteAddress;
83              return true;
84          }
85          return false;
86      }
87  
88      @Override
89      public DomainSocketAddress remoteAddress() {
90          return (DomainSocketAddress) super.remoteAddress();
91      }
92  
93      @Override
94      public DomainSocketAddress localAddress() {
95          return (DomainSocketAddress) super.localAddress();
96      }
97  
98      @Override
99      protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
100         Object msg = in.current();
101         if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
102             // File descriptor was written, so remove it.
103             in.remove();
104             return 1;
105         }
106         return super.doWriteSingle(in);
107     }
108 
109     @Override
110     protected Object filterOutboundMessage(Object msg) {
111         if (msg instanceof FileDescriptor) {
112             return msg;
113         }
114         return super.filterOutboundMessage(msg);
115     }
116 
117     /**
118      * Returns the unix credentials (uid, gid, pid) of the peer
119      * <a href=http://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
120      */
121     @UnstableApi
122     public PeerCredentials peerCredentials() throws IOException {
123         return socket.getPeerCredentials();
124     }
125 
126     private final class KQueueDomainUnsafe extends KQueueStreamUnsafe {
127         @Override
128         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
129             switch (config().getReadMode()) {
130                 case BYTES:
131                     super.readReady(allocHandle);
132                     break;
133                 case FILE_DESCRIPTORS:
134                     readReadyFd();
135                     break;
136                 default:
137                     throw new Error();
138             }
139         }
140 
141         private void readReadyFd() {
142             if (socket.isInputShutdown()) {
143                 super.clearReadFilter0();
144                 return;
145             }
146             final ChannelConfig config = config();
147             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
148 
149             final ChannelPipeline pipeline = pipeline();
150             allocHandle.reset(config);
151             readReadyBefore();
152 
153             try {
154                 readLoop: do {
155                     // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
156                     // KQueueRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
157                     // enabled.
158                     int recvFd = socket.recvFd();
159                     switch(recvFd) {
160                         case 0:
161                             allocHandle.lastBytesRead(0);
162                             break readLoop;
163                         case -1:
164                             allocHandle.lastBytesRead(-1);
165                             close(voidPromise());
166                             return;
167                         default:
168                             allocHandle.lastBytesRead(1);
169                             allocHandle.incMessagesRead(1);
170                             readPending = false;
171                             pipeline.fireChannelRead(new FileDescriptor(recvFd));
172                             break;
173                     }
174                 } while (allocHandle.continueReading());
175 
176                 allocHandle.readComplete();
177                 pipeline.fireChannelReadComplete();
178             } catch (Throwable t) {
179                 allocHandle.readComplete();
180                 pipeline.fireChannelReadComplete();
181                 pipeline.fireExceptionCaught(t);
182             } finally {
183                 readReadyFinally(config);
184             }
185         }
186     }
187 }