1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec.http2;
16
17 import io.netty5.buffer.BufferUtil;
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.ChannelHandlerContext;
21 import io.netty5.channel.ChannelOption;
22 import io.netty5.handler.codec.ByteToMessageDecoder;
23 import io.netty5.handler.codec.http.HttpResponseStatus;
24 import io.netty5.handler.codec.http2.Http2Exception.CompositeStreamException;
25 import io.netty5.handler.codec.http2.Http2Exception.StreamException;
26 import io.netty5.util.concurrent.Future;
27 import io.netty5.util.concurrent.FutureListener;
28 import io.netty5.util.concurrent.Promise;
29 import io.netty5.util.internal.UnstableApi;
30 import io.netty5.util.internal.logging.InternalLogger;
31 import io.netty5.util.internal.logging.InternalLoggerFactory;
32
33 import java.nio.charset.StandardCharsets;
34 import java.util.concurrent.TimeUnit;
35
36 import static io.netty5.channel.ChannelFutureListeners.CLOSE_ON_FAILURE;
37 import static io.netty5.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
38 import static io.netty5.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuffer;
39 import static io.netty5.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
40 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
41 import static io.netty5.handler.codec.http2.Http2Error.NO_ERROR;
42 import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
43 import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
44 import static io.netty5.handler.codec.http2.Http2Exception.isStreamError;
45 import static io.netty5.handler.codec.http2.Http2FrameTypes.SETTINGS;
46 import static io.netty5.handler.codec.http2.Http2Stream.State.IDLE;
47 import static io.netty5.util.CharsetUtil.UTF_8;
48 import static java.lang.Math.min;
49 import static java.util.Objects.requireNonNull;
50 import static java.util.concurrent.TimeUnit.MILLISECONDS;
51
52
53
54
55
56
57
58
59
60
61 @UnstableApi
62 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager {
63
64 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
65
66 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
67 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
68 private static final Buffer HTTP_1_X_BUF =
69 BufferAllocator.offHeapUnpooled().copyOf(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'}).makeReadOnly();
70
71 private final Http2ConnectionDecoder decoder;
72 private final Http2ConnectionEncoder encoder;
73 private final Http2Settings initialSettings;
74 private final boolean decoupleCloseAndGoAway;
75 private final boolean flushPreface;
76 private FutureListener<Object> closeListener;
77 private BaseDecoder byteDecoder;
78 private long gracefulShutdownTimeoutMillis;
79
80 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
81 Http2Settings initialSettings) {
82 this(decoder, encoder, initialSettings, false);
83 }
84
85 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
86 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
87 this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
88 }
89
90 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
91 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
92 boolean flushPreface) {
93 this.initialSettings = requireNonNull(initialSettings, "initialSettings");
94 this.decoder = requireNonNull(decoder, "decoder");
95 this.encoder = requireNonNull(encoder, "encoder");
96 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
97 this.flushPreface = flushPreface;
98 if (encoder.connection() != decoder.connection()) {
99 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
100 }
101 }
102
103
104
105
106
107
108 public long gracefulShutdownTimeoutMillis() {
109 return gracefulShutdownTimeoutMillis;
110 }
111
112
113
114
115
116
117
118 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
119 if (gracefulShutdownTimeoutMillis < -1) {
120 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
121 " (expected: -1 for indefinite or >= 0)");
122 }
123 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
124 }
125
126 public Http2Connection connection() {
127 return encoder.connection();
128 }
129
130 public Http2ConnectionDecoder decoder() {
131 return decoder;
132 }
133
134 public Http2ConnectionEncoder encoder() {
135 return encoder;
136 }
137
138 private boolean prefaceSent() {
139 return byteDecoder != null && byteDecoder.prefaceSent();
140 }
141
142
143
144
145
146 public void onHttpClientUpgrade() throws Http2Exception {
147 if (connection().isServer()) {
148 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
149 }
150 if (!prefaceSent()) {
151
152
153 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
154 }
155 if (decoder.prefaceReceived()) {
156 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
157 }
158
159
160 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
161 }
162
163
164
165
166
167 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
168 if (!connection().isServer()) {
169 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
170 }
171 if (!prefaceSent()) {
172
173
174 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
175 }
176 if (decoder.prefaceReceived()) {
177 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
178 }
179
180
181 encoder.remoteSettings(settings);
182
183
184 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
185 }
186
187 @Override
188 public void flush(ChannelHandlerContext ctx) {
189 try {
190
191 encoder.flowController().writePendingBytes();
192 ctx.flush();
193 } catch (Http2Exception e) {
194 onError(ctx, true, e);
195 } catch (Throwable cause) {
196 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
197 }
198 }
199
200 private abstract class BaseDecoder {
201 public abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
202 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
203 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
204
205 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
206
207 encoder().close();
208 decoder().close();
209
210
211
212 connection().close(ctx.newPromise());
213 }
214
215
216
217
218 public boolean prefaceSent() {
219 return true;
220 }
221 }
222
223 private final class PrefaceDecoder extends BaseDecoder {
224 private Buffer clientPrefaceString;
225 private boolean prefaceSent;
226
227 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
228 clientPrefaceString = clientPrefaceString(encoder.connection());
229
230
231 sendPreface(ctx);
232 }
233
234 @Override
235 public boolean prefaceSent() {
236 return prefaceSent;
237 }
238
239 @Override
240 public void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
241 try {
242 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
243
244 byteDecoder = new FrameDecoder();
245 byteDecoder.decode(ctx, in);
246 }
247 } catch (Throwable e) {
248 onError(ctx, false, e);
249 }
250 }
251
252 @Override
253 public void channelActive(ChannelHandlerContext ctx) throws Exception {
254
255 sendPreface(ctx);
256
257 if (flushPreface) {
258
259
260
261 ctx.flush();
262 }
263 }
264
265 @Override
266 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
267 cleanup();
268 super.channelInactive(ctx);
269 }
270
271
272
273
274 @Override
275 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
276 cleanup();
277 }
278
279
280
281
282 private void cleanup() {
283 if (clientPrefaceString != null) {
284 clientPrefaceString.close();
285 clientPrefaceString = null;
286 }
287 }
288
289
290
291
292
293
294
295 private boolean readClientPrefaceString(Buffer in) throws Http2Exception {
296 if (clientPrefaceString == null) {
297 return true;
298 }
299
300 int prefaceRemaining = clientPrefaceString.readableBytes();
301 int bytesRead = min(in.readableBytes(), prefaceRemaining);
302
303
304 if (bytesRead == 0 || !BufferUtil.equals(in, in.readerOffset(),
305 clientPrefaceString, clientPrefaceString.readerOffset(),
306 bytesRead)) {
307 int maxSearch = 1024;
308 in.makeReadOnly();
309 try (Buffer preface = in.copy(in.readerOffset(), min(in.readableBytes(), maxSearch), true)) {
310 int http1Index = preface.bytesBefore(HTTP_1_X_BUF);
311 if (http1Index != -1) {
312 try (Buffer chunk = preface.readSplit(http1Index)) {
313 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s",
314 chunk.toString(StandardCharsets.US_ASCII));
315 }
316 }
317 }
318 String receivedBytes = BufferUtil.hexDump(in, in.readerOffset(),
319 min(in.readableBytes(), clientPrefaceString.readableBytes()));
320 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
321 "Hex dump for received bytes: %s", receivedBytes);
322 }
323 in.skipReadableBytes(bytesRead);
324 clientPrefaceString.skipReadableBytes(bytesRead);
325
326 if (clientPrefaceString.readableBytes() == 0) {
327
328 clientPrefaceString.close();
329 clientPrefaceString = null;
330 return true;
331 }
332 return false;
333 }
334
335
336
337
338
339
340
341
342
343 private boolean verifyFirstFrameIsSettings(Buffer in) throws Http2Exception {
344 if (in.readableBytes() < 5) {
345
346 return false;
347 }
348
349 int frameType = in.getUnsignedByte(in.readerOffset() + 3);
350 int flags = in.getUnsignedByte(in.readerOffset() + 4);
351 if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
352 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
353 "Hex dump for first 5 bytes: %s",
354 BufferUtil.hexDump(in, in.readerOffset(), 5));
355 }
356 return true;
357 }
358
359
360
361
362 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
363 if (prefaceSent || !ctx.channel().isActive()) {
364 return;
365 }
366
367 prefaceSent = true;
368
369 final boolean isClient = !connection().isServer();
370 if (isClient) {
371
372 ctx.write(connectionPrefaceBuffer()).addListener(ctx.channel(), CLOSE_ON_FAILURE);
373 }
374
375
376 encoder.writeSettings(ctx, initialSettings)
377 .addListener(ctx.channel(), CLOSE_ON_FAILURE);
378
379 if (isClient) {
380
381
382
383 channelInboundEvent(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
384 }
385 }
386 }
387
388 private final class FrameDecoder extends BaseDecoder {
389 @Override
390 public void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
391 try {
392 decoder.decodeFrame(ctx, in);
393 } catch (Throwable e) {
394 onError(ctx, false, e);
395 }
396 }
397 }
398
399 @Override
400 public void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
401
402 encoder.lifecycleManager(this);
403 decoder.lifecycleManager(this);
404 encoder.flowController().channelHandlerContext(ctx);
405 decoder.flowController().channelHandlerContext(ctx);
406 byteDecoder = new PrefaceDecoder(ctx);
407 }
408
409 @Override
410 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
411 if (byteDecoder != null) {
412 byteDecoder.handlerRemoved(ctx);
413 byteDecoder = null;
414 }
415 }
416
417 @Override
418 public void channelActive(ChannelHandlerContext ctx) throws Exception {
419 if (byteDecoder == null) {
420 byteDecoder = new PrefaceDecoder(ctx);
421 }
422 byteDecoder.channelActive(ctx);
423 super.channelActive(ctx);
424 }
425
426 @Override
427 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
428
429 super.channelInactive(ctx);
430 if (byteDecoder != null) {
431 byteDecoder.channelInactive(ctx);
432 byteDecoder = null;
433 }
434 }
435
436 @Override
437 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
438
439
440 try {
441 if (ctx.channel().isWritable()) {
442 flush(ctx);
443 }
444 encoder.flowController().channelWritabilityChanged();
445 } finally {
446 super.channelWritabilityChanged(ctx);
447 }
448 }
449
450 @Override
451 protected void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
452 byteDecoder.decode(ctx, in);
453 }
454
455 @Override
456 public Future<Void> close(ChannelHandlerContext ctx) {
457 if (decoupleCloseAndGoAway) {
458 return ctx.close();
459 }
460
461 if (!ctx.channel().isActive() || !prefaceSent()) {
462 return ctx.close();
463 }
464
465
466
467
468
469
470 Future<Void> f = connection().goAwaySent() ? ctx.write(ctx.bufferAllocator().allocate(0)) : goAway(ctx, null);
471 ctx.flush();
472 Promise<Void> promise = ctx.newPromise();
473 doGracefulShutdown(ctx, f, promise);
474 return promise.asFuture();
475 }
476
477 private FutureListener<Object> newClosingChannelFutureListener(
478 ChannelHandlerContext ctx, Promise<Void> promise) {
479 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
480 return gracefulShutdownTimeoutMillis < 0 ?
481 new ClosingChannelFutureListener(ctx, promise) :
482 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
483 }
484
485 private void doGracefulShutdown(ChannelHandlerContext ctx, Future<Void> future, final Promise<Void> promise) {
486 FutureListener<Object> listener = newClosingChannelFutureListener(ctx, promise);
487 if (isGracefulShutdownComplete()) {
488
489
490 future.addListener(listener);
491 } else {
492
493
494
495
496 if (closeListener == null) {
497 closeListener = listener;
498 } else if (promise != null) {
499 FutureListener<Object> oldCloseListener = closeListener;
500 closeListener = future1 -> {
501 try {
502 oldCloseListener.operationComplete(future1);
503 } finally {
504 listener.operationComplete(future1);
505 }
506 };
507 }
508 }
509 }
510
511 @Override
512 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
513
514
515 try {
516
517 channelReadComplete0(ctx);
518 } finally {
519 flush(ctx);
520 }
521 }
522
523 final void channelReadComplete0(ChannelHandlerContext ctx) {
524
525 discardSomeReadBytes();
526
527
528
529
530 if (!ctx.channel().getOption(ChannelOption.AUTO_READ)) {
531 ctx.read();
532 }
533
534 ctx.fireChannelReadComplete();
535 }
536
537
538
539
540 @Override
541 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
542 if (getEmbeddedHttp2Exception(cause) != null) {
543
544 onError(ctx, false, cause);
545 } else {
546 super.channelExceptionCaught(ctx, cause);
547 }
548 }
549
550
551
552
553
554
555
556
557 @Override
558 public void closeStreamLocal(Http2Stream stream, Future<Void> future) {
559 switch (stream.state()) {
560 case HALF_CLOSED_LOCAL:
561 case OPEN:
562 stream.closeLocalSide();
563 break;
564 default:
565 closeStream(stream, future);
566 break;
567 }
568 }
569
570
571
572
573
574
575
576
577 @Override
578 public void closeStreamRemote(Http2Stream stream, Future<Void> future) {
579 switch (stream.state()) {
580 case HALF_CLOSED_REMOTE:
581 case OPEN:
582 stream.closeRemoteSide();
583 break;
584 default:
585 closeStream(stream, future);
586 break;
587 }
588 }
589
590 @Override
591 public void closeStream(final Http2Stream stream, Future<Void> future) {
592 stream.close();
593
594 if (future.isDone()) {
595 checkCloseConnection(future);
596 } else {
597 future.addListener(this::checkCloseConnection);
598 }
599 }
600
601
602
603
604 @Override
605 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
606 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
607 if (isStreamError(embedded)) {
608 onStreamError(ctx, outbound, cause, (StreamException) embedded);
609 } else if (embedded instanceof CompositeStreamException) {
610 CompositeStreamException compositException = (CompositeStreamException) embedded;
611 for (StreamException streamException : compositException) {
612 onStreamError(ctx, outbound, cause, streamException);
613 }
614 } else {
615 onConnectionError(ctx, outbound, cause, embedded);
616 }
617 ctx.flush();
618 }
619
620
621
622
623
624
625 protected boolean isGracefulShutdownComplete() {
626 return connection().numActiveStreams() == 0;
627 }
628
629
630
631
632
633
634
635
636
637
638
639 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
640 Throwable cause, Http2Exception http2Ex) {
641 if (http2Ex == null) {
642 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
643 }
644
645 Promise<Void> promise = ctx.newPromise();
646 Future<Void> future = goAway(ctx, http2Ex);
647 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
648 doGracefulShutdown(ctx, future, promise);
649 } else {
650 future.addListener(newClosingChannelFutureListener(ctx, promise));
651 }
652 }
653
654
655
656
657
658
659
660
661
662
663 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
664 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
665 final int streamId = http2Ex.streamId();
666 Http2Stream stream = connection().stream(streamId);
667
668
669 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
670 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
671 connection().isServer()) {
672
673
674
675
676
677
678
679 if (stream == null) {
680 try {
681 stream = encoder.connection().remote().createStream(streamId, true);
682 } catch (Http2Exception e) {
683 resetUnknownStream(ctx, streamId, http2Ex.error().code());
684 return;
685 }
686 }
687
688
689 if (stream != null && !stream.isHeadersSent()) {
690 try {
691 handleServerHeaderDecodeSizeError(ctx, stream);
692 } catch (Throwable cause2) {
693 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
694 }
695 }
696 }
697
698 if (stream == null) {
699 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
700 resetUnknownStream(ctx, streamId, http2Ex.error().code());
701 }
702 } else {
703 resetStream(ctx, stream, http2Ex.error().code());
704 }
705 }
706
707
708
709
710
711
712
713
714 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
715 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true);
716 }
717
718 protected Http2FrameWriter frameWriter() {
719 return encoder().frameWriter();
720 }
721
722
723
724
725
726
727 private Future<Void> resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode) {
728 Future<Void> future = frameWriter().writeRstStream(ctx, streamId, errorCode);
729 if (future.isDone()) {
730 closeConnectionOnError(ctx, future);
731 } else {
732 future.addListener(ctx, this::closeConnectionOnError);
733 }
734 return future;
735 }
736
737 @Override
738 public Future<Void> resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode) {
739 final Http2Stream stream = connection().stream(streamId);
740 if (stream == null) {
741 return resetUnknownStream(ctx, streamId, errorCode);
742 }
743
744 return resetStream(ctx, stream, errorCode);
745 }
746
747 private Future<Void> resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
748 long errorCode) {
749 if (stream.isResetSent()) {
750
751 return ctx.newSucceededFuture();
752 }
753
754
755
756
757
758 stream.resetSent();
759
760 final Future<Void> future;
761
762
763 if (stream.state() == IDLE ||
764 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
765 future = ctx.newSucceededFuture();
766 } else {
767 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode);
768 }
769 if (future.isDone()) {
770 processRstStreamWriteResult(ctx, stream, future);
771 } else {
772 future.addListener(future1 -> processRstStreamWriteResult(ctx, stream, future1));
773 }
774
775 return future;
776 }
777
778 @Override
779 public Future<Void> goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
780 final Buffer debugData) {
781 final Http2Connection connection = connection();
782 try {
783 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
784 debugData.close();
785 return ctx.newSucceededFuture();
786 }
787 } catch (Throwable cause) {
788 debugData.close();
789 return ctx.newFailedFuture(cause);
790 }
791
792
793
794 debugData.makeReadOnly();
795 Buffer debugDataCopy = debugData.copy(true);
796 Future<Void> future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData);
797
798 if (future.isDone()) {
799 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugDataCopy, future);
800 } else {
801 future.addListener(future1 ->
802 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugDataCopy, future1));
803 }
804
805 return future;
806 }
807
808
809
810
811
812 private void checkCloseConnection(Future<?> future) {
813
814
815 if (closeListener != null && isGracefulShutdownComplete()) {
816 FutureListener<Object> closeListener = this.closeListener;
817
818
819 this.closeListener = null;
820 try {
821 closeListener.operationComplete(future);
822 } catch (Exception e) {
823 throw new IllegalStateException("Close listener threw an unexpected exception", e);
824 }
825 }
826 }
827
828
829
830
831
832 private Future<Void> goAway(ChannelHandlerContext ctx, Http2Exception cause) {
833 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
834 int lastKnownStream;
835 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
836
837
838
839
840 lastKnownStream = Integer.MAX_VALUE;
841 } else {
842 lastKnownStream = connection().remote().lastStreamCreated();
843 }
844 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toBuffer(ctx, cause));
845 }
846
847 @SuppressWarnings("unchecked")
848 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, Future<?> future) {
849 if (future.isSuccess()) {
850 closeStream(stream, (Future<Void>) future);
851 } else {
852
853 onConnectionError(ctx, true, future.cause(), null);
854 }
855 }
856
857 private void closeConnectionOnError(ChannelHandlerContext ctx, Future<?> future) {
858 if (future.isFailed()) {
859 onConnectionError(ctx, true, future.cause(), null);
860 }
861 }
862
863
864
865
866 private static Buffer clientPrefaceString(Http2Connection connection) {
867 return connection.isServer() ? connectionPrefaceBuffer() : null;
868 }
869
870 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
871 final long errorCode, final Buffer debugData, Future<?> future) {
872 try (debugData) {
873 if (future.isSuccess()) {
874 if (errorCode != NO_ERROR.code()) {
875 if (logger.isDebugEnabled()) {
876 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
877 "debugData '{}'. Forcing shutdown of the connection.",
878 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
879 }
880 ctx.close();
881 }
882 } else {
883 if (logger.isDebugEnabled()) {
884 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
885 "debugData '{}'. Forcing shutdown of the connection.",
886 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
887 }
888 ctx.close();
889 }
890 }
891 }
892
893
894
895
896 private static final class ClosingChannelFutureListener implements FutureListener<Object> {
897 private final ChannelHandlerContext ctx;
898 private final Promise<Void> promise;
899 private final Future<?> timeoutTask;
900 private boolean closed;
901
902 ClosingChannelFutureListener(ChannelHandlerContext ctx, Promise<Void> promise) {
903 this.ctx = ctx;
904 this.promise = promise;
905 timeoutTask = null;
906 }
907
908 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final Promise<Void> promise,
909 long timeout, TimeUnit unit) {
910 this.ctx = ctx;
911 this.promise = promise;
912 timeoutTask = ctx.executor().schedule(this::doClose, timeout, unit);
913 }
914
915 @Override
916 public void operationComplete(Future<?> sentGoAwayFuture) {
917 if (timeoutTask != null) {
918 timeoutTask.cancel();
919 }
920 doClose();
921 }
922
923 private void doClose() {
924
925
926 if (closed) {
927
928 assert timeoutTask != null;
929 return;
930 }
931 closed = true;
932
933
934 if (!ctx.isRemoved()) {
935 if (promise == null) {
936 ctx.close();
937 } else {
938 ctx.close().cascadeTo(promise);
939 }
940 } else if (promise != null) {
941 promise.setSuccess(null);
942 }
943 }
944 }
945 }