View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ServerChannel;
24  
25  import java.net.InetSocketAddress;
26  import java.net.SocketAddress;
27  
28  public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
29      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
30  
31      AbstractKQueueServerChannel(BsdSocket fd) {
32          this(fd, isSoErrorZero(fd));
33      }
34  
35      AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
36          super(null, fd, active);
37      }
38  
39      @Override
40      public ChannelMetadata metadata() {
41          return METADATA;
42      }
43  
44      @Override
45      protected InetSocketAddress remoteAddress0() {
46          return null;
47      }
48  
49      @Override
50      protected AbstractKQueueUnsafe newUnsafe() {
51          return new KQueueServerSocketUnsafe();
52      }
53  
54      @Override
55      protected void doWrite(ChannelOutboundBuffer in) throws Exception {
56          throw new UnsupportedOperationException();
57      }
58  
59      @Override
60      protected Object filterOutboundMessage(Object msg) throws Exception {
61          throw new UnsupportedOperationException();
62      }
63  
64      abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
65  
66      @Override
67      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
68          throw new UnsupportedOperationException();
69      }
70  
71      final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
72          // Will hold the remote address after accept(...) was successful.
73          // We need 24 bytes for the address as maximum + 1 byte for storing the capacity.
74          // So use 26 bytes as it's a power of two.
75          private final byte[] acceptedAddress = new byte[26];
76  
77          @Override
78          void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
79              assert eventLoop().inEventLoop();
80              final ChannelConfig config = config();
81              if (shouldBreakReadReady(config)) {
82                  clearReadFilter0();
83                  return;
84              }
85              final ChannelPipeline pipeline = pipeline();
86              allocHandle.reset(config);
87              allocHandle.attemptedBytesRead(1);
88  
89              Throwable exception = null;
90              try {
91                  try {
92                      do {
93                          int acceptFd = socket.accept(acceptedAddress);
94                          if (acceptFd == -1) {
95                              // this means everything was handled for now
96                              allocHandle.lastBytesRead(-1);
97                              break;
98                          }
99                          allocHandle.lastBytesRead(1);
100                         allocHandle.incMessagesRead(1);
101 
102                         readPending = false;
103                         pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
104                                                                  acceptedAddress[0]));
105                     } while (allocHandle.continueReading());
106                 } catch (Throwable t) {
107                     exception = t;
108                 }
109                 allocHandle.readComplete();
110                 pipeline.fireChannelReadComplete();
111 
112                 if (exception != null) {
113                     pipeline.fireExceptionCaught(exception);
114                 }
115             } finally {
116                 if (shouldStopReading(config)) {
117                     clearReadFilter0();
118                 }
119             }
120         }
121     }
122 }