1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.internal.ChannelUtils;
32 import io.netty.channel.socket.DuplexChannel;
33 import io.netty.channel.unix.IovArray;
34 import io.netty.channel.unix.SocketWritableByteChannel;
35 import io.netty.channel.unix.UnixChannelUtil;
36 import io.netty.util.LeakPresenceDetector;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.UnstableApi;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.io.IOException;
43 import java.net.SocketAddress;
44 import java.nio.ByteBuffer;
45 import java.nio.channels.WritableByteChannel;
46 import java.util.concurrent.Executor;
47
48 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
49 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
50 import static io.netty.util.internal.StringUtil.className;
51
52 public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
53 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
54 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
55 private static final String EXPECTED_TYPES =
56 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
57 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
58 private WritableByteChannel byteChannel;
59 private final Runnable flushTask = new Runnable() {
60 @Override
61 public void run() {
62
63
64 ((AbstractKQueueUnsafe) unsafe()).flush0();
65 }
66 };
67
/**
 * Creates a stream channel by forwarding all arguments unchanged to the superclass constructor.
 *
 * @param parent the parent channel handed to the superclass
 * @param fd     the socket handed to the superclass
 * @param active the active flag handed to the superclass
 */
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
    super(parent, fd, active);
}
71
/**
 * Creates a stream channel by forwarding all arguments unchanged to the superclass constructor.
 *
 * @param parent the parent channel handed to the superclass
 * @param fd     the socket handed to the superclass
 * @param remote the remote address handed to the superclass
 */
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
    super(parent, fd, remote);
}
75
/**
 * Creates a channel without a parent. The active flag passed on to the three-argument constructor is
 * whatever {@code isSoErrorZero(fd)} reports for the given socket.
 *
 * @param fd the socket backing this channel
 */
AbstractKQueueStreamChannel(BsdSocket fd) {
    this(null, fd, isSoErrorZero(fd));
}
79
80 @Override
81 protected AbstractKQueueUnsafe newUnsafe() {
82 return new KQueueStreamUnsafe();
83 }
84
85 @Override
86 public ChannelMetadata metadata() {
87 return METADATA;
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
105 int readableBytes = buf.readableBytes();
106 if (readableBytes == 0) {
107 in.remove();
108 return 0;
109 }
110
111 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
112 return doWriteBytes(in, buf);
113 } else {
114 ByteBuffer[] nioBuffers = buf.nioBuffers();
115 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
116 config().getMaxBytesPerGatheringWrite());
117 }
118 }
119
120 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
121
122
123
124 if (attempted == written) {
125 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
126 config().setMaxBytesPerGatheringWrite(attempted << 1);
127 }
128 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
129 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
130 }
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
149 final long expectedWrittenBytes = array.size();
150 assert expectedWrittenBytes != 0;
151 final int cnt = array.count();
152 assert cnt != 0;
153
154 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
155 if (localWrittenBytes > 0) {
156 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
157 in.removeBytes(localWrittenBytes);
158 return 1;
159 }
160 return WRITE_STATUS_SNDBUF_FULL;
161 }
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181 private int writeBytesMultiple(
182 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
183 long maxBytesPerGatheringWrite) throws IOException {
184 assert expectedWrittenBytes != 0;
185 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
186 expectedWrittenBytes = maxBytesPerGatheringWrite;
187 }
188
189 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
190 if (localWrittenBytes > 0) {
191 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
192 in.removeBytes(localWrittenBytes);
193 return 1;
194 }
195 return WRITE_STATUS_SNDBUF_FULL;
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
213 final long regionCount = region.count();
214 final long offset = region.transferred();
215
216 if (offset >= regionCount) {
217 in.remove();
218 return 0;
219 }
220
221 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
222 if (flushedAmount > 0) {
223 in.progress(flushedAmount);
224 if (region.transferred() >= regionCount) {
225 in.remove();
226 }
227 return 1;
228 } else if (flushedAmount == 0) {
229 validateFileRegion(region, offset);
230 }
231 return WRITE_STATUS_SNDBUF_FULL;
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
249 if (region.transferred() >= region.count()) {
250 in.remove();
251 return 0;
252 }
253
254 if (byteChannel == null) {
255 byteChannel = new KQueueSocketWritableByteChannel();
256 }
257 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
258 if (flushedAmount > 0) {
259 in.progress(flushedAmount);
260 if (region.transferred() >= region.count()) {
261 in.remove();
262 }
263 return 1;
264 }
265 return WRITE_STATUS_SNDBUF_FULL;
266 }
267
268 @Override
269 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
270 int writeSpinCount = config().getWriteSpinCount();
271 do {
272 final int msgCount = in.size();
273
274 if (msgCount > 1 && in.current() instanceof ByteBuf) {
275 writeSpinCount -= doWriteMultiple(in);
276 } else if (msgCount == 0) {
277
278 writeFilter(false);
279
280 return;
281 } else {
282 writeSpinCount -= doWriteSingle(in);
283 }
284
285
286
287
288 } while (writeSpinCount > 0);
289
290 if (writeSpinCount == 0) {
291
292
293
294
295 writeFilter(false);
296
297
298 eventLoop().execute(flushTask);
299 } else {
300
301
302 writeFilter(true);
303 }
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
321
322 Object msg = in.current();
323 if (msg instanceof ByteBuf) {
324 return writeBytes(in, (ByteBuf) msg);
325 } else if (msg instanceof DefaultFileRegion) {
326 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
327 } else if (msg instanceof FileRegion) {
328 return writeFileRegion(in, (FileRegion) msg);
329 } else {
330
331 throw new Error("Unexpected message type: " + className(msg));
332 }
333 }
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
350 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
351 IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
352 array.maxBytes(maxBytesPerGatheringWrite);
353 in.forEachFlushedMessage(array);
354
355 if (array.count() >= 1) {
356
357 return writeBytesMultiple(in, array);
358 }
359
360 in.removeBytes(0);
361 return 0;
362 }
363
364 @Override
365 protected Object filterOutboundMessage(Object msg) {
366 if (msg instanceof ByteBuf) {
367 ByteBuf buf = (ByteBuf) msg;
368 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
369 }
370
371 if (msg instanceof FileRegion) {
372 return msg;
373 }
374
375 throw new UnsupportedOperationException(
376 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
377 }
378
379 @UnstableApi
380 @Override
381 protected final void doShutdownOutput() throws Exception {
382 socket.shutdown(false, true);
383 }
384
385 @Override
386 public boolean isOutputShutdown() {
387 return socket.isOutputShutdown();
388 }
389
390 @Override
391 public boolean isInputShutdown() {
392 return socket.isInputShutdown();
393 }
394
395 @Override
396 public boolean isShutdown() {
397 return socket.isShutdown();
398 }
399
400 @Override
401 public ChannelFuture shutdownOutput() {
402 return shutdownOutput(newPromise());
403 }
404
405 @Override
406 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
407 EventLoop loop = eventLoop();
408 if (loop.inEventLoop()) {
409 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
410 } else {
411 loop.execute(new Runnable() {
412 @Override
413 public void run() {
414 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
415 }
416 });
417 }
418 return promise;
419 }
420
421 @Override
422 public ChannelFuture shutdownInput() {
423 return shutdownInput(newPromise());
424 }
425
426 @Override
427 public ChannelFuture shutdownInput(final ChannelPromise promise) {
428 EventLoop loop = eventLoop();
429 if (loop.inEventLoop()) {
430 shutdownInput0(promise);
431 } else {
432 loop.execute(new Runnable() {
433 @Override
434 public void run() {
435 shutdownInput0(promise);
436 }
437 });
438 }
439 return promise;
440 }
441
442 private void shutdownInput0(ChannelPromise promise) {
443 try {
444 socket.shutdown(true, false);
445 } catch (Throwable cause) {
446 promise.setFailure(cause);
447 return;
448 }
449 promise.setSuccess();
450 }
451
452 @Override
453 public ChannelFuture shutdown() {
454 return shutdown(newPromise());
455 }
456
457 @Override
458 public ChannelFuture shutdown(final ChannelPromise promise) {
459 ChannelFuture shutdownOutputFuture = shutdownOutput();
460 if (shutdownOutputFuture.isDone()) {
461 shutdownOutputDone(shutdownOutputFuture, promise);
462 } else {
463 shutdownOutputFuture.addListener(new ChannelFutureListener() {
464 @Override
465 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
466 shutdownOutputDone(shutdownOutputFuture, promise);
467 }
468 });
469 }
470 return promise;
471 }
472
473 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
474 ChannelFuture shutdownInputFuture = shutdownInput();
475 if (shutdownInputFuture.isDone()) {
476 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
477 } else {
478 shutdownInputFuture.addListener(new ChannelFutureListener() {
479 @Override
480 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
481 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
482 }
483 });
484 }
485 }
486
487 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
488 ChannelFuture shutdownInputFuture,
489 ChannelPromise promise) {
490 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
491 Throwable shutdownInputCause = shutdownInputFuture.cause();
492 if (shutdownOutputCause != null) {
493 if (shutdownInputCause != null) {
494 logger.debug("Exception suppressed because a previous exception occurred.",
495 shutdownInputCause);
496 }
497 promise.setFailure(shutdownOutputCause);
498 } else if (shutdownInputCause != null) {
499 promise.setFailure(shutdownInputCause);
500 } else {
501 promise.setSuccess();
502 }
503 }
504
505 class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
506
507 @Override
508 protected Executor prepareToClose() {
509 return super.prepareToClose();
510 }
511
512 @Override
513 void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
514 final ChannelConfig config = config();
515 if (shouldBreakReadReady(config)) {
516 clearReadFilter0();
517 return;
518 }
519 final ChannelPipeline pipeline = pipeline();
520 final ByteBufAllocator allocator = config.getAllocator();
521 allocHandle.reset(config);
522
523 ByteBuf byteBuf = null;
524 boolean close = false;
525 try {
526 do {
527
528
529 byteBuf = allocHandle.allocate(allocator);
530 allocHandle.lastBytesRead(doReadBytes(byteBuf));
531 if (allocHandle.lastBytesRead() <= 0) {
532
533 byteBuf.release();
534 byteBuf = null;
535 close = allocHandle.lastBytesRead() < 0;
536 if (close) {
537
538 readPending = false;
539 }
540 break;
541 }
542 allocHandle.incMessagesRead(1);
543 readPending = false;
544 pipeline.fireChannelRead(byteBuf);
545 byteBuf = null;
546
547 if (shouldBreakReadReady(config)) {
548
549
550
551
552
553
554
555
556
557
558
559 break;
560 }
561 } while (allocHandle.continueReading());
562
563 allocHandle.readComplete();
564 pipeline.fireChannelReadComplete();
565
566 if (close) {
567 shutdownInput(false);
568 }
569 } catch (Throwable t) {
570 handleReadException(pipeline, byteBuf, t, close, allocHandle);
571 } finally {
572 if (shouldStopReading(config)) {
573 clearReadFilter0();
574 }
575 }
576 }
577
578 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
579 KQueueRecvByteAllocatorHandle allocHandle) {
580 if (byteBuf != null) {
581 if (byteBuf.isReadable()) {
582 readPending = false;
583 pipeline.fireChannelRead(byteBuf);
584 } else {
585 byteBuf.release();
586 }
587 }
588 if (!failConnectPromise(cause)) {
589 allocHandle.readComplete();
590 pipeline.fireChannelReadComplete();
591 pipeline.fireExceptionCaught(cause);
592
593
594
595 if (close ||
596 cause instanceof OutOfMemoryError ||
597 cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
598 cause instanceof IOException) {
599 shutdownInput(false);
600 }
601 }
602 }
603 }
604
605 private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
606 KQueueSocketWritableByteChannel() {
607 super(socket);
608 }
609
610 @Override
611 protected ByteBufAllocator alloc() {
612 return AbstractKQueueStreamChannel.this.alloc();
613 }
614 }
615 }