View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.EventLoop;
24  import io.netty.channel.ServerChannel;
25  
26  import java.net.InetSocketAddress;
27  import java.net.SocketAddress;
28  
29  public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
30      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
31  
32      AbstractKQueueServerChannel(BsdSocket fd) {
33          this(fd, isSoErrorZero(fd));
34      }
35  
36      AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
37          super(null, fd, active);
38      }
39  
40      @Override
41      public ChannelMetadata metadata() {
42          return METADATA;
43      }
44  
45      @Override
46      protected boolean isCompatible(EventLoop loop) {
47          return loop instanceof KQueueEventLoop;
48      }
49  
50      @Override
51      protected InetSocketAddress remoteAddress0() {
52          return null;
53      }
54  
55      @Override
56      protected AbstractKQueueUnsafe newUnsafe() {
57          return new KQueueServerSocketUnsafe();
58      }
59  
60      @Override
61      protected void doWrite(ChannelOutboundBuffer in) throws Exception {
62          throw new UnsupportedOperationException();
63      }
64  
65      @Override
66      protected Object filterOutboundMessage(Object msg) throws Exception {
67          throw new UnsupportedOperationException();
68      }
69  
70      abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
71  
72      @Override
73      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
74          throw new UnsupportedOperationException();
75      }
76  
77      final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
78          // Will hold the remote address after accept(...) was successful.
79          // We need 24 bytes for the address as maximum + 1 byte for storing the capacity.
80          // So use 26 bytes as it's a power of two.
81          private final byte[] acceptedAddress = new byte[26];
82  
83          @Override
84          void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
85              assert eventLoop().inEventLoop();
86              final ChannelConfig config = config();
87              if (shouldBreakReadReady(config)) {
88                  clearReadFilter0();
89                  return;
90              }
91              final ChannelPipeline pipeline = pipeline();
92              allocHandle.reset(config);
93              allocHandle.attemptedBytesRead(1);
94              readReadyBefore();
95  
96              Throwable exception = null;
97              try {
98                  try {
99                      do {
100                         int acceptFd = socket.accept(acceptedAddress);
101                         if (acceptFd == -1) {
102                             // this means everything was handled for now
103                             allocHandle.lastBytesRead(-1);
104                             break;
105                         }
106                         allocHandle.lastBytesRead(1);
107                         allocHandle.incMessagesRead(1);
108 
109                         readPending = false;
110                         pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
111                                                                  acceptedAddress[0]));
112                     } while (allocHandle.continueReading());
113                 } catch (Throwable t) {
114                     exception = t;
115                 }
116                 allocHandle.readComplete();
117                 pipeline.fireChannelReadComplete();
118 
119                 if (exception != null) {
120                     pipeline.fireExceptionCaught(exception);
121                 }
122             } finally {
123                 readReadyFinally(config);
124             }
125         }
126     }
127 }