1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http2;
17
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.buffer.api.DefaultBufferAllocators;
20 import io.netty5.channel.AdaptiveRecvBufferAllocator;
21 import io.netty5.channel.ChannelOption;
22 import io.netty5.channel.ChannelOutputShutdownException;
23 import io.netty5.channel.ChannelShutdownDirection;
24 import io.netty5.channel.MaxMessagesRecvBufferAllocator;
25 import io.netty5.util.Resource;
26 import io.netty5.channel.Channel;
27 import io.netty5.channel.ChannelHandler;
28 import io.netty5.channel.ChannelId;
29 import io.netty5.channel.ChannelMetadata;
30 import io.netty5.channel.ChannelPipeline;
31 import io.netty5.channel.DefaultChannelPipeline;
32 import io.netty5.channel.EventLoop;
33 import io.netty5.channel.MessageSizeEstimator;
34 import io.netty5.channel.RecvBufferAllocator;
35 import io.netty5.channel.WriteBufferWaterMark;
36 import io.netty5.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
37 import io.netty5.util.DefaultAttributeMap;
38 import io.netty5.util.concurrent.EventExecutor;
39 import io.netty5.util.concurrent.Future;
40 import io.netty5.util.concurrent.Promise;
41 import io.netty5.util.internal.StringUtil;
42 import io.netty5.util.internal.ThrowableUtil;
43 import io.netty5.util.internal.logging.InternalLogger;
44 import io.netty5.util.internal.logging.InternalLoggerFactory;
45
46 import java.io.IOException;
47 import java.net.SocketAddress;
48 import java.nio.channels.ClosedChannelException;
49 import java.nio.channels.NotYetConnectedException;
50 import java.util.ArrayDeque;
51 import java.util.Queue;
52 import java.util.concurrent.RejectedExecutionException;
53 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
54 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
55 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
56
57 import static io.netty5.channel.ChannelOption.ALLOW_HALF_CLOSURE;
58 import static io.netty5.channel.ChannelOption.AUTO_CLOSE;
59 import static io.netty5.channel.ChannelOption.AUTO_READ;
60 import static io.netty5.channel.ChannelOption.BUFFER_ALLOCATOR;
61 import static io.netty5.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
62 import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_READ;
63 import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
64 import static io.netty5.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
65 import static io.netty5.channel.ChannelOption.RCVBUFFER_ALLOCATOR;
66 import static io.netty5.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
67 import static io.netty5.channel.ChannelOption.WRITE_SPIN_COUNT;
68 import static io.netty5.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
69 import static io.netty5.util.internal.ObjectUtil.checkPositive;
70 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
71 import static java.lang.Math.min;
72 import static java.util.Objects.requireNonNull;
73
74 final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
75
76 static final Http2FrameStreamVisitor WRITABLE_VISITOR = stream -> {
77 final DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
78 ((DefaultHttp2FrameStream) stream).attachment;
79 childChannel.trySetWritable();
80 return true;
81 };
82
83 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2StreamChannel.class);
84
85 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
86
87
88
89
90
91 private static final int MIN_HTTP2_FRAME_SIZE = 9;
92
93
94
95
96 private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
97
98 static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
99
100 private static final Handle HANDLE_INSTANCE = msg -> msg instanceof Http2DataFrame ?
101
102 (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
103 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
104
105 @Override
106 public Handle newHandle() {
107 return HANDLE_INSTANCE;
108 }
109 }
110
111 private static final AtomicLongFieldUpdater<DefaultHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
112 AtomicLongFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "totalPendingSize");
113
114 private static final AtomicIntegerFieldUpdater<DefaultHttp2StreamChannel> UNWRITABLE_UPDATER =
115 AtomicIntegerFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "unwritable");
116
117 private static void windowUpdateFrameWriteComplete(Channel streamChannel, Future<?> future) {
118 Throwable cause = future.cause();
119 if (cause != null) {
120 Throwable unwrappedCause;
121
122 if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
123 cause = unwrappedCause;
124 }
125
126
127 streamChannel.pipeline().fireChannelExceptionCaught(cause);
128 ((DefaultHttp2StreamChannel) streamChannel).closeTransport(streamChannel.newPromise());
129 }
130 }
131
132
133
134
135 private enum ReadStatus {
136
137
138
139 IDLE,
140
141
142
143
144 IN_PROGRESS,
145
146
147
148
149 REQUESTED
150 }
151
152 private static final AtomicIntegerFieldUpdater<DefaultHttp2StreamChannel> AUTOREAD_UPDATER =
153 AtomicIntegerFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "autoRead");
154 private static final AtomicReferenceFieldUpdater<DefaultHttp2StreamChannel, WriteBufferWaterMark>
155 WATERMARK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
156 DefaultHttp2StreamChannel.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
157
158 private volatile BufferAllocator bufferAllocator = DefaultBufferAllocators.preferredAllocator();
159 private volatile RecvBufferAllocator rcvBufAllocator = new AdaptiveRecvBufferAllocator();
160
161 private volatile int connectTimeoutMillis = 30000;
162 private volatile int writeSpinCount = 16;
163 private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
164
165 @SuppressWarnings("FieldMayBeFinal")
166 private volatile int autoRead = 1;
167 private volatile boolean autoClose = true;
168 private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
169 private volatile boolean allowHalfClosure;
170 private final Http2MultiplexHandler handler;
171 private final ChannelId channelId;
172 private final ChannelPipeline pipeline;
173 private final DefaultHttp2FrameStream stream;
174 private final Promise<Void> closePromise;
175
176 private volatile boolean registered;
177
178 private volatile long totalPendingSize;
179 private volatile int unwritable;
180 private volatile boolean inputShutdown;
181 private volatile boolean outputShutdown;
182
183
184 private Runnable fireChannelWritabilityChangedTask;
185
186 private int flowControlledBytes;
187
188
189
190
191
192
193
194 private ReadStatus readStatus = ReadStatus.IDLE;
195
196 private Queue<Object> inboundBuffer;
197
198
199 private boolean firstFrameWritten;
200 private boolean readCompletePending;
201
202 private RecvBufferAllocator.Handle recvHandle;
203 private boolean writeDoneAndNoFlush;
204 private boolean closeInitiated;
205 private boolean readEOS;
206
207 DefaultHttp2StreamChannel(Http2MultiplexHandler handler, DefaultHttp2FrameStream stream, int id,
208 ChannelHandler inboundHandler) {
209 this.handler = handler;
210 this.stream = stream;
211 stream.attachment = this;
212 pipeline = new DefaultHttp2ChannelPipeline(this);
213 closePromise = pipeline.newPromise();
214 channelId = new Http2StreamChannelId(parent().id(), id);
215
216 if (inboundHandler != null) {
217
218 pipeline.addLast(inboundHandler);
219 }
220 }
221
222 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
223 if (size == 0) {
224 return;
225 }
226
227 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
228 if (newWriteBufferSize > writeBufferWaterMark.high()) {
229 setUnwritable(invokeLater);
230 }
231 }
232
233 private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
234 if (size == 0) {
235 return;
236 }
237
238 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
239
240
241
242
243
244 if (newWriteBufferSize < writeBufferWaterMark.low() && parent().isWritable()) {
245 setWritable(invokeLater);
246 }
247 }
248
249 void trySetWritable() {
250
251
252
253
254 if (totalPendingSize < writeBufferWaterMark.low()) {
255 setWritable(false);
256 }
257 }
258
259 private void setWritable(boolean invokeLater) {
260 for (;;) {
261 final int oldValue = unwritable;
262 final int newValue = oldValue & ~1;
263 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
264 if (oldValue != 0 && newValue == 0) {
265 fireChannelWritabilityChanged(invokeLater);
266 }
267 break;
268 }
269 }
270 }
271
272 private void setUnwritable(boolean invokeLater) {
273 for (;;) {
274 final int oldValue = unwritable;
275 final int newValue = oldValue | 1;
276 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
277 if (oldValue == 0) {
278 fireChannelWritabilityChanged(invokeLater);
279 }
280 break;
281 }
282 }
283 }
284
285 private void fireChannelWritabilityChanged(boolean invokeLater) {
286 final ChannelPipeline pipeline = pipeline();
287 if (invokeLater) {
288 Runnable task = fireChannelWritabilityChangedTask;
289 if (task == null) {
290 fireChannelWritabilityChangedTask = task = pipeline::fireChannelWritabilityChanged;
291 }
292 executor().execute(task);
293 } else {
294 pipeline.fireChannelWritabilityChanged();
295 }
296 }
297 @Override
298 public Http2FrameStream stream() {
299 return stream;
300 }
301
302 void closeOutbound() {
303 outputShutdown = true;
304 }
305
306 void streamClosed() {
307 readEOS();
308
309
310 doBeginRead();
311 }
312
313 @Override
314 public ChannelMetadata metadata() {
315 return METADATA;
316 }
317
318 @Override
319 public boolean isOpen() {
320 return !closePromise.isDone();
321 }
322
323 @Override
324 public boolean isActive() {
325 return isOpen();
326 }
327
328 @Override
329 public boolean isShutdown(ChannelShutdownDirection direction) {
330 if (!isActive()) {
331 return true;
332 }
333 switch (direction) {
334 case Inbound:
335 return inputShutdown;
336 case Outbound:
337 return outputShutdown;
338 default:
339 throw new AssertionError();
340 }
341 }
342
343 @Override
344 public ChannelId id() {
345 return channelId;
346 }
347
348 @Override
349 public EventLoop executor() {
350 return parent().executor();
351 }
352
353 @Override
354 public Channel parent() {
355 return handler.parentContext().channel();
356 }
357
358 @Override
359 public boolean isRegistered() {
360 return registered;
361 }
362
363 @Override
364 public SocketAddress localAddress() {
365 return parent().localAddress();
366 }
367
368 @Override
369 public SocketAddress remoteAddress() {
370 return parent().remoteAddress();
371 }
372
373 @Override
374 public Future<Void> closeFuture() {
375 return closePromise.asFuture();
376 }
377
378 @Override
379 public long writableBytes() {
380 long bytes = writeBufferWaterMark.high()
381 - totalPendingSize - pipeline.pendingOutboundBytes();
382
383 if (bytes > 0) {
384 return unwritable == 0 ? bytes : 0;
385 }
386 return 0;
387 }
388
389 @Override
390 public ChannelPipeline pipeline() {
391 return pipeline;
392 }
393
394 @Override
395 public int hashCode() {
396 return id().hashCode();
397 }
398
399 @Override
400 public boolean equals(Object o) {
401 return this == o;
402 }
403
404 @Override
405 public int compareTo(Channel o) {
406 if (this == o) {
407 return 0;
408 }
409
410 return id().compareTo(o.id());
411 }
412
413 @Override
414 public String toString() {
415 return parent().toString() + "(H2 - " + stream + ')';
416 }
417
418
419
420
421
422 void fireChildRead(Http2Frame frame) {
423 assert executor().inEventLoop();
424 if (!isActive()) {
425 Resource.dispose(frame);
426 } else if (readStatus != ReadStatus.IDLE) {
427
428
429 assert inboundBuffer == null || inboundBuffer.isEmpty();
430 final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
431 doRead0(frame, allocHandle);
432
433
434
435
436 if (allocHandle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound)) {
437 maybeAddChannelToReadCompletePendingQueue();
438 } else {
439 notifyReadComplete(allocHandle, true);
440 }
441 } else {
442 if (inboundBuffer == null) {
443 inboundBuffer = new ArrayDeque<>(4);
444 }
445 inboundBuffer.add(frame);
446 }
447 }
448
449 void fireChildReadComplete() {
450 assert executor().inEventLoop();
451 assert readStatus != ReadStatus.IDLE || !readCompletePending;
452 notifyReadComplete(recvBufAllocHandle(), false);
453 }
454
455 private void connectTransport(final SocketAddress remoteAddress,
456 SocketAddress localAddress, Promise<Void> promise) {
457 if (!promise.setUncancellable()) {
458 return;
459 }
460 promise.setFailure(new UnsupportedOperationException());
461 }
462
463 private RecvBufferAllocator.Handle recvBufAllocHandle() {
464 if (recvHandle == null) {
465 recvHandle = rcvBufAllocator.newHandle();
466 recvHandle.reset();
467 }
468 return recvHandle;
469 }
470
471 private void registerTransport(Promise<Void> promise) {
472 if (!promise.setUncancellable()) {
473 return;
474 }
475 if (registered) {
476 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
477 return;
478 }
479
480 registered = true;
481
482 promise.setSuccess(null);
483
484 pipeline().fireChannelRegistered();
485 if (isActive()) {
486 pipeline().fireChannelActive();
487 if (isAutoRead()) {
488 read();
489 }
490 }
491 }
492
493 private void bindTransport(SocketAddress localAddress, Promise<Void> promise) {
494 if (!promise.setUncancellable()) {
495 return;
496 }
497 promise.setFailure(new UnsupportedOperationException());
498 }
499
500 private void disconnectTransport(Promise<Void> promise) {
501 closeTransport(promise);
502 }
503
504 private void closeTransport(final Promise<Void> promise) {
505 if (!promise.setUncancellable()) {
506 return;
507 }
508 if (closeInitiated) {
509 if (closePromise.isDone()) {
510
511 promise.setSuccess(null);
512 } else {
513
514 closeFuture().addListener(promise, (p, future) -> p.setSuccess(null));
515 }
516 return;
517 }
518 closeInitiated = true;
519
520 readCompletePending = false;
521
522 final boolean wasActive = isActive();
523
524
525
526
527
528
529 if (parent().isActive() && !readEOS && isStreamIdValid(stream.id())) {
530 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
531 writeTransport(resetFrame, newPromise());
532 flush();
533 }
534
535 if (inboundBuffer != null) {
536 for (;;) {
537 Object msg = inboundBuffer.poll();
538 if (msg == null) {
539 break;
540 }
541 Resource.dispose(msg);
542 }
543 inboundBuffer = null;
544 }
545
546
547 outputShutdown = true;
548 closePromise.setSuccess(null);
549 promise.setSuccess(null);
550
551 fireChannelInactiveAndDeregister(newPromise(), wasActive);
552 }
553
554 private void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
555 if (!promise.setUncancellable()) {
556 return;
557 }
558 if (!isActive()) {
559 if (isOpen()) {
560 promise.setFailure(new NotYetConnectedException());
561 } else {
562 promise.setFailure(new ClosedChannelException());
563 }
564 return;
565 }
566 if (isShutdown(direction)) {
567
568 promise.setSuccess(null);
569 return;
570 }
571 boolean fireEvent = false;
572 switch (direction) {
573 case Outbound:
574 fireEvent = shutdownOutput(true, promise);
575 break;
576 case Inbound:
577 try {
578 inputShutdown = true;
579 promise.setSuccess(null);
580 fireEvent = true;
581 } catch (Throwable cause) {
582 promise.setFailure(cause);
583 }
584 break;
585 default:
586
587 promise.setFailure(new IllegalStateException());
588 break;
589 }
590 if (fireEvent) {
591 pipeline().fireChannelShutdown(direction);
592 }
593 }
594
595 private boolean shutdownOutput(boolean writeFrame, Promise<Void> promise) {
596 if (!promise.setUncancellable()) {
597 return false;
598 }
599 if (isShutdown(ChannelShutdownDirection.Outbound)) {
600
601 promise.setSuccess(null);
602 return false;
603 }
604 if (writeFrame) {
605
606 writeTransport(new DefaultHttp2HeadersFrame(EmptyHttp2Headers.INSTANCE, true), newPromise());
607 }
608 outputShutdown = true;
609 promise.setSuccess(null);
610 return true;
611 }
612
613 void closeForcibly() {
614 closeTransport(newPromise());
615 }
616
617 private void deregisterTransport(Promise<Void> promise) {
618 fireChannelInactiveAndDeregister(promise, false);
619 }
620
621 private void fireChannelInactiveAndDeregister(Promise<Void> promise,
622 final boolean fireChannelInactive) {
623 if (!promise.setUncancellable()) {
624 return;
625 }
626
627 if (!registered) {
628 promise.setSuccess(null);
629 return;
630 }
631
632
633
634
635
636
637
638
639 invokeLater(()-> {
640 if (fireChannelInactive) {
641 pipeline.fireChannelInactive();
642 }
643
644
645 if (registered) {
646 registered = false;
647 pipeline.fireChannelUnregistered();
648 }
649 safeSetSuccess(promise);
650 });
651 }
652
653 private void safeSetSuccess(Promise<Void> promise) {
654 if (!promise.trySuccess(null)) {
655 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
656 }
657 }
658
659 private void invokeLater(Runnable task) {
660 try {
661
662
663
664
665
666
667
668
669
670
671
672 executor().execute(task);
673 } catch (RejectedExecutionException e) {
674 logger.warn("Can't invoke task later as EventLoop rejected it", e);
675 }
676 }
677
678 private void readTransport() {
679 if (!isActive()) {
680 return;
681 }
682 updateLocalWindowIfNeeded();
683
684 switch (readStatus) {
685 case IDLE:
686 readStatus = ReadStatus.IN_PROGRESS;
687 doBeginRead();
688 break;
689 case IN_PROGRESS:
690 readStatus = ReadStatus.REQUESTED;
691 break;
692 default:
693 break;
694 }
695 }
696
697 private Object pollQueuedMessage() {
698 return inboundBuffer == null ? null : inboundBuffer.poll();
699 }
700
701 void doBeginRead() {
702
703 while (readStatus != ReadStatus.IDLE) {
704 Object message = pollQueuedMessage();
705 if (message == null) {
706 if (readEOS) {
707 closeForcibly();
708 }
709
710
711 flush();
712 break;
713 }
714 final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
715 allocHandle.reset();
716 boolean continueReading = false;
717 do {
718 doRead0((Http2Frame) message, allocHandle);
719 } while ((readEOS || (continueReading = allocHandle.continueReading(isAutoRead())))
720 && (message = pollQueuedMessage()) != null);
721
722 if (continueReading && handler.isParentReadInProgress() && !readEOS) {
723
724
725
726
727 maybeAddChannelToReadCompletePendingQueue();
728 } else {
729 notifyReadComplete(allocHandle, true);
730 }
731 }
732 }
733
734 void readEOS() {
735 readEOS = true;
736 }
737
738 private void updateLocalWindowIfNeeded() {
739 if (flowControlledBytes != 0) {
740 int bytes = flowControlledBytes;
741 flowControlledBytes = 0;
742 Future<Void> future = handler.parentContext()
743 .write(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
744
745
746
747 writeDoneAndNoFlush = true;
748
749
750
751
752 if (future.isDone()) {
753 windowUpdateFrameWriteComplete(DefaultHttp2StreamChannel.this, future);
754 } else {
755 future.addListener(DefaultHttp2StreamChannel.this,
756 DefaultHttp2StreamChannel::windowUpdateFrameWriteComplete);
757 }
758 }
759 }
760
761 void notifyReadComplete(RecvBufferAllocator.Handle allocHandle, boolean forceReadComplete) {
762 if (!readCompletePending && !forceReadComplete) {
763 return;
764 }
765
766 readCompletePending = false;
767
768 if (readStatus == ReadStatus.REQUESTED) {
769 readStatus = ReadStatus.IN_PROGRESS;
770 } else {
771 readStatus = ReadStatus.IDLE;
772 }
773
774 allocHandle.readComplete();
775 pipeline().fireChannelReadComplete();
776 if (isAutoRead()) {
777 read();
778 }
779
780
781
782
783 flush();
784 if (readEOS) {
785 closeForcibly();
786 }
787 }
788
789 private void doRead0(Http2Frame frame, RecvBufferAllocator.Handle allocHandle) {
790 if (isShutdown(ChannelShutdownDirection.Inbound)) {
791
792
793 if (frame instanceof Http2DataFrame || frame instanceof Http2HeadersFrame) {
794 Resource.dispose(frame);
795 return;
796 }
797 }
798 final int bytes;
799 if (frame instanceof Http2DataFrame) {
800 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
801
802
803
804
805
806
807 flowControlledBytes += bytes;
808 } else {
809 bytes = MIN_HTTP2_FRAME_SIZE;
810 }
811
812 allocHandle.attemptedBytesRead(bytes);
813 allocHandle.lastBytesRead(bytes);
814 allocHandle.incMessagesRead(1);
815
816 boolean shutdownInput = isShutdownNeeded(frame);
817 pipeline().fireChannelRead(frame);
818 if (shutdownInput) {
819 shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
820 }
821 }
822
823 private boolean isShutdownNeeded(Http2Frame frame) {
824 if (frame instanceof Http2HeadersFrame) {
825 return ((Http2HeadersFrame) frame).isEndStream();
826 }
827 if (frame instanceof Http2DataFrame) {
828 return ((Http2DataFrame) frame).isEndStream();
829 }
830 return false;
831 }
832
833 private void writeTransport(Object msg, Promise<Void> promise) {
834
835 if (!promise.setUncancellable()) {
836 Resource.dispose(msg);
837 return;
838 }
839
840 if (!isActive()) {
841 Resource.dispose(msg);
842 promise.setFailure(new ClosedChannelException());
843 return;
844 }
845
846
847 if (isShutdown(ChannelShutdownDirection.Outbound)
848 && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
849 Resource.dispose(msg);
850 promise.setFailure(newOutputShutdownException(
851 DefaultHttp2StreamChannel.class, "writeTransport(Object, Promise)"));
852 return;
853 }
854
855 try {
856 if (msg instanceof Http2StreamFrame) {
857 Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
858 boolean shutdownOutput = isShutdownNeeded(frame);
859 writeHttp2StreamFrame(frame, promise);
860 if (shutdownOutput) {
861 if (shutdownOutput(false, newPromise())) {
862 pipeline().fireChannelShutdown(ChannelShutdownDirection.Outbound);
863 }
864 }
865 } else {
866 String msgStr = msg.toString();
867 Resource.dispose(msg);
868 promise.setFailure(new IllegalArgumentException(
869 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
870 ": " + msgStr));
871 }
872 } catch (Throwable t) {
873 promise.tryFailure(t);
874 }
875 }
876
877 private void writeHttp2StreamFrame(Http2StreamFrame frame, Promise<Void> promise) {
878 if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
879 Resource.dispose(frame);
880 promise.setFailure(
881 new IllegalArgumentException("The first frame must be a headers frame. Was: "
882 + frame.name()));
883 return;
884 }
885
886 final boolean firstWrite;
887 if (firstFrameWritten) {
888 firstWrite = false;
889 } else {
890 firstWrite = firstFrameWritten = true;
891 }
892
893 Future<Void> f = handler.parentContext().write(frame);
894 if (f.isDone()) {
895 if (firstWrite) {
896 firstWriteComplete(f, promise);
897 } else {
898 writeComplete(f, promise);
899 }
900 } else {
901 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
902 incrementPendingOutboundBytes(bytes, false);
903 f.addListener(future -> {
904 if (firstWrite) {
905 firstWriteComplete(future, promise);
906 } else {
907 writeComplete(future, promise);
908 }
909 decrementPendingOutboundBytes(bytes, false);
910 });
911 writeDoneAndNoFlush = true;
912 }
913 }
914
915 private void firstWriteComplete(Future<?> future, Promise<Void> promise) {
916 Throwable cause = future.cause();
917 if (cause == null) {
918 promise.setSuccess(null);
919 } else {
920
921 closeForcibly();
922 promise.setFailure(wrapStreamClosedError(cause));
923 }
924 }
925
926 private void writeComplete(Future<?> future, Promise<Void> promise) {
927 Throwable cause = future.cause();
928 if (cause == null) {
929 promise.setSuccess(null);
930 } else {
931 Throwable error = wrapStreamClosedError(cause);
932
933 if (error instanceof IOException) {
934 if (isAutoClose()) {
935
936 closeForcibly();
937 } else {
938 shutdownTransport(ChannelShutdownDirection.Outbound, newPromise());
939 }
940 }
941 promise.setFailure(error);
942 }
943 }
944
945 private Throwable wrapStreamClosedError(Throwable cause) {
946
947
948 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
949 return new ClosedChannelException().initCause(cause);
950 }
951 return cause;
952 }
953
954 private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
955 if (frame.stream() != null && frame.stream() != stream) {
956 String msgString = frame.toString();
957 Resource.dispose(frame);
958 throw new IllegalArgumentException(
959 "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
960 }
961 return frame;
962 }
963
964 private void flushTransport() {
965
966
967
968
969 if (!writeDoneAndNoFlush || handler.isParentReadInProgress()) {
970
971 return;
972 }
973
974
975 writeDoneAndNoFlush = false;
976 handler.parentContext().flush();
977 }
978
979 private void sendOutboundEventTransport(Object event, Promise<Void> promise) {
980 Resource.dispose(event);
981 promise.setSuccess(null);
982 }
983
984 @Override
985 public BufferAllocator bufferAllocator() {
986 return bufferAllocator;
987 }
988
989 private <T> void validate(ChannelOption<T> option, T value) {
990 requireNonNull(option, "option");
991 option.validate(value);
992 }
993
994 @Override
995 @SuppressWarnings({ "unchecked", "deprecation" })
996 public <T> T getOption(ChannelOption<T> option) {
997 requireNonNull(option, "option");
998 if (option == AUTO_READ) {
999 return (T) Boolean.valueOf(isAutoRead());
1000 }
1001 if (option == WRITE_BUFFER_WATER_MARK) {
1002 return (T) getWriteBufferWaterMark();
1003 }
1004 if (option == CONNECT_TIMEOUT_MILLIS) {
1005 return (T) Integer.valueOf(getConnectTimeoutMillis());
1006 }
1007 if (option == MAX_MESSAGES_PER_READ) {
1008 return (T) Integer.valueOf(getMaxMessagesPerRead());
1009 }
1010 if (option == WRITE_SPIN_COUNT) {
1011 return (T) Integer.valueOf(getWriteSpinCount());
1012 }
1013 if (option == BUFFER_ALLOCATOR) {
1014 return (T) getBufferAllocator();
1015 }
1016 if (option == RCVBUFFER_ALLOCATOR) {
1017 return getRecvBufferAllocator();
1018 }
1019 if (option == AUTO_CLOSE) {
1020 return (T) Boolean.valueOf(isAutoClose());
1021 }
1022 if (option == MESSAGE_SIZE_ESTIMATOR) {
1023 return (T) getMessageSizeEstimator();
1024 }
1025 if (option == MAX_MESSAGES_PER_WRITE) {
1026 return (T) Integer.valueOf(getMaxMessagesPerWrite());
1027 }
1028 if (option == ALLOW_HALF_CLOSURE) {
1029 return (T) Boolean.valueOf(isAllowHalfClosure());
1030 }
1031
1032 return null;
1033 }
1034
1035 @Override
1036 @SuppressWarnings("deprecation")
1037 public <T> Channel setOption(ChannelOption<T> option, T value) {
1038 validate(option, value);
1039
1040 if (option == AUTO_READ) {
1041 setAutoRead((Boolean) value);
1042 } else if (option == WRITE_BUFFER_WATER_MARK) {
1043 setWriteBufferWaterMark((WriteBufferWaterMark) value);
1044 } else if (option == CONNECT_TIMEOUT_MILLIS) {
1045 setConnectTimeoutMillis((Integer) value);
1046 } else if (option == MAX_MESSAGES_PER_READ) {
1047 setMaxMessagesPerRead((Integer) value);
1048 } else if (option == WRITE_SPIN_COUNT) {
1049 setWriteSpinCount((Integer) value);
1050 } else if (option == BUFFER_ALLOCATOR) {
1051 setBufferAllocator((BufferAllocator) value);
1052 } else if (option == RCVBUFFER_ALLOCATOR) {
1053 setRecvBufferAllocator((RecvBufferAllocator) value);
1054 } else if (option == AUTO_CLOSE) {
1055 setAutoClose((Boolean) value);
1056 } else if (option == MESSAGE_SIZE_ESTIMATOR) {
1057 return this;
1058 } else if (option == MAX_MESSAGES_PER_WRITE) {
1059 setMaxMessagesPerWrite((Integer) value);
1060 } else if (option == ALLOW_HALF_CLOSURE) {
1061 setAllowHalfClosure((Boolean) value);
1062 }
1063 return this;
1064 }
1065
1066 @Override
1067 public boolean isOptionSupported(ChannelOption<?> option) {
1068 return option == AUTO_READ || option == WRITE_BUFFER_WATER_MARK || option == CONNECT_TIMEOUT_MILLIS ||
1069 option == MAX_MESSAGES_PER_READ || option == WRITE_SPIN_COUNT || option == BUFFER_ALLOCATOR ||
1070 option == RCVBUFFER_ALLOCATOR || option == AUTO_CLOSE || option == MESSAGE_SIZE_ESTIMATOR ||
1071 option == MAX_MESSAGES_PER_WRITE || option == ALLOW_HALF_CLOSURE;
1072 }
1073
1074 private int getConnectTimeoutMillis() {
1075 return connectTimeoutMillis;
1076 }
1077
1078 private void setConnectTimeoutMillis(int connectTimeoutMillis) {
1079 checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
1080 this.connectTimeoutMillis = connectTimeoutMillis;
1081 }
1082
1083
1084
1085
1086
1087 @Deprecated
1088 private int getMaxMessagesPerRead() {
1089 try {
1090 MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1091 return allocator.maxMessagesPerRead();
1092 } catch (ClassCastException e) {
1093 throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1094 "MaxMessagesRecvBufferAllocator", e);
1095 }
1096 }
1097
1098
1099
1100
1101
1102 @Deprecated
1103 private void setMaxMessagesPerRead(int maxMessagesPerRead) {
1104 try {
1105 MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1106 allocator.maxMessagesPerRead(maxMessagesPerRead);
1107 } catch (ClassCastException e) {
1108 throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1109 "MaxMessagesRecvBufferAllocator", e);
1110 }
1111 }
1112
1113
1114
1115
1116
1117 private int getMaxMessagesPerWrite() {
1118 return maxMessagesPerWrite;
1119 }
1120
1121
1122
1123
1124
1125 private void setMaxMessagesPerWrite(int maxMessagesPerWrite) {
1126 this.maxMessagesPerWrite = checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
1127 }
1128
1129 private int getWriteSpinCount() {
1130 return writeSpinCount;
1131 }
1132
1133 private void setWriteSpinCount(int writeSpinCount) {
1134 checkPositive(writeSpinCount, "writeSpinCount");
1135
1136
1137
1138
1139 if (writeSpinCount == Integer.MAX_VALUE) {
1140 --writeSpinCount;
1141 }
1142 this.writeSpinCount = writeSpinCount;
1143 }
1144
1145 private BufferAllocator getBufferAllocator() {
1146 return bufferAllocator;
1147 }
1148
1149 public void setBufferAllocator(BufferAllocator bufferAllocator) {
1150 requireNonNull(bufferAllocator, "bufferAllocator");
1151 this.bufferAllocator = bufferAllocator;
1152 }
1153
1154 @SuppressWarnings("unchecked")
1155 private <T extends RecvBufferAllocator> T getRecvBufferAllocator() {
1156 return (T) rcvBufAllocator;
1157 }
1158
1159 private void setRecvBufferAllocator(RecvBufferAllocator allocator) {
1160 rcvBufAllocator = requireNonNull(allocator, "allocator");
1161 }
1162
1163
1164
1165
1166
1167
1168
1169 private void setRecvBufferAllocator(RecvBufferAllocator allocator, ChannelMetadata metadata) {
1170 requireNonNull(allocator, "allocator");
1171 requireNonNull(metadata, "metadata");
1172 if (allocator instanceof MaxMessagesRecvBufferAllocator) {
1173 ((MaxMessagesRecvBufferAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
1174 }
1175 setRecvBufferAllocator(allocator);
1176 }
1177
1178 private boolean isAutoRead() {
1179 return autoRead == 1;
1180 }
1181
1182 private void setAutoRead(boolean autoRead) {
1183 boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
1184 if (autoRead && !oldAutoRead) {
1185 read();
1186 } else if (!autoRead && oldAutoRead) {
1187 autoReadCleared();
1188 }
1189 }
1190
1191
1192
1193
1194
1195 protected void autoReadCleared() { }
1196
1197 private WriteBufferWaterMark getWriteBufferWaterMark() {
1198 return writeBufferWaterMark;
1199 }
1200
1201 private boolean isAutoClose() {
1202 return autoClose;
1203 }
1204
1205 private void setAutoClose(boolean autoClose) {
1206 this.autoClose = autoClose;
1207 }
1208
1209 private int getWriteBufferHighWaterMark() {
1210 return writeBufferWaterMark.high();
1211 }
1212
1213 private void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
1214 checkPositiveOrZero(writeBufferHighWaterMark, "writeBufferHighWaterMark");
1215 for (;;) {
1216 WriteBufferWaterMark waterMark = writeBufferWaterMark;
1217 if (writeBufferHighWaterMark < waterMark.low()) {
1218 throw new IllegalArgumentException(
1219 "writeBufferHighWaterMark cannot be less than " +
1220 "writeBufferLowWaterMark (" + waterMark.low() + "): " +
1221 writeBufferHighWaterMark);
1222 }
1223 if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
1224 new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark))) {
1225 return;
1226 }
1227 }
1228 }
1229
1230 private int getWriteBufferLowWaterMark() {
1231 return writeBufferWaterMark.low();
1232 }
1233
1234 private void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
1235 checkPositiveOrZero(writeBufferLowWaterMark, "writeBufferLowWaterMark");
1236 for (;;) {
1237 WriteBufferWaterMark waterMark = writeBufferWaterMark;
1238 if (writeBufferLowWaterMark > waterMark.high()) {
1239 throw new IllegalArgumentException(
1240 "writeBufferLowWaterMark cannot be greater than " +
1241 "writeBufferHighWaterMark (" + waterMark.high() + "): " +
1242 writeBufferLowWaterMark);
1243 }
1244 if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
1245 new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high()))) {
1246 return;
1247 }
1248 }
1249 }
1250
1251 private void setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1252 this.writeBufferWaterMark = requireNonNull(writeBufferWaterMark, "writeBufferWaterMark");
1253 }
1254
1255 private MessageSizeEstimator getMessageSizeEstimator() {
1256 return FlowControlledFrameSizeEstimator.INSTANCE;
1257 }
1258
1259 private boolean isAllowHalfClosure() {
1260 return allowHalfClosure;
1261 }
1262
1263 private void setAllowHalfClosure(boolean allowHalfClosure) {
1264 this.allowHalfClosure = allowHalfClosure;
1265 }
1266
1267 private void maybeAddChannelToReadCompletePendingQueue() {
1268 if (!readCompletePending) {
1269 readCompletePending = true;
1270 handler.addChannelToReadCompletePendingQueue(this);
1271 }
1272 }
1273
1274
1275
1276
1277 private static ChannelOutputShutdownException newOutputShutdownException(Class<?> clazz, String method) {
1278 return ThrowableUtil.unknownStackTrace(new ChannelOutputShutdownException() {
1279 @Override
1280 public Throwable fillInStackTrace() {
1281
1282 return this;
1283 }
1284 }, clazz, method);
1285 }
1286
1287 private static final class DefaultHttp2ChannelPipeline extends DefaultChannelPipeline {
1288 DefaultHttp2ChannelPipeline(Channel channel) {
1289 super(channel);
1290 }
1291
1292 private DefaultHttp2StreamChannel defaultHttp2StreamChannel() {
1293 return (DefaultHttp2StreamChannel) channel();
1294 }
1295
1296 @Override
1297 protected EventExecutor transportExecutor() {
1298 return defaultHttp2StreamChannel().executor();
1299 }
1300
1301 @Override
1302 protected void pendingOutboundBytesUpdated(long pendingOutboundBytes) {
1303
1304 }
1305
1306 @Override
1307 protected void registerTransport(Promise<Void> promise) {
1308 defaultHttp2StreamChannel().registerTransport(promise);
1309 }
1310
1311 @Override
1312 protected void bindTransport(SocketAddress localAddress, Promise<Void> promise) {
1313 defaultHttp2StreamChannel().bindTransport(localAddress, promise);
1314 }
1315
1316 @Override
1317 protected void connectTransport(
1318 SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1319 defaultHttp2StreamChannel().connectTransport(remoteAddress, localAddress, promise);
1320 }
1321
1322 @Override
1323 protected void disconnectTransport(Promise<Void> promise) {
1324 defaultHttp2StreamChannel().disconnectTransport(promise);
1325 }
1326
1327 @Override
1328 protected void closeTransport(Promise<Void> promise) {
1329 defaultHttp2StreamChannel().closeTransport(promise);
1330 }
1331
1332 @Override
1333 protected void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
1334 defaultHttp2StreamChannel().shutdownTransport(direction, promise);
1335 }
1336
1337 @Override
1338 protected void deregisterTransport(Promise<Void> promise) {
1339 defaultHttp2StreamChannel().deregisterTransport(promise);
1340 }
1341
1342 @Override
1343 protected void readTransport() {
1344 defaultHttp2StreamChannel().readTransport();
1345 }
1346
1347 @Override
1348 protected void writeTransport(Object msg, Promise<Void> promise) {
1349 defaultHttp2StreamChannel().writeTransport(msg, promise);
1350 }
1351
1352 @Override
1353 protected void flushTransport() {
1354 defaultHttp2StreamChannel().flushTransport();
1355 }
1356
1357 @Override
1358 protected void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1359 defaultHttp2StreamChannel().sendOutboundEventTransport(event, promise);
1360 }
1361 }
1362 }