1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelException;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultFileRegion;
28 import io.netty.channel.EventLoop;
29 import io.netty.channel.FileRegion;
30 import io.netty.channel.IoRegistration;
31 import io.netty.channel.socket.DuplexChannel;
32 import io.netty.channel.unix.IovArray;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.net.SocketAddress;
39 import java.nio.ByteBuffer;
40 import java.nio.channels.WritableByteChannel;
41
42 import static io.netty.channel.unix.Errors.ioResult;
43
44 abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
// Shared metadata instance returned by metadata().
// NOTE(review): per the ChannelMetadata constructor the arguments are presumably
// (hasDisconnect = false, defaultMaxMessagesPerRead = 16) - confirm against the Netty API.
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

/**
 * Upper bound (in bytes) for the temporary direct buffer used when a generic {@link FileRegion}
 * is written via scheduleWriteFileRegion(...).
 *
 * The value is read from the system property {@code io.netty.iouring.fileRegionChunkSize}
 * (default {@code 64 * 1024}) and clamped to the range {@code [1, 16 * 1024 * 1024]}.
 */
private static final int FILE_REGION_MAX_CHUNK_SIZE = Math.min(16 * 1024 * 1024,
        Math.max(1, SystemPropertyUtil.getInt("io.netty.iouring.fileRegionChunkSize", 64 * 1024)));
56
57
// Opcode of the most recently submitted write operation; reset to 0 together with writeId
// in writeComplete0(...). Passed to newAsyncCancel(...) when the write is cancelled.
byte writeOpCode;

// Id returned by registration.submit(...) for the in-flight write; 0 when none is outstanding.
long writeId;
// Opcode of the most recently submitted read operation (passed to newAsyncCancel(...)).
byte readOpCode;
// Id returned by registration.submit(...) for the in-flight read; 0 when none is outstanding.
long readId;


// Buffer ring looked up in doRegister(...) when the config reports a non-negative
// buffer group id; stays null otherwise.
private IoUringBufferRing bufferRing;
66
/**
 * Creates a new stream channel; all arguments are forwarded unchanged to the super constructor.
 *
 * @param parent the parent channel handed to the superclass
 * @param socket the underlying socket
 * @param active the {@code active} flag handed to the superclass
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
    super(parent, socket, active);
}
70
/**
 * Creates a new stream channel; all arguments are forwarded unchanged to the super constructor.
 *
 * @param parent the parent channel handed to the superclass
 * @param socket the underlying socket
 * @param remote the remote address handed to the superclass
 */
AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
    super(parent, socket, remote);
}
74
/**
 * Always returns {@code true}: this channel type is a stream socket.
 */
@Override
protected final boolean isStreamSocket() {
    return true;
}
79
/**
 * Returns the shared, statically allocated {@link ChannelMetadata} of this channel type.
 */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
84
/**
 * Creates the unsafe implementation for this channel, a new {@link IoUringStreamUnsafe}.
 */
@Override
protected AbstractUringUnsafe newUnsafe() {
    return new IoUringStreamUnsafe();
}
89
/**
 * Convenience overload of {@link #shutdown(ChannelPromise)} that uses a freshly created promise.
 *
 * @return the future returned by {@link #shutdown(ChannelPromise)}
 */
@Override
public final ChannelFuture shutdown() {
    return shutdown(newPromise());
}
94
95 @Override
96 public final ChannelFuture shutdown(final ChannelPromise promise) {
97 ChannelFuture shutdownOutputFuture = shutdownOutput();
98 if (shutdownOutputFuture.isDone()) {
99 shutdownOutputDone(shutdownOutputFuture, promise);
100 } else {
101 shutdownOutputFuture.addListener(new ChannelFutureListener() {
102 @Override
103 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
104 shutdownOutputDone(shutdownOutputFuture, promise);
105 }
106 });
107 }
108 return promise;
109 }
110
/**
 * Shuts down the output side by calling {@code socket.shutdown(false, true)}.
 * NOTE(review): the arguments are presumably (read, write) - confirm against the socket API.
 *
 * @throws Exception if the underlying socket call fails
 */
@Override
protected final void doShutdownOutput() throws Exception {
    socket.shutdown(false, true);
}
115
/**
 * Shuts down the input side by calling {@code socket.shutdown(true, false)} and completes the
 * promise with the outcome.
 *
 * @param promise succeeded when the socket call returns normally, failed with the thrown
 *                {@link Throwable} otherwise
 */
private void shutdownInput0(final ChannelPromise promise) {
    try {
        socket.shutdown(true, false);
        promise.setSuccess();
    } catch (Throwable cause) {
        // Any failure (including one raised while completing the promise) is reported via the promise.
        promise.setFailure(cause);
    }
}
124
/**
 * Reports whether the output side is shut down, as tracked by the underlying socket.
 */
@Override
public final boolean isOutputShutdown() {
    return socket.isOutputShutdown();
}
129
/**
 * Reports whether the input side is shut down, as tracked by the underlying socket.
 */
@Override
public final boolean isInputShutdown() {
    return socket.isInputShutdown();
}
134
/**
 * Returns the result of {@code socket.isShutdown()}.
 */
@Override
public final boolean isShutdown() {
    return socket.isShutdown();
}
139
/**
 * Convenience overload of {@link #shutdownOutput(ChannelPromise)} that uses a freshly created promise.
 *
 * @return the future returned by {@link #shutdownOutput(ChannelPromise)}
 */
@Override
public final ChannelFuture shutdownOutput() {
    return shutdownOutput(newPromise());
}
144
145 @Override
146 public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
147 EventLoop loop = eventLoop();
148 if (loop.inEventLoop()) {
149 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
150 } else {
151 loop.execute(new Runnable() {
152 @Override
153 public void run() {
154 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
155 }
156 });
157 }
158
159 return promise;
160 }
161
/**
 * Convenience overload of {@link #shutdownInput(ChannelPromise)} that uses a freshly created promise.
 *
 * @return the future returned by {@link #shutdownInput(ChannelPromise)}
 */
@Override
public final ChannelFuture shutdownInput() {
    return shutdownInput(newPromise());
}
166
167 @Override
168 public final ChannelFuture shutdownInput(final ChannelPromise promise) {
169 EventLoop loop = eventLoop();
170 if (loop.inEventLoop()) {
171 shutdownInput0(promise);
172 } else {
173 loop.execute(new Runnable() {
174 @Override
175 public void run() {
176 shutdownInput0(promise);
177 }
178 });
179 }
180 return promise;
181 }
182
183 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
184 ChannelFuture shutdownInputFuture = shutdownInput();
185 if (shutdownInputFuture.isDone()) {
186 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
187 } else {
188 shutdownInputFuture.addListener(new ChannelFutureListener() {
189 @Override
190 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
191 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
192 }
193 });
194 }
195 }
196
197 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
198 ChannelFuture shutdownInputFuture,
199 ChannelPromise promise) {
200 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
201 Throwable shutdownInputCause = shutdownInputFuture.cause();
202 if (shutdownOutputCause != null) {
203 if (shutdownInputCause != null) {
204 logger.info("Exception suppressed because a previous exception occurred.",
205 shutdownInputCause);
206 }
207 promise.setFailure(shutdownOutputCause);
208 } else if (shutdownInputCause != null) {
209 promise.setFailure(shutdownInputCause);
210 } else {
211 promise.setSuccess();
212 }
213 }
214
215 @Override
216 protected final void doRegister(ChannelPromise promise) {
217 ChannelPromise registerPromise = this.newPromise();
218
219 registerPromise.addListener(f -> {
220 if (f.isSuccess()) {
221 try {
222 short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
223 if (bgid >= 0) {
224 final IoUringIoHandler ioUringIoHandler = registration().attachment();
225 bufferRing = ioUringIoHandler.findBufferRing(bgid);
226 }
227 if (active) {
228
229 schedulePollRdHup();
230 }
231 } finally {
232 promise.setSuccess();
233 }
234 } else {
235 promise.setFailure(f.cause());
236 }
237 });
238
239 super.doRegister(registerPromise);
240 }
241
242 @Override
243 protected Object filterOutboundMessage(Object msg) {
244 if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
245 return new IoUringFileRegion((DefaultFileRegion) msg);
246 }
247
248 if (msg instanceof FileRegion) {
249
250 return msg;
251 }
252
253 return super.filterOutboundMessage(msg);
254 }
255
256 protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
257
// Buffer handed to the kernel by scheduleRead0(...) for the in-flight recv; taken back (and
// set to null) in readComplete0(...). Not set when a provider buffer ring is used.
private ByteBuf readBuffer;


// Chunk of a generic FileRegion that is currently being sent; filled in
// scheduleWriteFileRegion(...) and released via releaseFileRegionChunkBuf().
private ByteBuf fileRegionChunkBuf;
262
263 @Override
264 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
265 assert writeId == 0;
266
267 int fd = fd().intValue();
268 IoRegistration registration = registration();
269 IoUringIoHandler handler = registration.attachment();
270 IovArray iovArray = handler.iovArray();
271 int offset = iovArray.count();
272
273 try {
274 in.forEachFlushedMessage(filterWriteMultiple(iovArray));
275 } catch (Exception e) {
276
277 return scheduleWriteSingle(in.current());
278 }
279 long iovArrayAddress = iovArray.memoryAddress(offset);
280 int iovArrayLength = iovArray.count() - offset;
281
282 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArrayAddress, iovArrayLength, nextOpsId());
283
284 byte opCode = ops.opcode();
285 writeId = registration.submit(ops);
286 writeOpCode = opCode;
287 if (writeId == 0) {
288 return 0;
289 }
290 return 1;
291 }
292
/**
 * Returns the processor used by {@code scheduleWriteMultiple(...)} to collect flushed messages.
 * This implementation returns the given {@link IovArray} itself; the method is protected and
 * non-final, so subclasses can override it.
 *
 * @param iovArray the array the messages are collected into
 * @return the message processor to use
 */
protected ChannelOutboundBuffer.MessageProcessor filterWriteMultiple(IovArray iovArray) {
    return iovArray;
}
296
297 @Override
298 protected int scheduleWriteSingle(Object msg) {
299 assert writeId == 0;
300
301 int fd = fd().intValue();
302 IoRegistration registration = registration();
303 final IoUringIoOps ops;
304 if (msg instanceof IoUringFileRegion) {
305 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
306 try {
307 fileRegion.open();
308 } catch (IOException e) {
309 this.handleWriteError(e);
310 return 0;
311 }
312 ops = fileRegion.splice(fd);
313 } else if (msg instanceof FileRegion) {
314 return scheduleWriteFileRegion(fd, registration, (FileRegion) msg);
315 } else {
316 ByteBuf buf = (ByteBuf) msg;
317 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
318 int length = buf.readableBytes();
319 short opsid = nextOpsId();
320
321 ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, opsid);
322 }
323 byte opCode = ops.opcode();
324 writeId = registration.submit(ops);
325 writeOpCode = opCode;
326 if (writeId == 0) {
327 return 0;
328 }
329 return 1;
330 }
331
332
333
334
335 private int scheduleWriteFileRegion(int fd, IoRegistration registration, FileRegion region) {
336 ByteBuf buf = fileRegionChunkBuf;
337 if (buf == null) {
338 long remaining = region.count() - region.transferred();
339 if (remaining > 0) {
340 int chunkSize = (int) Math.min(remaining, FILE_REGION_MAX_CHUNK_SIZE);
341 buf = alloc().directBuffer(chunkSize);
342 try {
343 ByteBufWritableByteChannel ch = new ByteBufWritableByteChannel(buf);
344
345
346
347
348 while (buf.writableBytes() > 0 && region.transferred() < region.count()) {
349 long t = region.transferTo(ch, region.transferred());
350 if (t <= 0) {
351 break;
352 }
353 }
354 if (buf.readableBytes() == 0) {
355 buf.release();
356 handleWriteError(new ChannelException(
357 "FileRegion.transferTo(...) produced 0 bytes (count="
358 + region.count() + ", transferred=" + region.transferred() + ')'));
359 return 0;
360 }
361 } catch (Exception e) {
362 buf.release();
363 handleWriteError(e);
364 return 0;
365 }
366 } else {
367
368
369 buf = alloc().directBuffer(0);
370 }
371 fileRegionChunkBuf = buf;
372 }
373 long address = IoUring.memoryAddress(buf) + buf.readerIndex();
374 int length = buf.readableBytes();
375 IoUringIoOps ops = IoUringIoOps.newSend(fd, (byte) 0, 0, address, length, nextOpsId());
376 byte opCode = ops.opcode();
377 writeId = registration.submit(ops);
378 writeOpCode = opCode;
379 if (writeId == 0) {
380
381
382
383
384 return 0;
385 }
386 return 1;
387 }
388
389 private int calculateRecvFlags(boolean first) {
390
391
392
393
394
395
396 if (first) {
397 return 0;
398 }
399 return Native.MSG_DONTWAIT;
400 }
401
402 private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
403
404
405
406 if (first) {
407
408
409 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
410 Native.IORING_RECVSEND_POLL_FIRST : 0;
411 }
412 return 0;
413 }
414
415 @Override
416 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
417 assert readBuffer == null;
418 assert readId == 0 : readId;
419 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
420
421 if (bufferRing != null && bufferRing.isUsable()) {
422 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
423 }
424
425
426 ByteBuf byteBuf = allocHandle.allocate(alloc());
427 try {
428 int fd = fd().intValue();
429 IoRegistration registration = registration();
430 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
431 int recvFlags = calculateRecvFlags(first);
432
433 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
434 IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
435 readId = registration.submit(ops);
436 readOpCode = Native.IORING_OP_RECV;
437 if (readId == 0) {
438 return 0;
439 }
440 readBuffer = byteBuf;
441 byteBuf = null;
442 return 1;
443 } finally {
444 if (byteBuf != null) {
445 byteBuf.release();
446 }
447 }
448 }
449
450 private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
451 short bgId = bufferRing.bufferGroupId();
452 try {
453 boolean multishot = IoUring.isRecvMultishotEnabled();
454 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
455 short ioPrio;
456 final int recvFlags;
457 if (multishot) {
458 ioPrio = Native.IORING_RECV_MULTISHOT;
459 recvFlags = 0;
460 } else {
461
462
463 ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
464 recvFlags = calculateRecvFlags(first);
465 }
466 if (IoUring.isRecvsendBundleEnabled()) {
467
468
469 ioPrio |= Native.IORING_RECVSEND_BUNDLE;
470 }
471 IoRegistration registration = registration();
472 int fd = fd().intValue();
473 IoUringIoOps ops = IoUringIoOps.newRecv(
474 fd, flags, ioPrio, recvFlags, 0,
475 0, nextOpsId(), bgId
476 );
477 readId = registration.submit(ops);
478 readOpCode = Native.IORING_OP_RECV;
479 if (readId == 0) {
480 return 0;
481 }
482 if (multishot) {
483
484 return -1;
485 }
486 return 1;
487 } catch (IllegalArgumentException illegalArgumentException) {
488 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
489 return 0;
490 }
491 }
492
/**
 * Handles the completion of a recv operation.
 * <p>
 * Depending on {@code res} and {@code flags} this releases the pending buffer on cancellation,
 * retries after {@code ENOBUFS}, fires the received data through the pipeline (from the
 * buffer ring or from the buffer allocated in {@code scheduleRead0(...)}), or shuts down the
 * input when the result indicates end of stream. Any {@link Throwable} raised while processing
 * is routed to {@code handleReadException(...)}.
 *
 * @param op          the opcode of the completed operation (not used here)
 * @param res         the completion result: bytes read, {@code 0}, or a negative errno
 * @param flags       the completion flags
 * @param data        the data attached to the completion (not used here)
 * @param outstanding number of outstanding reads (not used here)
 */
@Override
protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
    // Take back the buffer that was handed out in scheduleRead0(...), if any.
    ByteBuf byteBuf = readBuffer;
    readBuffer = null;
    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        // The read was cancelled: nothing is outstanding anymore and the buffer is unused.
        readId = 0;


        if (byteBuf != null) {

            byteBuf.release();
        }
        return;
    }
    // Without IORING_CQE_F_MORE no further completion is expected for this submission,
    // so a new read has to be scheduled to continue reading.
    boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
    // IORING_CQE_F_BUFFER: the data was placed into a buffer selected from the buffer ring.
    boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
    // The selected buffer id is stored in the upper bits of the flags.
    short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
    boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;

    boolean empty = socketIsEmpty(flags);
    if (rearm) {

        readId = 0;
    }

    boolean allDataRead = false;

    final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();

    try {
        if (res < 0) {
            if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
                // No buffer was available in the ring: try to expand it and notify the
                // pipeline if that is not possible.
                if (!bufferRing.expand()) {



                    pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
                }



                // Either way, schedule the read again.
                scheduleRead(allocHandle.isFirstRead());
                return;
            }


            // ioResult(...) either maps the errno to a value or throws.
            allocHandle.lastBytesRead(ioResult("io_uring read", res));
        } else if (res > 0) {
            if (useBufferRing) {




                // The result may span several ring buffers: consume them one by one until
                // all res bytes are accounted for.
                int read = res;
                for (;;) {
                    int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
                    byteBuf = bufferRing.useBuffer(bid, read, more);
                    read -= byteBuf.readableBytes();
                    allocHandle.attemptedBytesRead(attemptedBytesRead);
                    allocHandle.lastBytesRead(byteBuf.readableBytes());

                    assert read >= 0;
                    if (read == 0) {

                        // Last buffer: it is fired by the common code after the loop.
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    bid = bufferRing.nextBid(bid);
                    if (!allocHandle.continueReading()) {
                        // More data is already available, so end this read batch and start a new one.
                        allocHandle.readComplete();
                        pipeline.fireChannelReadComplete();
                        allocHandle.reset(config());
                    }
                }
            } else {
                // Data was written directly into the buffer from scheduleRead0(...).
                int attemptedBytesRead = byteBuf.writableBytes();
                byteBuf.writerIndex(byteBuf.writerIndex() + res);
                allocHandle.attemptedBytesRead(attemptedBytesRead);
                allocHandle.lastBytesRead(res);
            }
        } else {
            // res == 0: recorded as -1, which is treated as end of stream below.
            allocHandle.lastBytesRead(-1);
        }
        if (allocHandle.lastBytesRead() <= 0) {
            // Nothing was read: release the unused buffer.
            if (byteBuf != null) {

                byteBuf.release();
                byteBuf = null;
            }
            allDataRead = allocHandle.lastBytesRead() < 0;
            if (allDataRead) {

                shutdownInput(true);
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            return;
        }

        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        // Ownership was passed to the pipeline; make sure it is not released again on error.
        byteBuf = null;
        scheduleNextRead(pipeline, allocHandle, rearm, empty);
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
    }
}
611
612 private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
613 boolean rearm, boolean empty) {
614 if (allocHandle.continueReading() && !empty) {
615 if (rearm) {
616
617
618 scheduleRead(false);
619 }
620 } else {
621
622 allocHandle.readComplete();
623 pipeline.fireChannelReadComplete();
624 }
625 }
626
627 protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
628 Throwable cause, boolean allDataRead,
629 IoUringRecvByteAllocatorHandle allocHandle) {
630 if (byteBuf != null) {
631 if (byteBuf.isReadable()) {
632 pipeline.fireChannelRead(byteBuf);
633 } else {
634 byteBuf.release();
635 }
636 }
637 allocHandle.readComplete();
638 pipeline.fireChannelReadComplete();
639 pipeline.fireExceptionCaught(cause);
640 if (allDataRead || cause instanceof IOException) {
641 shutdownInput(true);
642 }
643 }
644
645 private boolean handleWriteCompleteFileRegion(ChannelOutboundBuffer channelOutboundBuffer,
646 IoUringFileRegion fileRegion, int res, short data) {
647 try {
648 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
649 return true;
650 }
651 int result = res >= 0 ? res : ioResult("io_uring splice", res);
652 if (result == 0 && fileRegion.count() > 0) {
653 validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
654 return false;
655 }
656 int progress = fileRegion.handleResult(result, data);
657 if (progress == -1) {
658
659 channelOutboundBuffer.remove();
660 } else if (progress > 0) {
661 channelOutboundBuffer.progress(progress);
662 }
663 } catch (Throwable cause) {
664 handleWriteError(cause);
665 }
666 return true;
667 }
668
669 @Override
670 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
671 if ((flags & Native.IORING_CQE_F_NOTIF) == 0) {
672
673
674
675
676 writeId = 0;
677 writeOpCode = 0;
678 }
679 ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
680 Object current = channelOutboundBuffer.current();
681 if (current instanceof IoUringFileRegion) {
682 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
683 return handleWriteCompleteFileRegion(channelOutboundBuffer, fileRegion, res, data);
684 }
685
686 if (current instanceof FileRegion) {
687 return handleWriteCompleteGenericFileRegion(
688 channelOutboundBuffer, (FileRegion) current, res);
689 }
690
691 if (res >= 0) {
692 channelOutboundBuffer.removeBytes(res);
693 } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
694 return true;
695 } else {
696 try {
697 if (ioResult("io_uring write", res) == 0) {
698 return false;
699 }
700 } catch (Throwable cause) {
701 handleWriteError(cause);
702 }
703 }
704 return true;
705 }
706
707
708
709
710
711 private boolean handleWriteCompleteGenericFileRegion(
712 ChannelOutboundBuffer channelOutboundBuffer, FileRegion region, int res) {
713 try {
714 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
715 releaseFileRegionChunkBuf();
716 return true;
717 }
718 if (res >= 0) {
719 ByteBuf buf = fileRegionChunkBuf;
720 assert buf != null;
721 buf.skipBytes(res);
722 channelOutboundBuffer.progress(res);
723 if (!buf.isReadable()) {
724
725 releaseFileRegionChunkBuf();
726 if (region.transferred() >= region.count()) {
727 channelOutboundBuffer.remove();
728 }
729 } else {
730
731 return false;
732 }
733 } else {
734
735
736
737
738 if (ioResult("io_uring write", res) == 0) {
739 return false;
740 }
741 }
742 } catch (Throwable cause) {
743 releaseFileRegionChunkBuf();
744 handleWriteError(cause);
745 }
746 return true;
747 }
748
749 private void releaseFileRegionChunkBuf() {
750 if (fileRegionChunkBuf != null) {
751 fileRegionChunkBuf.release();
752 fileRegionChunkBuf = null;
753 }
754 }
755
/**
 * Runs the superclass handling and then releases any pending file-region chunk.
 * No read buffer is expected to be outstanding at this point (checked by assertion).
 */
@Override
public void unregistered() {
    super.unregistered();
    assert readBuffer == null;
    releaseFileRegionChunkBuf();
}
762 }
763
764 @Override
765 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
766 if (readId != 0) {
767
768 assert numOutstandingReads == 1 || numOutstandingReads == -1;
769 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
770 long id = registration.submit(ops);
771 assert id != 0;
772 readId = 0;
773 }
774 }
775
776 @Override
777 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
778 if (writeId != 0) {
779
780
781 assert numOutstandingWrites == 1;
782 assert writeOpCode != 0;
783 long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
784 assert id != 0;
785 writeId = 0;
786 }
787 }
788
789 @Override
790 protected boolean socketIsEmpty(int flags) {
791 return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
792 }
793
794 @Override
795 boolean isPollInFirst() {
796 return bufferRing == null || !bufferRing.isUsable();
797 }
798
799
800
801
802
803
804 private static final class ByteBufWritableByteChannel implements WritableByteChannel {
805 private final ByteBuf buf;
806
807 ByteBufWritableByteChannel(ByteBuf buf) {
808 this.buf = buf;
809 }
810
811 @Override
812 public int write(ByteBuffer src) {
813 int toWrite = Math.min(src.remaining(), buf.writableBytes());
814 if (toWrite == 0) {
815 return 0;
816 }
817 if (toWrite < src.remaining()) {
818 int oldLimit = src.limit();
819 src.limit(src.position() + toWrite);
820 buf.writeBytes(src);
821 src.limit(oldLimit);
822 return toWrite;
823 }
824 buf.writeBytes(src);
825 return toWrite;
826 }
827
828 @Override
829 public boolean isOpen() {
830 return true;
831 }
832
833 @Override
834 public void close() {
835
836 }
837 }
838 }