View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.unix.DomainSocketAddress;
23  import io.netty.channel.unix.DomainSocketChannel;
24  import io.netty.channel.unix.FileDescriptor;
25  import io.netty.channel.unix.PeerCredentials;
26  import io.netty.channel.unix.Socket;
27  
28  import java.net.SocketAddress;
29  
30  import static io.netty.channel.unix.Socket.newSocketDomain;
31  import java.io.IOException;
32  
33  public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel implements DomainSocketChannel {
34      private final EpollDomainSocketChannelConfig config = new EpollDomainSocketChannelConfig(this);
35  
36      private volatile DomainSocketAddress local;
37      private volatile DomainSocketAddress remote;
38  
39      public EpollDomainSocketChannel() {
40          super(newSocketDomain(), false);
41      }
42  
43      /**
44       * @deprecated Use {@link #EpollDomainSocketChannel(Channel, Socket)}.
45       */
46      @Deprecated
47      public EpollDomainSocketChannel(Channel parent, FileDescriptor fd) {
48          super(parent, new Socket(fd.intValue()));
49      }
50  
51      /**
52       * @deprecated Use {@link #EpollDomainSocketChannel(Socket, boolean)}.
53       * <p>
54       * Creates a new {@link EpollDomainSocketChannel} from an existing {@link FileDescriptor}
55       */
56      @Deprecated
57      public EpollDomainSocketChannel(FileDescriptor fd) {
58          super(fd);
59      }
60  
61      public EpollDomainSocketChannel(Channel parent, Socket fd) {
62          super(parent, fd);
63      }
64  
65      /**
66       * Creates a new {@link EpollDomainSocketChannel} from an existing {@link FileDescriptor}
67       */
68      public EpollDomainSocketChannel(Socket fd, boolean active) {
69          super(fd, active);
70      }
71  
72      @Override
73      protected AbstractEpollUnsafe newUnsafe() {
74          return new EpollDomainUnsafe();
75      }
76  
77      @Override
78      protected DomainSocketAddress localAddress0() {
79          return local;
80      }
81  
82      @Override
83      protected DomainSocketAddress remoteAddress0() {
84          return remote;
85      }
86  
87      @Override
88      protected void doBind(SocketAddress localAddress) throws Exception {
89          fd().bind(localAddress);
90          local = (DomainSocketAddress) localAddress;
91      }
92  
93      @Override
94      public EpollDomainSocketChannelConfig config() {
95          return config;
96      }
97  
98      @Override
99      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
100         if (super.doConnect(remoteAddress, localAddress)) {
101             local = (DomainSocketAddress) localAddress;
102             remote = (DomainSocketAddress) remoteAddress;
103             return true;
104         }
105         return false;
106     }
107 
108     @Override
109     public DomainSocketAddress remoteAddress() {
110         return (DomainSocketAddress) super.remoteAddress();
111     }
112 
113     @Override
114     public DomainSocketAddress localAddress() {
115         return (DomainSocketAddress) super.localAddress();
116     }
117 
118     @Override
119     protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
120         Object msg = in.current();
121         if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) {
122             // File descriptor was written, so remove it.
123             in.remove();
124             return true;
125         }
126         return super.doWriteSingle(in, writeSpinCount);
127     }
128 
129     @Override
130     protected Object filterOutboundMessage(Object msg) {
131         if (msg instanceof FileDescriptor) {
132             return msg;
133         }
134         return super.filterOutboundMessage(msg);
135     }
136 
137     /**
138      * Returns the unix credentials (uid, gid, pid) of the peer
139      * <a href=http://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
140      */
141     public PeerCredentials peerCredentials() throws IOException {
142         return fd().getPeerCredentials();
143     }
144 
145     private final class EpollDomainUnsafe extends EpollStreamUnsafe {
146         @Override
147         void epollInReady() {
148             switch (config().getReadMode()) {
149                 case BYTES:
150                     super.epollInReady();
151                     break;
152                 case FILE_DESCRIPTORS:
153                     epollInReadFd();
154                     break;
155                 default:
156                     throw new Error();
157             }
158         }
159 
160         private void epollInReadFd() {
161             if (fd().isInputShutdown()) {
162                 return;
163             }
164             boolean edgeTriggered = isFlagSet(Native.EPOLLET);
165             final ChannelConfig config = config();
166             if (!readPending && !edgeTriggered && !config.isAutoRead()) {
167                 // ChannelConfig.setAutoRead(false) was called in the meantime
168                 clearEpollIn0();
169                 return;
170             }
171 
172             final ChannelPipeline pipeline = pipeline();
173 
174             try {
175                 // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
176                 final int maxMessagesPerRead = edgeTriggered
177                         ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
178                 int messages = 0;
179                 do {
180                     int socketFd = Native.recvFd(fd().intValue());
181                     if (socketFd == 0) {
182                         break;
183                     }
184                     if (socketFd == -1) {
185                         close(voidPromise());
186                         return;
187                     }
188                     readPending = false;
189 
190                     try {
191                         pipeline.fireChannelRead(new FileDescriptor(socketFd));
192                     } catch (Throwable t) {
193                         // keep on reading as we use epoll ET and need to consume everything from the socket
194                         pipeline.fireChannelReadComplete();
195                         pipeline.fireExceptionCaught(t);
196                     } finally {
197                         if (!edgeTriggered && !config.isAutoRead()) {
198                             // This is not using EPOLLET so we can stop reading
199                             // ASAP as we will get notified again later with
200                             // pending data
201                             break;
202                         }
203                     }
204                 } while (++ messages < maxMessagesPerRead || isRdHup());
205 
206                 pipeline.fireChannelReadComplete();
207 
208             } catch (Throwable t) {
209                 pipeline.fireChannelReadComplete();
210                 pipeline.fireExceptionCaught(t);
211                 // trigger a read again as there may be something left to read and because of epoll ET we
212                 // will not get notified again until we read everything from the socket
213                 eventLoop().execute(new Runnable() {
214                     @Override
215                     public void run() {
216                         epollInReady();
217                     }
218                 });
219             } finally {
220                 // Check if there is a readPending which was not processed yet.
221                 // This could be for two reasons:
222                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
223                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
224                 //
225                 // See https://github.com/netty/netty/issues/2254
226                 if (!readPending && !config.isAutoRead()) {
227                     clearEpollIn0();
228                 }
229             }
230         }
231     }
232 }