View Javadoc

1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.channel.EventLoop;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.channel.unix.Socket;
27  
28  import java.net.InetSocketAddress;
29  import java.net.SocketAddress;
30  
31  
32  public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
33  
34      /**
35       * @deprecated Use {@link #AbstractEpollServerChannel(Socket, boolean)}.
36       */
37      @Deprecated
38      protected AbstractEpollServerChannel(int fd) {
39          this(new Socket(fd), false);
40      }
41  
42      /**
43       * @deprecated Use {@link #AbstractEpollServerChannel(Socket, boolean)}.
44       */
45      @Deprecated
46      protected AbstractEpollServerChannel(FileDescriptor fd) {
47          this(new Socket(fd.intValue()));
48      }
49  
50      /**
51       * @deprecated Use {@link #AbstractEpollServerChannel(Socket, boolean)}.
52       */
53      @Deprecated
54      protected AbstractEpollServerChannel(Socket fd) {
55          this(fd, isSoErrorZero(fd));
56      }
57  
58      protected AbstractEpollServerChannel(Socket fd, boolean active) {
59          super(null, fd, Native.EPOLLIN, active);
60      }
61  
62      @Override
63      protected boolean isCompatible(EventLoop loop) {
64          return loop instanceof EpollEventLoop;
65      }
66  
67      @Override
68      protected InetSocketAddress remoteAddress0() {
69          return null;
70      }
71  
72      @Override
73      protected AbstractEpollUnsafe newUnsafe() {
74          return new EpollServerSocketUnsafe();
75      }
76  
77      @Override
78      protected void doWrite(ChannelOutboundBuffer in) throws Exception {
79          throw new UnsupportedOperationException();
80      }
81  
82      @Override
83      protected Object filterOutboundMessage(Object msg) throws Exception {
84          throw new UnsupportedOperationException();
85      }
86  
87      abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
88  
89      final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
90          // Will hold the remote address after accept(...) was sucesssful.
91          // We need 24 bytes for the address as maximum + 1 byte for storing the length.
92          // So use 26 bytes as it's a power of two.
93          private final byte[] acceptedAddress = new byte[26];
94  
95          @Override
96          public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
97              // Connect not supported by ServerChannel implementations
98              channelPromise.setFailure(new UnsupportedOperationException());
99          }
100 
101         @Override
102         void epollInReady() {
103             assert eventLoop().inEventLoop();
104             if (fd().isInputShutdown()) {
105                 return;
106             }
107             boolean edgeTriggered = isFlagSet(Native.EPOLLET);
108 
109             final ChannelConfig config = config();
110             if (!readPending && !edgeTriggered && !config.isAutoRead()) {
111                 // ChannelConfig.setAutoRead(false) was called in the meantime
112                 clearEpollIn0();
113                 return;
114             }
115 
116             final ChannelPipeline pipeline = pipeline();
117             Throwable exception = null;
118             try {
119                 try {
120                     // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
121                     final int maxMessagesPerRead = edgeTriggered
122                             ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
123                     int messages = 0;
124                     do {
125                         int socketFd = fd().accept(acceptedAddress);
126                         if (socketFd == -1) {
127                             // this means everything was handled for now
128                             break;
129                         }
130                         readPending = false;
131 
132                         try {
133                             int len = acceptedAddress[0];
134                             pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
135                         } catch (Throwable t) {
136                             // keep on reading as we use epoll ET and need to consume everything from the socket
137                             pipeline.fireChannelReadComplete();
138                             pipeline.fireExceptionCaught(t);
139                         } finally {
140                             if (!edgeTriggered && !config.isAutoRead()) {
141                                 // This is not using EPOLLET so we can stop reading
142                                 // ASAP as we will get notified again later with
143                                 // pending data
144                                 break;
145                             }
146                         }
147                     } while (++ messages < maxMessagesPerRead || isRdHup());
148                 } catch (Throwable t) {
149                     exception = t;
150                 }
151                 pipeline.fireChannelReadComplete();
152 
153                 if (exception != null) {
154                     pipeline.fireExceptionCaught(exception);
155                 }
156             } finally {
157                 // Check if there is a readPending which was not processed yet.
158                 // This could be for two reasons:
159                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
160                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
161                 //
162                 // See https://github.com/netty/netty/issues/2254
163                 if (!readPending && !config.isAutoRead()) {
164                     clearEpollIn0();
165                 }
166             }
167         }
168     }
169 
170     @Override
171     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
172         throw new UnsupportedOperationException();
173     }
174 }