1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelException;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultFileRegion;
28 import io.netty.channel.EventLoop;
29 import io.netty.channel.FileRegion;
30 import io.netty.channel.IoRegistration;
31 import io.netty.channel.socket.DuplexChannel;
32 import io.netty.channel.unix.IovArray;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.net.SocketAddress;
39 import java.nio.ByteBuffer;
40 import java.nio.channels.WritableByteChannel;
41
42 import static io.netty.channel.unix.Errors.ioResult;
43
44 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
46 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
47
48
49
50
51
52
53
54 private static final int FILE_REGION_MAX_CHUNK_SIZE = Math.min(16 * 1024 * 1024,
55 Math.max(1, SystemPropertyUtil.getInt("io.netty.iouring.fileRegionChunkSize", 64 * 1024)));
56
57
58 byte writeOpCode;
59
60 long writeId;
61 byte readOpCode;
62 long readId;
63
64
65 private IoUringBufferRing bufferRing;
66
67 AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
68 super(parent, socket, active);
69 }
70
71 AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
72 super(parent, socket, remote);
73 }
74
75 @Override
76 protected final boolean isStreamSocket() {
77 return true;
78 }
79
80 @Override
81 public ChannelMetadata metadata() {
82 return METADATA;
83 }
84
85 @Override
86 protected AbstractUringUnsafe newUnsafe() {
87 return new IoUringStreamUnsafe();
88 }
89
90 @Override
91 public final ChannelFuture shutdown() {
92 return shutdown(newPromise());
93 }
94
95 @Override
96 public final ChannelFuture shutdown(final ChannelPromise promise) {
97 ChannelFuture shutdownOutputFuture = shutdownOutput();
98 if (shutdownOutputFuture.isDone()) {
99 shutdownOutputDone(shutdownOutputFuture, promise);
100 } else {
101 shutdownOutputFuture.addListener(new ChannelFutureListener() {
102 @Override
103 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
104 shutdownOutputDone(shutdownOutputFuture, promise);
105 }
106 });
107 }
108 return promise;
109 }
110
111 @Override
112 protected final void doShutdownOutput() throws Exception {
113 socket.shutdown(false, true);
114 }
115
116 private void shutdownInput0(final ChannelPromise promise) {
117 try {
118 socket.shutdown(true, false);
119 promise.setSuccess();
120 } catch (Throwable cause) {
121 promise.setFailure(cause);
122 }
123 }
124
125 @Override
126 public final boolean isOutputShutdown() {
127 return socket.isOutputShutdown();
128 }
129
130 @Override
131 public final boolean isInputShutdown() {
132 return socket.isInputShutdown();
133 }
134
135 @Override
136 public final boolean isShutdown() {
137 return socket.isShutdown();
138 }
139
140 @Override
141 public final ChannelFuture shutdownOutput() {
142 return shutdownOutput(newPromise());
143 }
144
145 @Override
146 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
147 EventLoop loop = eventLoop();
148 if (loop.inEventLoop()) {
149 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
150 } else {
151 loop.execute(new Runnable() {
152 @Override
153 public void run() {
154 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
155 }
156 });
157 }
158
159 return promise;
160 }
161
162 @Override
163 public final ChannelFuture shutdownInput() {
164 return shutdownInput(newPromise());
165 }
166
167 @Override
168 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
169 EventLoop loop = eventLoop();
170 if (loop.inEventLoop()) {
171 shutdownInput0(promise);
172 } else {
173 loop.execute(new Runnable() {
174 @Override
175 public void run() {
176 shutdownInput0(promise);
177 }
178 });
179 }
180 return promise;
181 }
182
183 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
184 ChannelFuture shutdownInputFuture = shutdownInput();
185 if (shutdownInputFuture.isDone()) {
186 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
187 } else {
188 shutdownInputFuture.addListener(new ChannelFutureListener() {
189 @Override
190 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
191 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
192 }
193 });
194 }
195 }
196
197 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
198 ChannelFuture shutdownInputFuture,
199 ChannelPromise promise) {
200 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
201 Throwable shutdownInputCause = shutdownInputFuture.cause();
202 if (shutdownOutputCause != null) {
203 if (shutdownInputCause != null) {
204 logger.info("Exception suppressed because a previous exception occurred.",
205 shutdownInputCause);
206 }
207 promise.setFailure(shutdownOutputCause);
208 } else if (shutdownInputCause != null) {
209 promise.setFailure(shutdownInputCause);
210 } else {
211 promise.setSuccess();
212 }
213 }
214
215 @Override
216 protected final void doRegister(ChannelPromise promise) {
217 ChannelPromise registerPromise = this.newPromise();
218
219 registerPromise.addListener(f -> {
220 if (f.isSuccess()) {
221 try {
222 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
223 if (bgid >= 0) {
224 final IoUringIoHandler ioUringIoHandler = registration().attachment();
225 bufferRing = ioUringIoHandler.findBufferRing(bgid);
226 }
227 if (active) {
228
229 schedulePollRdHup();
230 }
231 } finally {
232 promise.setSuccess();
233 }
234 } else {
235 promise.setFailure(f.cause());
236 }
237 });
238
239 super.doRegister(registerPromise);
240 }
241
242 @Override
243 protected Object filterOutboundMessage(Object msg) {
244 if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
245 return new IoUringFileRegion((DefaultFileRegion) msg);
246 }
247
248 if (msg instanceof FileRegion) {
249
250 return msg;
251 }
252
253 return super.filterOutboundMessage(msg);
254 }
255
256 protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
257
258 private ByteBuf readBuffer;
259
260
261 private ByteBuf fileRegionChunkBuf;
262
263 @Override
264 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
265 assert writeId == 0;
266
267 int fd = fd().intValue();
268 IoRegistration registration = registration();
269 IoUringIoHandler handler = registration.attachment();
270 IovArray iovArray = handler.iovArray();
271 int offset = iovArray.count();
272
273 try {
274 in.forEachFlushedMessage(filterWriteMultiple(iovArray));
275 } catch (Exception e) {
276
277 return scheduleWriteSingle(in.current());
278 }
279 long iovArrayAddress = iovArray.memoryAddress(offset);
280 int iovArrayLength = iovArray.count() - offset;
281
282 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
283
284 byte opCode = ops.opcode();
285 writeId = registration.submit(ops);
286 writeOpCode = opCode;
287 if (writeId == 0) {
288 return 0;
289 }
290 return 1;
291 }
292
293 protected ChannelOutboundBuffer.MessageProcessor filterWriteMultiple(IovArray iovArray) {
294 return iovArray;
295 }
296
297 @Override
298 protected int scheduleWriteSingle(Object msg) {
299 assert writeId == 0;
300
301 int fd = fd().intValue();
302 IoRegistration registration = registration();
303 final IoUringIoOps ops;
304 if (msg instanceof IoUringFileRegion) {
305 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
306 try {
307 fileRegion.open();
308 } catch (IOException e) {
309 this.handleWriteError(e);
310 return 0;
311 }
312 ops = fileRegion.splice(fd);
313 } else if (msg instanceof FileRegion) {
314 return scheduleWriteFileRegion(fd, registration, (FileRegion) msg);
315 } else {
316 ByteBuf buf = (ByteBuf) msg;
317 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
318 int length = buf.readableBytes();
319 short opsid = nextOpsId();
320
321 ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, opsid);
322 }
323 byte opCode = ops.opcode();
324 writeId = registration.submit(ops);
325 writeOpCode = opCode;
326 if (writeId == 0) {
327 return 0;
328 }
329 return 1;
330 }
331
332
333
334
335 private int scheduleWriteFileRegion(int fd, IoRegistration registration, FileRegion region) {
336 ByteBuf buf = fileRegionChunkBuf;
337 if (buf == null) {
338 long remaining = region.count() - region.transferred();
339 if (remaining > 0) {
340 int chunkSize = (int) Math.min(remaining, FILE_REGION_MAX_CHUNK_SIZE);
341 buf = alloc().directBuffer(chunkSize);
342 try {
343 ByteBufWritableByteChannel ch = new ByteBufWritableByteChannel(buf);
344 while (buf.writableBytes() > 0) {
345 long t = region.transferTo(ch, region.transferred());
346 if (t <= 0) {
347 break;
348 }
349 }
350 if (buf.readableBytes() == 0) {
351 buf.release();
352 handleWriteError(new ChannelException(
353 "FileRegion.transferTo(...) produced 0 bytes (count="
354 + region.count() + ", transferred=" + region.transferred() + ')'));
355 return 0;
356 }
357 } catch (Exception e) {
358 buf.release();
359 handleWriteError(e);
360 return 0;
361 }
362 } else {
363
364
365 buf = alloc().directBuffer(0);
366 }
367 fileRegionChunkBuf = buf;
368 }
369 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
370 int length = buf.readableBytes();
371 IoUringIoOps ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, nextOpsId());
372 byte opCode = ops.opcode();
373 writeId = registration.submit(ops);
374 writeOpCode = opCode;
375 if (writeId == 0) {
376
377
378
379
380 return 0;
381 }
382 return 1;
383 }
384
385 private int calculateRecvFlags(boolean first) {
386
387
388
389
390
391
392 if (first) {
393 return 0;
394 }
395 return Native.MSG_DONTWAIT;
396 }
397
398 private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
399
400
401
402 if (first) {
403
404
405 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
406 Native.IORING_RECVSEND_POLL_FIRST : 0;
407 }
408 return 0;
409 }
410
411 @Override
412 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
413 assert readBuffer == null;
414 assert readId == 0 : readId;
415 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
416
417 if (bufferRing != null && bufferRing.isUsable()) {
418 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
419 }
420
421
422 ByteBuf byteBuf = allocHandle.allocate(alloc());
423 try {
424 int fd = fd().intValue();
425 IoRegistration registration = registration();
426 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
427 int recvFlags = calculateRecvFlags(first);
428
429 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
430 IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
431 readId = registration.submit(ops);
432 readOpCode = Native.IORING_OP_RECV;
433 if (readId == 0) {
434 return 0;
435 }
436 readBuffer = byteBuf;
437 byteBuf = null;
438 return 1;
439 } finally {
440 if (byteBuf != null) {
441 byteBuf.release();
442 }
443 }
444 }
445
446 private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
447 short bgId = bufferRing.bufferGroupId();
448 try {
449 boolean multishot = IoUring.isRecvMultishotEnabled();
450 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
451 short ioPrio;
452 final int recvFlags;
453 if (multishot) {
454 ioPrio = Native.IORING_RECV_MULTISHOT;
455 recvFlags = 0;
456 } else {
457
458
459 ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
460 recvFlags = calculateRecvFlags(first);
461 }
462 if (IoUring.isRecvsendBundleEnabled()) {
463
464
465 ioPrio |= Native.IORING_RECVSEND_BUNDLE;
466 }
467 IoRegistration registration = registration();
468 int fd = fd().intValue();
469 IoUringIoOps ops = IoUringIoOps.newRecv(
470 fd, flags, ioPrio, recvFlags, 0,
471 0, nextOpsId(), bgId
472 );
473 readId = registration.submit(ops);
474 readOpCode = Native.IORING_OP_RECV;
475 if (readId == 0) {
476 return 0;
477 }
478 if (multishot) {
479
480 return -1;
481 }
482 return 1;
483 } catch (IllegalArgumentException illegalArgumentException) {
484 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
485 return 0;
486 }
487 }
488
489 @Override
490 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
491 ByteBuf byteBuf = readBuffer;
492 readBuffer = null;
493 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
494 readId = 0;
495
496
497 if (byteBuf != null) {
498
499 byteBuf.release();
500 }
501 return;
502 }
503 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
504 boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
505 short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
506 boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
507
508 boolean empty = socketIsEmpty(flags);
509 if (rearm) {
510
511 readId = 0;
512 }
513
514 boolean allDataRead = false;
515
516 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
517 final ChannelPipeline pipeline = pipeline();
518
519 try {
520 if (res < 0) {
521 if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
522
523 if (!bufferRing.expand()) {
524
525
526
527 pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
528 }
529
530
531
532
533 scheduleRead(allocHandle.isFirstRead());
534 return;
535 }
536
537
538
539 allocHandle.lastBytesRead(ioResult("io_uring read", res));
540 } else if (res > 0) {
541 if (useBufferRing) {
542
543
544
545
546
547 int read = res;
548 for (;;) {
549 int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
550 byteBuf = bufferRing.useBuffer(bid, read, more);
551 read -= byteBuf.readableBytes();
552 allocHandle.attemptedBytesRead(attemptedBytesRead);
553 allocHandle.lastBytesRead(byteBuf.readableBytes());
554
555 assert read >= 0;
556 if (read == 0) {
557
558
559 break;
560 }
561 allocHandle.incMessagesRead(1);
562 pipeline.fireChannelRead(byteBuf);
563 byteBuf = null;
564 bid = bufferRing.nextBid(bid);
565 if (!allocHandle.continueReading()) {
566
567 allocHandle.readComplete();
568 pipeline.fireChannelReadComplete();
569 allocHandle.reset(config());
570 }
571 }
572 } else {
573 int attemptedBytesRead = byteBuf.writableBytes();
574 byteBuf.writerIndex(byteBuf.writerIndex() + res);
575 allocHandle.attemptedBytesRead(attemptedBytesRead);
576 allocHandle.lastBytesRead(res);
577 }
578 } else {
579
580 allocHandle.lastBytesRead(-1);
581 }
582 if (allocHandle.lastBytesRead() <= 0) {
583
584 if (byteBuf != null) {
585
586 byteBuf.release();
587 byteBuf = null;
588 }
589 allDataRead = allocHandle.lastBytesRead() < 0;
590 if (allDataRead) {
591
592 shutdownInput(true);
593 }
594 allocHandle.readComplete();
595 pipeline.fireChannelReadComplete();
596 return;
597 }
598
599 allocHandle.incMessagesRead(1);
600 pipeline.fireChannelRead(byteBuf);
601 byteBuf = null;
602 scheduleNextRead(pipeline, allocHandle, rearm, empty);
603 } catch (Throwable t) {
604 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
605 }
606 }
607
608 private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
609 boolean rearm, boolean empty) {
610 if (allocHandle.continueReading() && !empty) {
611 if (rearm) {
612
613
614 scheduleRead(false);
615 }
616 } else {
617
618 allocHandle.readComplete();
619 pipeline.fireChannelReadComplete();
620 }
621 }
622
623 protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
624 Throwable cause, boolean allDataRead,
625 IoUringRecvByteAllocatorHandle allocHandle) {
626 if (byteBuf != null) {
627 if (byteBuf.isReadable()) {
628 pipeline.fireChannelRead(byteBuf);
629 } else {
630 byteBuf.release();
631 }
632 }
633 allocHandle.readComplete();
634 pipeline.fireChannelReadComplete();
635 pipeline.fireExceptionCaught(cause);
636 if (allDataRead || cause instanceof IOException) {
637 shutdownInput(true);
638 }
639 }
640
641 private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
642 IoUringFileRegion fileRegion, int res, short data) {
643 try {
644 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
645 return true;
646 }
647 int result = res >= 0 ? res : ioResult("io_uring splice", res);
648 if (result == 0 && fileRegion.count() > 0) {
649 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
650 return false;
651 }
652 int progress = fileRegion.handleResult(result, data);
653 if (progress == -1) {
654
655 channelOutboundBuffer.remove();
656 } else if (progress > 0) {
657 channelOutboundBuffer.progress(progress);
658 }
659 } catch (Throwable cause) {
660 handleWriteError(cause);
661 }
662 return true;
663 }
664
665 @Override
666 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
667 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
668
669
670
671
672 writeId = 0;
673 writeOpCode = 0;
674 }
675 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
676 Object current = channelOutboundBuffer.current();
677 if (current instanceof IoUringFileRegion) {
678 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
679 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
680 }
681
682 if (current instanceof FileRegion) {
683 return handleWriteCompleteGenericFileRegion(
684 channelOutboundBuffer, (FileRegion) current, res);
685 }
686
687 if (res >= 0) {
688 channelOutboundBuffer.removeBytes(res);
689 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
690 return true;
691 } else {
692 try {
693 if (ioResult("io_uring write", res) == 0) {
694 return false;
695 }
696 } catch (Throwable cause) {
697 handleWriteError(cause);
698 }
699 }
700 return true;
701 }
702
703
704
705
706
707 private boolean handleWriteCompleteGenericFileRegion(
708 ChannelOutboundBuffer channelOutboundBuffer, FileRegion region, int res) {
709 try {
710 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
711 releaseFileRegionChunkBuf();
712 return true;
713 }
714 if (res >= 0) {
715 ByteBuf buf = fileRegionChunkBuf;
716 assert buf != null;
717 buf.skipBytes(res);
718 channelOutboundBuffer.progress(res);
719 if (!buf.isReadable()) {
720
721 releaseFileRegionChunkBuf();
722 if (region.transferred() >= region.count()) {
723 channelOutboundBuffer.remove();
724 }
725 } else {
726
727 return false;
728 }
729 } else {
730
731
732
733
734 if (ioResult("io_uring write", res) == 0) {
735 return false;
736 }
737 }
738 } catch (Throwable cause) {
739 releaseFileRegionChunkBuf();
740 handleWriteError(cause);
741 }
742 return true;
743 }
744
745 private void releaseFileRegionChunkBuf() {
746 if (fileRegionChunkBuf != null) {
747 fileRegionChunkBuf.release();
748 fileRegionChunkBuf = null;
749 }
750 }
751
752 @Override
753 public void unregistered() {
754 super.unregistered();
755 assert readBuffer == null;
756 releaseFileRegionChunkBuf();
757 }
758 }
759
760 @Override
761 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
762 if (readId != 0) {
763
764 assert numOutstandingReads == 1 || numOutstandingReads == -1;
765 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
766 long id = registration.submit(ops);
767 assert id != 0;
768 readId = 0;
769 }
770 }
771
772 @Override
773 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
774 if (writeId != 0) {
775
776
777 assert numOutstandingWrites == 1;
778 assert writeOpCode != 0;
779 long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
780 assert id != 0;
781 writeId = 0;
782 }
783 }
784
785 @Override
786 protected boolean socketIsEmpty(int flags) {
787 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
788 }
789
790 @Override
791 boolean isPollInFirst() {
792 return bufferRing == null || !bufferRing.isUsable();
793 }
794
795
796
797
798
799
800 private static final class ByteBufWritableByteChannel implements WritableByteChannel {
801 private final ByteBuf buf;
802
803 ByteBufWritableByteChannel(ByteBuf buf) {
804 this.buf = buf;
805 }
806
807 @Override
808 public int write(ByteBuffer src) {
809 int toWrite = Math.min(src.remaining(), buf.writableBytes());
810 if (toWrite == 0) {
811 return 0;
812 }
813 if (toWrite < src.remaining()) {
814 int oldLimit = src.limit();
815 src.limit(src.position() + toWrite);
816 buf.writeBytes(src);
817 src.limit(oldLimit);
818 return toWrite;
819 }
820 buf.writeBytes(src);
821 return toWrite;
822 }
823
824 @Override
825 public boolean isOpen() {
826 return true;
827 }
828
829 @Override
830 public void close() {
831
832 }
833 }
834 }