1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.epoll;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.buffer.api.DefaultBufferAllocators;
21 import io.netty5.channel.ChannelOption;
22 import io.netty5.channel.ChannelShutdownDirection;
23 import io.netty5.channel.RecvBufferAllocator;
24 import io.netty5.channel.socket.DomainSocketAddress;
25 import io.netty5.channel.socket.SocketProtocolFamily;
26 import io.netty5.channel.unix.IntegerUnixChannelOption;
27 import io.netty5.channel.unix.RawUnixChannelOption;
28 import io.netty5.util.Resource;
29 import io.netty5.channel.AbstractChannel;
30 import io.netty5.channel.ChannelException;
31 import io.netty5.channel.ChannelMetadata;
32 import io.netty5.channel.ChannelOutboundBuffer;
33 import io.netty5.channel.EventLoop;
34 import io.netty5.channel.unix.FileDescriptor;
35 import io.netty5.channel.unix.IovArray;
36 import io.netty5.channel.unix.Socket;
37 import io.netty5.channel.unix.UnixChannel;
38
39 import java.io.IOException;
40 import java.net.InetSocketAddress;
41 import java.net.SocketAddress;
42 import java.nio.ByteBuffer;
43 import java.nio.channels.UnresolvedAddressException;
44
45 import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
46 import static io.netty5.channel.unix.UnixChannelUtil.computeRemoteAddr;
47 import static io.netty5.util.CharsetUtil.UTF_8;
48 import static java.util.Objects.requireNonNull;
49
50 abstract class AbstractEpollChannel<P extends UnixChannel>
51 extends AbstractChannel<P, SocketAddress, SocketAddress> implements UnixChannel {
52 protected final LinuxSocket socket;
53
54 private final Runnable epollInReadyRunnable = new Runnable() {
55 @Override
56 public void run() {
57 epollInReadyRunnablePending = false;
58 epollInReady();
59 }
60 };
61
62 protected volatile boolean active;
63
64 boolean readPending;
65
66 private EpollRegistration registration;
67
68 private int flags = Native.EPOLLET;
69 private boolean inputClosedSeenErrorOnRead;
70 private boolean epollInReadyRunnablePending;
71 private boolean maybeMoreDataToRead;
72
73 private boolean receivedRdHup;
74 private volatile SocketAddress localAddress;
75 private volatile SocketAddress remoteAddress;
76
77 AbstractEpollChannel(EventLoop eventLoop, ChannelMetadata metadata, int initialFlag,
78 RecvBufferAllocator defaultRecvAllocator, LinuxSocket fd) {
79 this(null, eventLoop, metadata, initialFlag, defaultRecvAllocator, fd, false);
80 }
81
82 @SuppressWarnings("unchecked")
83 AbstractEpollChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata, int initialFlag,
84 RecvBufferAllocator defaultRecvAllocator, LinuxSocket fd, boolean active) {
85 super(parent, eventLoop, metadata, defaultRecvAllocator);
86 flags |= initialFlag;
87 socket = requireNonNull(fd, "fd");
88 this.active = active;
89 if (active) {
90
91
92 localAddress = fd.localAddress();
93 remoteAddress = fd.remoteAddress();
94 }
95 }
96
97 @SuppressWarnings("unchecked")
98 AbstractEpollChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata, int initialFlag,
99 RecvBufferAllocator defaultRecvAllocator, LinuxSocket fd, SocketAddress remote) {
100 super(parent, eventLoop, metadata, defaultRecvAllocator);
101 flags |= initialFlag;
102 socket = requireNonNull(fd, "fd");
103 active = true;
104
105
106 remoteAddress = remote;
107 localAddress = fd.localAddress();
108 }
109
110 protected final boolean fetchLocalAddress() {
111 return socket.protocolFamily() != SocketProtocolFamily.UNIX;
112 }
113
114 protected static boolean isSoErrorZero(Socket fd) {
115 try {
116 return fd.getSoError() == 0;
117 } catch (IOException e) {
118 throw new ChannelException(e);
119 }
120 }
121
122 protected final void setFlag(int flag) throws IOException {
123 if (!isFlagSet(flag)) {
124 flags |= flag;
125 modifyEvents();
126 }
127 }
128
129 protected final void clearFlag(int flag) throws IOException {
130 if (isFlagSet(flag)) {
131 flags &= ~flag;
132 modifyEvents();
133 }
134 }
135
136 protected final EpollRegistration registration() {
137 assert registration != null;
138 return registration;
139 }
140
141 private boolean isFlagSet(int flag) {
142 return (flags & flag) != 0;
143 }
144
145 final int flags() {
146 return flags;
147 }
148
149 @Override
150 public final FileDescriptor fd() {
151 return socket;
152 }
153
154 @Override
155 public boolean isActive() {
156 return active;
157 }
158
159 @Override
160 protected void doClose() throws Exception {
161 active = false;
162
163
164 inputClosedSeenErrorOnRead = true;
165 socket.close();
166 }
167
168 final void resetCachedAddresses() {
169 cacheAddresses(localAddress, null);
170 remoteAddress = null;
171 }
172
173 @Override
174 protected void doDisconnect() throws Exception {
175 doClose();
176 }
177
178 @Override
179 public final boolean isOpen() {
180 return socket.isOpen();
181 }
182
183 final void register0(EpollRegistration registration) {
184
185
186
187 epollInReadyRunnablePending = false;
188 this.registration = registration;
189 }
190
191 final void deregister0() throws Exception {
192 if (registration != null) {
193 registration.remove();
194 }
195 }
196
197 @Override
198 protected final void doBeginRead() throws Exception {
199
200 readPending = true;
201
202
203
204
205 setFlag(Native.EPOLLIN);
206
207
208
209 if (maybeMoreDataToRead) {
210 executeEpollInReadyRunnable();
211 }
212 }
213
214 final boolean shouldBreakEpollInReady() {
215 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || isAllowHalfClosure());
216 }
217
218 private void clearEpollIn() {
219
220 if (isRegistered()) {
221 final EventLoop loop = executor();
222 if (loop.inEventLoop()) {
223 clearEpollIn0();
224 } else {
225
226 loop.execute(() -> {
227 if (!readPending && !isAutoRead()) {
228
229 clearEpollIn0();
230 }
231 });
232 }
233 } else {
234
235
236 flags &= ~Native.EPOLLIN;
237 }
238 }
239
240 private void modifyEvents() throws IOException {
241 if (isOpen() && isRegistered() && registration != null) {
242 registration.update();
243 }
244 }
245
246
247
248
249 protected final Buffer newDirectBuffer(Buffer buf) {
250 return newDirectBuffer(buf, buf);
251 }
252
253
254
255
256
257 protected final Buffer newDirectBuffer(Resource<?> holder, Buffer buf) {
258 BufferAllocator allocator = ioBufferAllocator();
259 try (holder) {
260 int readableBytes = buf.readableBytes();
261 Buffer directCopy = allocator.allocate(readableBytes);
262 if (readableBytes > 0) {
263 directCopy.writeBytes(buf);
264 }
265 return directCopy;
266 }
267 }
268
269 protected static void checkResolvable(InetSocketAddress addr) {
270 if (addr.isUnresolved()) {
271 throw new UnresolvedAddressException();
272 }
273 }
274
275
276
277
278 protected final void doReadBytes(Buffer buffer) throws Exception {
279 recvBufAllocHandle().attemptedBytesRead(buffer.writableBytes());
280 buffer.forEachWritable(0, (index, component) -> {
281 long address = component.writableNativeAddress();
282 assert address != 0;
283 int localReadAmount = socket.readAddress(address, 0, component.writableBytes());
284 recvBufAllocHandle().lastBytesRead(localReadAmount);
285 if (localReadAmount > 0) {
286 component.skipWritableBytes(localReadAmount);
287 }
288 return false;
289 });
290 }
291
292 protected final int doWriteBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
293 int initialReaderOffset = buf.readerOffset();
294 buf.forEachReadable(0, (index, component) -> {
295 long address = component.readableNativeAddress();
296 assert address != 0;
297 int written = socket.writeAddress(address, 0, component.readableBytes());
298 if (written > 0) {
299 component.skipReadableBytes(written);
300 }
301 return false;
302 });
303 int readerOffset = buf.readerOffset();
304 if (initialReaderOffset < readerOffset) {
305 buf.readerOffset(initialReaderOffset);
306 int bytesWritten = readerOffset - initialReaderOffset;
307 in.removeBytes(bytesWritten);
308 return 1;
309 }
310 return WRITE_STATUS_SNDBUF_FULL;
311 }
312
313
314
315
316
317 protected final long doWriteOrSendBytes(Buffer data, SocketAddress remoteAddress, boolean fastOpen)
318 throws IOException {
319 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
320
321 IovArray array = registration().cleanIovArray();
322 data.forEachReadable(0, array);
323 int count = array.count();
324 assert count != 0;
325 if (remoteAddress == null) {
326 return socket.writevAddresses(array.memoryAddress(0), count);
327 }
328 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
329 return socket.sendToAddressesDomainSocket(
330 array.memoryAddress(0), count, ((DomainSocketAddress) remoteAddress)
331 .path().getBytes(UTF_8));
332 } else {
333 InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
334 return socket.sendToAddresses(array.memoryAddress(0), count,
335 inetSocketAddress.getAddress(), inetSocketAddress.getPort(), fastOpen);
336 }
337 }
338
339 final void epollInReady() {
340 if (shouldBreakEpollInReady()) {
341 clearEpollIn0();
342 return;
343 }
344 maybeMoreDataToRead = false;
345 RecvBufferAllocator.Handle handle = recvBufAllocHandle();
346 handle.reset();
347
348 try {
349 epollInReady(handle, ioBufferAllocator(), receivedRdHup);
350 } finally {
351 this.maybeMoreDataToRead = maybeMoreDataToRead(handle) || receivedRdHup;
352
353 if (receivedRdHup || readPending && maybeMoreDataToRead) {
354
355
356
357
358
359
360
361 executeEpollInReadyRunnable();
362 } else if (!readPending && !isAutoRead()) {
363
364
365
366
367
368
369 clearEpollIn();
370 }
371 }
372 }
373
374
375
376
377 protected abstract void epollInReady(RecvBufferAllocator.Handle handle, BufferAllocator recvBufferAllocator,
378 boolean receivedRdHup);
379
380 protected abstract boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle);
381
382 private void executeEpollInReadyRunnable() {
383 if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady()) {
384 return;
385 }
386 epollInReadyRunnablePending = true;
387 executor().execute(epollInReadyRunnable);
388 }
389
390
391
392
393 final void epollRdHupReady() {
394
395 receivedRdHup = true;
396
397 if (isActive()) {
398
399
400
401 epollInReady();
402 } else {
403
404 shutdownInput(true);
405 }
406
407
408 clearEpollRdHup();
409 }
410
411
412
413
414 private void clearEpollRdHup() {
415 try {
416 clearFlag(Native.EPOLLRDHUP);
417 } catch (IOException e) {
418 pipeline().fireChannelExceptionCaught(e);
419 closeTransport(newPromise());
420 }
421 }
422
423
424
425
426 protected final void shutdownInput(boolean rdHup) {
427 if (!socket.isInputShutdown()) {
428 if (isAllowHalfClosure()) {
429 clearEpollIn();
430 shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
431 } else {
432 closeTransport(newPromise());
433 }
434 } else if (!rdHup) {
435 inputClosedSeenErrorOnRead = true;
436 }
437 }
438
439 @Override
440 protected final void writeFlushed() {
441
442
443
444 if (!isFlagSet(Native.EPOLLOUT)) {
445 super.writeFlushed();
446 }
447 }
448
449
450
451
452 final void epollOutReady() {
453 if (isConnectPending()) {
454
455 finishConnect();
456 } else if (!socket.isOutputShutdown()) {
457
458 super.writeFlushed();
459 }
460 }
461
462 private void clearEpollIn0() {
463 assert executor().inEventLoop();
464 try {
465 readPending = false;
466 clearFlag(Native.EPOLLIN);
467 } catch (IOException e) {
468
469
470 pipeline().fireChannelExceptionCaught(e);
471 closeTransport(newPromise());
472 }
473 }
474
475 @Override
476 protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) throws Exception {
477 if (socket.finishConnect()) {
478 active = true;
479 clearFlag(Native.EPOLLOUT);
480 if (requestedRemoteAddress instanceof InetSocketAddress) {
481 remoteAddress = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
482 } else {
483 remoteAddress = requestedRemoteAddress;
484 }
485 return true;
486 }
487 setFlag(Native.EPOLLOUT);
488 return false;
489 }
490
491 @Override
492 protected void doBind(SocketAddress local) throws Exception {
493 if (local instanceof InetSocketAddress) {
494 checkResolvable((InetSocketAddress) local);
495 }
496 socket.bind(local);
497 if (fetchLocalAddress()) {
498 this.localAddress = socket.localAddress();
499 } else {
500 this.localAddress = local;
501 }
502 }
503
504
505
506
507 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
508 if (localAddress instanceof InetSocketAddress) {
509 checkResolvable((InetSocketAddress) localAddress);
510 }
511
512 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
513 ? (InetSocketAddress) remoteAddress : null;
514 if (remoteSocketAddr != null) {
515 checkResolvable(remoteSocketAddr);
516 }
517
518 if (localAddress != null) {
519 socket.bind(localAddress);
520 }
521
522 boolean connected = doConnect0(remoteAddress);
523 if (connected) {
524 this.remoteAddress = remoteSocketAddr == null ?
525 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
526 active = true;
527 }
528 if (fetchLocalAddress()) {
529
530
531
532 this.localAddress = socket.localAddress();
533 }
534 return connected;
535 }
536
537 protected boolean doConnect0(SocketAddress remote) throws Exception {
538 boolean success = false;
539 try {
540 boolean connected = socket.connect(remote);
541 if (!connected) {
542 setFlag(Native.EPOLLOUT);
543 }
544 success = true;
545 return connected;
546 } finally {
547 if (!success) {
548 doClose();
549 }
550 }
551 }
552
553 @Override
554 protected final SocketAddress localAddress0() {
555 return localAddress;
556 }
557
558 @Override
559 protected final SocketAddress remoteAddress0() {
560 return remoteAddress;
561 }
562
563 final void closeTransportNow() {
564 closeTransport(newPromise());
565 }
566
567 @SuppressWarnings("unchecked")
568 @Override
569 protected <T> T getExtendedOption(ChannelOption<T> option) {
570 try {
571 if (option instanceof IntegerUnixChannelOption) {
572 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
573 return (T) Integer.valueOf(socket.getIntOpt(opt.level(), opt.optname()));
574 }
575 if (option instanceof RawUnixChannelOption) {
576 RawUnixChannelOption opt = (RawUnixChannelOption) option;
577 ByteBuffer out = ByteBuffer.allocate(opt.length());
578 socket.getRawOpt(opt.level(), opt.optname(), out);
579 return (T) out.flip();
580 }
581 } catch (IOException e) {
582 throw new ChannelException(e);
583 }
584 return super.getExtendedOption(option);
585 }
586
587 @Override
588 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
589 try {
590 if (option instanceof IntegerUnixChannelOption) {
591 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
592 socket.setIntOpt(opt.level(), opt.optname(), (Integer) value);
593 return;
594 } else if (option instanceof RawUnixChannelOption) {
595 RawUnixChannelOption opt = (RawUnixChannelOption) option;
596 socket.setRawOpt(opt.level(), opt.optname(), (ByteBuffer) value);
597 return;
598 }
599 } catch (IOException e) {
600 throw new ChannelException(e);
601 }
602 super.setExtendedOption(option, value);
603 }
604
605 @Override
606 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
607 if (option instanceof IntegerUnixChannelOption || option instanceof RawUnixChannelOption) {
608 return true;
609 }
610 return super.isExtendedOptionSupported(option);
611 }
612
613 @Override
614 protected final void autoReadCleared() {
615 clearEpollIn();
616 }
617
618 private BufferAllocator ioBufferAllocator() {
619 BufferAllocator alloc = bufferAllocator();
620
621 if (!alloc.getAllocationType().isDirect()) {
622 return DefaultBufferAllocators.offHeapAllocator();
623 }
624 return alloc;
625 }
626 }