View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.EventLoop;
26  import io.netty.channel.unix.FileDescriptor;
27  import io.netty.channel.unix.UnixChannel;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.internal.OneTimeTask;
30  
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.channels.UnresolvedAddressException;
34  
35  abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
36      private static final ChannelMetadata DATA = new ChannelMetadata(false);
37      private final int readFlag;
38      private final FileDescriptor fileDescriptor;
39      protected int flags = Native.EPOLLET;
40  
41      protected volatile boolean active;
42  
43      AbstractEpollChannel(int fd, int flag) {
44          this(null, fd, flag, false);
45      }
46  
47      AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) {
48          this(parent, new FileDescriptor(fd), flag, active);
49      }
50  
51      AbstractEpollChannel(Channel parent, FileDescriptor fd, int flag, boolean active) {
52          super(parent);
53          if (fd == null) {
54              throw new NullPointerException("fd");
55          }
56          readFlag = flag;
57          flags |= flag;
58          this.active = active;
59          fileDescriptor = fd;
60      }
61  
62      void setFlag(int flag) {
63          if (!isFlagSet(flag)) {
64              flags |= flag;
65              modifyEvents();
66          }
67      }
68  
69      void clearFlag(int flag) {
70          if (isFlagSet(flag)) {
71              flags &= ~flag;
72              modifyEvents();
73          }
74      }
75  
76      boolean isFlagSet(int flag) {
77          return (flags & flag) != 0;
78      }
79  
80      @Override
81      public final FileDescriptor fd() {
82          return fileDescriptor;
83      }
84  
85      @Override
86      public abstract EpollChannelConfig config();
87  
88      @Override
89      public boolean isActive() {
90          return active;
91      }
92  
93      @Override
94      public ChannelMetadata metadata() {
95          return DATA;
96      }
97  
98      @Override
99      protected void doClose() throws Exception {
100         active = false;
101 
102         // deregister from epoll now
103         doDeregister();
104 
105         FileDescriptor fd = fileDescriptor;
106         fd.close();
107     }
108 
109     @Override
110     protected void doDisconnect() throws Exception {
111         doClose();
112     }
113 
114     @Override
115     protected boolean isCompatible(EventLoop loop) {
116         return loop instanceof EpollEventLoop;
117     }
118 
119     @Override
120     public boolean isOpen() {
121         return fileDescriptor.isOpen();
122     }
123 
124     @Override
125     protected void doDeregister() throws Exception {
126         ((EpollEventLoop) eventLoop().unwrap()).remove(this);
127     }
128 
129     @Override
130     protected void doBeginRead() throws Exception {
131         // Channel.read() or ChannelHandlerContext.read() was called
132         ((AbstractEpollUnsafe) unsafe()).readPending = true;
133 
134         setFlag(readFlag);
135     }
136 
137     final void clearEpollIn() {
138         // Only clear if registered with an EventLoop as otherwise
139         if (isRegistered()) {
140             final EventLoop loop = eventLoop();
141             final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
142             if (loop.inEventLoop()) {
143                 unsafe.clearEpollIn0();
144             } else {
145                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
146                 loop.execute(new OneTimeTask() {
147                     @Override
148                     public void run() {
149                         if (!config().isAutoRead() && !unsafe.readPending) {
150                             // Still no read triggered so clear it now
151                             unsafe.clearEpollIn0();
152                         }
153                     }
154                 });
155             }
156         } else  {
157             // The EventLoop is not registered atm so just update the flags so the correct value
158             // will be used once the channel is registered
159             flags &= ~readFlag;
160         }
161     }
162 
163     private void modifyEvents() {
164         if (isOpen() && isRegistered()) {
165             ((EpollEventLoop) eventLoop().unwrap()).modify(this);
166         }
167     }
168 
169     @Override
170     protected void doRegister() throws Exception {
171         ((EpollEventLoop) eventLoop().unwrap()).add(this);
172     }
173 
174     @Override
175     protected abstract AbstractEpollUnsafe newUnsafe();
176 
177     /**
178      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
179      */
180     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
181         return newDirectBuffer(buf, buf);
182     }
183 
184     /**
185      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
186      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
187      * this method.
188      */
189     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
190         final int readableBytes = buf.readableBytes();
191         if (readableBytes == 0) {
192             ReferenceCountUtil.safeRelease(holder);
193             return Unpooled.EMPTY_BUFFER;
194         }
195 
196         final ByteBufAllocator alloc = alloc();
197         if (alloc.isDirectBufferPooled()) {
198             return newDirectBuffer0(holder, buf, alloc, readableBytes);
199         }
200 
201         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
202         if (directBuf == null) {
203             return newDirectBuffer0(holder, buf, alloc, readableBytes);
204         }
205 
206         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
207         ReferenceCountUtil.safeRelease(holder);
208         return directBuf;
209     }
210 
211     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
212         final ByteBuf directBuf = alloc.directBuffer(capacity);
213         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
214         ReferenceCountUtil.safeRelease(holder);
215         return directBuf;
216     }
217 
218     protected static void checkResolvable(InetSocketAddress addr) {
219         if (addr.isUnresolved()) {
220             throw new UnresolvedAddressException();
221         }
222     }
223 
224     /**
225      * Read bytes into the given {@link ByteBuf} and return the amount.
226      */
227     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
228         int writerIndex = byteBuf.writerIndex();
229         int localReadAmount;
230         if (byteBuf.hasMemoryAddress()) {
231             localReadAmount = Native.readAddress(
232                     fileDescriptor.intValue(), byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
233         } else {
234             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
235             localReadAmount = Native.read(fileDescriptor.intValue(), buf, buf.position(), buf.limit());
236         }
237         if (localReadAmount > 0) {
238             byteBuf.writerIndex(writerIndex + localReadAmount);
239         }
240         return localReadAmount;
241     }
242 
243     protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception {
244         int readableBytes = buf.readableBytes();
245         int writtenBytes = 0;
246         if (buf.hasMemoryAddress()) {
247             long memoryAddress = buf.memoryAddress();
248             int readerIndex = buf.readerIndex();
249             int writerIndex = buf.writerIndex();
250             for (int i = writeSpinCount - 1; i >= 0; i--) {
251                 int localFlushedAmount = Native.writeAddress(
252                         fileDescriptor.intValue(), memoryAddress, readerIndex, writerIndex);
253                 if (localFlushedAmount > 0) {
254                     writtenBytes += localFlushedAmount;
255                     if (writtenBytes == readableBytes) {
256                         return writtenBytes;
257                     }
258                     readerIndex += localFlushedAmount;
259                 } else {
260                     break;
261                 }
262             }
263         } else {
264             ByteBuffer nioBuf;
265             if (buf.nioBufferCount() == 1) {
266                 nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
267             } else {
268                 nioBuf = buf.nioBuffer();
269             }
270             for (int i = writeSpinCount - 1; i >= 0; i--) {
271                 int pos = nioBuf.position();
272                 int limit = nioBuf.limit();
273                 int localFlushedAmount = Native.write(fileDescriptor.intValue(), nioBuf, pos, limit);
274                 if (localFlushedAmount > 0) {
275                     nioBuf.position(pos + localFlushedAmount);
276                     writtenBytes += localFlushedAmount;
277                     if (writtenBytes == readableBytes) {
278                         return writtenBytes;
279                     }
280                 } else {
281                     break;
282                 }
283             }
284         }
285         if (writtenBytes < readableBytes) {
286             // Returned EAGAIN need to set EPOLLOUT
287             setFlag(Native.EPOLLOUT);
288         }
289         return writtenBytes;
290     }
291 
292     protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
293         protected boolean readPending;
294 
295         /**
296          * Called once EPOLLIN event is ready to be processed
297          */
298         abstract void epollInReady();
299 
300         /**
301          * Called once EPOLLRDHUP event is ready to be processed
302          */
303         void epollRdHupReady() {
304             // NOOP
305         }
306 
307         @Override
308         protected void flush0() {
309             // Flush immediately only when there's no pending flush.
310             // If there's a pending flush operation, event loop will call forceFlush() later,
311             // and thus there's no need to call it now.
312             if (isFlagSet(Native.EPOLLOUT)) {
313                 return;
314             }
315             super.flush0();
316         }
317 
318         /**
319          * Called once a EPOLLOUT event is ready to be processed
320          */
321         void epollOutReady() {
322             // directly call super.flush0() to force a flush now
323             super.flush0();
324         }
325 
326         protected final void clearEpollIn0() {
327             clearFlag(readFlag);
328         }
329     }
330 }