/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.unix.DomainSocketAddress;
23  import io.netty.channel.unix.DomainSocketChannel;
24  import io.netty.channel.unix.FileDescriptor;
25  
26  import java.net.SocketAddress;
27  
28  public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel implements DomainSocketChannel {
29      private final EpollDomainSocketChannelConfig config = new EpollDomainSocketChannelConfig(this);
30  
31      private volatile DomainSocketAddress local;
32      private volatile DomainSocketAddress remote;
33  
34      public EpollDomainSocketChannel() {
35          super(Native.socketDomainFd());
36      }
37  
38      public EpollDomainSocketChannel(Channel parent, FileDescriptor fd) {
39          super(parent, fd.intValue());
40      }
41  
42      /**
43       * Creates a new {@link EpollDomainSocketChannel} from an existing {@link FileDescriptor}
44       */
45      public EpollDomainSocketChannel(FileDescriptor fd) {
46          super(fd);
47      }
48  
49      EpollDomainSocketChannel(Channel parent, int fd) {
50          super(parent, fd);
51      }
52  
53      @Override
54      protected AbstractEpollUnsafe newUnsafe() {
55          return new EpollDomainUnsafe();
56      }
57  
58      @Override
59      protected DomainSocketAddress localAddress0() {
60          return local;
61      }
62  
63      @Override
64      protected DomainSocketAddress remoteAddress0() {
65          return remote;
66      }
67  
68      @Override
69      protected void doBind(SocketAddress localAddress) throws Exception {
70          Native.bind(fd().intValue(), localAddress);
71          local = (DomainSocketAddress) localAddress;
72      }
73  
74      @Override
75      public EpollDomainSocketChannelConfig config() {
76          return config;
77      }
78  
79      @Override
80      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
81          if (super.doConnect(remoteAddress, localAddress)) {
82              local = (DomainSocketAddress) localAddress;
83              remote = (DomainSocketAddress) remoteAddress;
84              return true;
85          }
86          return false;
87      }
88  
89      @Override
90      public DomainSocketAddress remoteAddress() {
91          return (DomainSocketAddress) super.remoteAddress();
92      }
93  
94      @Override
95      public DomainSocketAddress localAddress() {
96          return (DomainSocketAddress) super.localAddress();
97      }
98  
99      @Override
100     protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
101         Object msg = in.current();
102         if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) {
103             // File descriptor was written, so remove it.
104             in.remove();
105             return true;
106         }
107         return super.doWriteSingle(in, writeSpinCount);
108     }
109 
110     @Override
111     protected Object filterOutboundMessage(Object msg) {
112         if (msg instanceof FileDescriptor) {
113             return msg;
114         }
115         return super.filterOutboundMessage(msg);
116     }
117 
118     private final class EpollDomainUnsafe extends EpollStreamUnsafe {
119         @Override
120         void epollInReady() {
121             switch (config().getReadMode()) {
122                 case BYTES:
123                     super.epollInReady();
124                     break;
125                 case FILE_DESCRIPTORS:
126                     epollInReadFd();
127                     break;
128                 default:
129                     throw new Error();
130             }
131         }
132 
133         private void epollInReadFd() {
134             boolean edgeTriggered = isFlagSet(Native.EPOLLET);
135             final ChannelConfig config = config();
136             if (!readPending && !edgeTriggered && !config.isAutoRead()) {
137                 // ChannelConfig.setAutoRead(false) was called in the meantime
138                 clearEpollIn0();
139                 return;
140             }
141 
142             final ChannelPipeline pipeline = pipeline();
143 
144             try {
145                 // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
146                 final int maxMessagesPerRead = edgeTriggered
147                         ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
148                 int messages = 0;
149                 do {
150                     int socketFd = Native.recvFd(fd().intValue());
151                     if (socketFd == 0) {
152                         break;
153                     }
154                     if (socketFd == -1) {
155                         close(voidPromise());
156                         return;
157                     }
158                     readPending = false;
159 
160                     try {
161                         pipeline.fireChannelRead(new FileDescriptor(socketFd));
162                     } catch (Throwable t) {
163                         // keep on reading as we use epoll ET and need to consume everything from the socket
164                         pipeline.fireChannelReadComplete();
165                         pipeline.fireExceptionCaught(t);
166                     } finally {
167                         if (!edgeTriggered && !config.isAutoRead()) {
168                             // This is not using EPOLLET so we can stop reading
169                             // ASAP as we will get notified again later with
170                             // pending data
171                             break;
172                         }
173                     }
174                 } while (++ messages < maxMessagesPerRead);
175 
176                 pipeline.fireChannelReadComplete();
177 
178             } catch (Throwable t) {
179                 pipeline.fireChannelReadComplete();
180                 pipeline.fireExceptionCaught(t);
181                 // trigger a read again as there may be something left to read and because of epoll ET we
182                 // will not get notified again until we read everything from the socket
183                 eventLoop().execute(new Runnable() {
184                     @Override
185                     public void run() {
186                         epollInReady();
187                     }
188                 });
189             } finally {
190                 // Check if there is a readPending which was not processed yet.
191                 // This could be for two reasons:
192                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
193                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
194                 //
195                 // See https://github.com/netty/netty/issues/2254
196                 if (!readPending && !config.isAutoRead()) {
197                     clearEpollIn0();
198                 }
199             }
200         }
201     }
202 }