/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.channel.EventLoop;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.unix.FileDescriptor;
26  
27  import java.net.InetSocketAddress;
28  import java.net.SocketAddress;
29  
30  
31  public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
32  
33      protected AbstractEpollServerChannel(int fd) {
34          super(fd, Native.EPOLLIN);
35      }
36  
37      protected AbstractEpollServerChannel(FileDescriptor fd) {
38          super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
39      }
40  
41      @Override
42      protected boolean isCompatible(EventLoop loop) {
43          return loop instanceof EpollEventLoop;
44      }
45  
46      @Override
47      protected InetSocketAddress remoteAddress0() {
48          return null;
49      }
50  
51      @Override
52      protected AbstractEpollUnsafe newUnsafe() {
53          return new EpollServerSocketUnsafe();
54      }
55  
56      @Override
57      protected void doWrite(ChannelOutboundBuffer in) throws Exception {
58          throw new UnsupportedOperationException();
59      }
60  
61      @Override
62      protected Object filterOutboundMessage(Object msg) throws Exception {
63          throw new UnsupportedOperationException();
64      }
65  
66      abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
67  
68      final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
69          // Will hold the remote address after accept(...) was sucesssful.
70          // We need 24 bytes for the address as maximum + 1 byte for storing the length.
71          // So use 26 bytes as it's a power of two.
72          private final byte[] acceptedAddress = new byte[26];
73  
74          @Override
75          public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
76              // Connect not supported by ServerChannel implementations
77              channelPromise.setFailure(new UnsupportedOperationException());
78          }
79  
80          @Override
81          void epollInReady() {
82              assert eventLoop().inEventLoop();
83              boolean edgeTriggered = isFlagSet(Native.EPOLLET);
84  
85              final ChannelConfig config = config();
86              if (!readPending && !edgeTriggered && !config.isAutoRead()) {
87                  // ChannelConfig.setAutoRead(false) was called in the meantime
88                  clearEpollIn0();
89                  return;
90              }
91  
92              final ChannelPipeline pipeline = pipeline();
93              Throwable exception = null;
94              try {
95                  try {
96                      // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
97                      final int maxMessagesPerRead = edgeTriggered
98                              ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
99                      int messages = 0;
100                     do {
101                         int socketFd = Native.accept(fd().intValue(), acceptedAddress);
102                         if (socketFd == -1) {
103                             // this means everything was handled for now
104                             break;
105                         }
106                         readPending = false;
107 
108                         try {
109                             int len = acceptedAddress[0];
110                             pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
111                         } catch (Throwable t) {
112                             // keep on reading as we use epoll ET and need to consume everything from the socket
113                             pipeline.fireChannelReadComplete();
114                             pipeline.fireExceptionCaught(t);
115                         } finally {
116                             if (!edgeTriggered && !config.isAutoRead()) {
117                                 // This is not using EPOLLET so we can stop reading
118                                 // ASAP as we will get notified again later with
119                                 // pending data
120                                 break;
121                             }
122                         }
123                     } while (++ messages < maxMessagesPerRead);
124                 } catch (Throwable t) {
125                     exception = t;
126                 }
127                 pipeline.fireChannelReadComplete();
128 
129                 if (exception != null) {
130                     pipeline.fireExceptionCaught(exception);
131                 }
132             } finally {
133                 // Check if there is a readPending which was not processed yet.
134                 // This could be for two reasons:
135                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
136                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
137                 //
138                 // See https://github.com/netty/netty/issues/2254
139                 if (!readPending && !config.isAutoRead()) {
140                     clearEpollIn0();
141                 }
142             }
143         }
144     }
145 }