1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http2;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.handler.codec.http2.Http2Stream.State;
20 import io.netty5.util.collection.IntObjectHashMap;
21 import io.netty5.util.collection.IntObjectMap;
22 import io.netty5.util.collection.IntObjectMap.PrimitiveEntry;
23 import io.netty5.util.concurrent.Promise;
24 import io.netty5.util.internal.EmptyArrays;
25 import io.netty5.util.internal.UnstableApi;
26 import io.netty5.util.internal.logging.InternalLogger;
27 import io.netty5.util.internal.logging.InternalLoggerFactory;
28
29 import java.util.ArrayDeque;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Iterator;
33 import java.util.LinkedHashSet;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.Set;
37
38 import static io.netty5.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
39 import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
40 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
41 import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
42 import static io.netty5.handler.codec.http2.Http2Error.REFUSED_STREAM;
43 import static io.netty5.handler.codec.http2.Http2Exception.closedStreamError;
44 import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
45 import static io.netty5.handler.codec.http2.Http2Exception.streamError;
46 import static io.netty5.handler.codec.http2.Http2Stream.State.CLOSED;
47 import static io.netty5.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
48 import static io.netty5.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
49 import static io.netty5.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty5.handler.codec.http2.Http2Stream.State.OPEN;
51 import static io.netty5.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
52 import static io.netty5.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
53 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
54 import static java.lang.Integer.MAX_VALUE;
55 import static java.util.Objects.requireNonNull;
56
57
58
59
60 @UnstableApi
61 public class DefaultHttp2Connection implements Http2Connection {
62 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
63
64 final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
65 final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
66 final ConnectionStream connectionStream = new ConnectionStream();
67 final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
68 final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
69
70
71
72
73
74
75
76
77
78 final List<Listener> listeners = new ArrayList<>(4);
79 final ActiveStreams activeStreams;
80 Promise<Void> closePromise;
81
82
83
84
85
/**
 * Creates a connection that uses {@code DEFAULT_MAX_RESERVED_STREAMS} as its reserved-stream limit.
 *
 * @param server {@code true} if the local endpoint is the server side, {@code false} for a client.
 */
public DefaultHttp2Connection(final boolean server) {
    this(server, DEFAULT_MAX_RESERVED_STREAMS);
}
89
90
91
92
93
94
95 public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
96 activeStreams = new ActiveStreams(listeners);
97
98
99
100
101
102 localEndpoint = new DefaultEndpoint<>(server, server ? MAX_VALUE : maxReservedStreams);
103 remoteEndpoint = new DefaultEndpoint<>(!server, maxReservedStreams);
104
105
106 streamMap.put(connectionStream.id(), connectionStream);
107 }
108
109
110
111
112 final boolean isClosed() {
113 return closePromise != null;
114 }
115
116 @Override
117 public void close(final Promise<Void> promise) {
118 requireNonNull(promise, "promise");
119
120
121 if (closePromise != null) {
122 if (closePromise == promise) {
123
124 } else {
125 closePromise.asFuture().cascadeTo(promise);
126 }
127 } else {
128 closePromise = promise;
129 }
130 if (isStreamMapEmpty()) {
131 promise.trySuccess(null);
132 return;
133 }
134
135 Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
136
137
138 if (activeStreams.allowModifications()) {
139 activeStreams.incrementPendingIterations();
140 try {
141 while (itr.hasNext()) {
142 DefaultStream stream = (DefaultStream) itr.next().value();
143 if (stream.id() != CONNECTION_STREAM_ID) {
144
145
146
147 stream.close(itr);
148 }
149 }
150 } finally {
151 activeStreams.decrementPendingIterations();
152 }
153 } else {
154 while (itr.hasNext()) {
155 Http2Stream stream = itr.next().value();
156 if (stream.id() != CONNECTION_STREAM_ID) {
157
158
159 stream.close();
160 }
161 }
162 }
163 }
164
165 @Override
166 public void addListener(Listener listener) {
167 listeners.add(listener);
168 }
169
170 @Override
171 public void removeListener(Listener listener) {
172 listeners.remove(listener);
173 }
174
175 @Override
176 public boolean isServer() {
177 return localEndpoint.isServer();
178 }
179
180 @Override
181 public Http2Stream connectionStream() {
182 return connectionStream;
183 }
184
185 @Override
186 public Http2Stream stream(int streamId) {
187 return streamMap.get(streamId);
188 }
189
190 @Override
191 public boolean streamMayHaveExisted(int streamId) {
192 return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
193 }
194
195 @Override
196 public int numActiveStreams() {
197 return activeStreams.size();
198 }
199
200 @Override
201 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
202 return activeStreams.forEachActiveStream(visitor);
203 }
204
205 @Override
206 public Endpoint<Http2LocalFlowController> local() {
207 return localEndpoint;
208 }
209
210 @Override
211 public Endpoint<Http2RemoteFlowController> remote() {
212 return remoteEndpoint;
213 }
214
215 @Override
216 public boolean goAwayReceived() {
217 return localEndpoint.lastStreamKnownByPeer >= 0;
218 }
219
220 @Override
221 public void goAwayReceived(final int lastKnownStream, long errorCode, Buffer debugData) throws Http2Exception {
222 if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
223 throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
224 localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
225 }
226
227 localEndpoint.lastStreamKnownByPeer(lastKnownStream);
228 for (int i = 0; i < listeners.size(); ++i) {
229 try {
230 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
231 } catch (Throwable cause) {
232 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
233 }
234 }
235
236 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
237 }
238
239 @Override
240 public boolean goAwaySent() {
241 return remoteEndpoint.lastStreamKnownByPeer >= 0;
242 }
243
244 @Override
245 public boolean goAwaySent(final int lastKnownStream, long errorCode, Buffer debugData) throws Http2Exception {
246 if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
247
248
249 if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
250 return false;
251 }
252 if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
253 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
254 "sending multiple GOAWAY frames (was '%d', is '%d').",
255 remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
256 }
257 }
258
259 remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
260 for (int i = 0; i < listeners.size(); ++i) {
261 try {
262 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
263 } catch (Throwable cause) {
264 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
265 }
266 }
267
268 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
269 return true;
270 }
271
272 private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
273 final DefaultEndpoint<?> endpoint) throws Http2Exception {
274 forEachActiveStream(stream -> {
275 if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
276 stream.close();
277 }
278 return true;
279 });
280 }
281
282
283
284
285 private boolean isStreamMapEmpty() {
286 return streamMap.size() == 1;
287 }
288
289
290
291
292
293
294
295 void removeStream(DefaultStream stream, Iterator<?> itr) {
296 final boolean removed;
297 if (itr == null) {
298 removed = streamMap.remove(stream.id()) != null;
299 } else {
300 itr.remove();
301 removed = true;
302 }
303
304 if (removed) {
305 for (int i = 0; i < listeners.size(); i++) {
306 try {
307 listeners.get(i).onStreamRemoved(stream);
308 } catch (Throwable cause) {
309 logger.error("Caught Throwable from listener onStreamRemoved.", cause);
310 }
311 }
312
313 if (closePromise != null && isStreamMapEmpty()) {
314 closePromise.trySuccess(null);
315 }
316 }
317 }
318
319 static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
320 throws Http2Exception {
321 switch (initialState) {
322 case IDLE:
323 return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
324 case RESERVED_LOCAL:
325 return HALF_CLOSED_REMOTE;
326 case RESERVED_REMOTE:
327 return HALF_CLOSED_LOCAL;
328 default:
329 throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
330 + initialState);
331 }
332 }
333
334 void notifyHalfClosed(Http2Stream stream) {
335 for (int i = 0; i < listeners.size(); i++) {
336 try {
337 listeners.get(i).onStreamHalfClosed(stream);
338 } catch (Throwable cause) {
339 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
340 }
341 }
342 }
343
344 void notifyClosed(Http2Stream stream) {
345 for (int i = 0; i < listeners.size(); i++) {
346 try {
347 listeners.get(i).onStreamClosed(stream);
348 } catch (Throwable cause) {
349 logger.error("Caught Throwable from listener onStreamClosed.", cause);
350 }
351 }
352 }
353
354 @Override
355 public PropertyKey newKey() {
356 return propertyKeyRegistry.newKey();
357 }
358
359
360
361
362
363
364
365
366 final DefaultPropertyKey verifyKey(PropertyKey key) {
367 return requireNonNull((DefaultPropertyKey) key, "key").verifyConnection(this);
368 }
369
370
371
372
373 private class DefaultStream implements Http2Stream {
374 private static final byte META_STATE_SENT_RST = 1;
375 private static final byte META_STATE_SENT_HEADERS = 1 << 1;
376 private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
377 private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
378 private static final byte META_STATE_RECV_HEADERS = 1 << 4;
379 private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
380 private final int id;
381 private final PropertyMap properties = new PropertyMap();
382 private State state;
383 private byte metaState;
384
385 DefaultStream(int id, State state) {
386 this.id = id;
387 this.state = state;
388 }
389
390 @Override
391 public final int id() {
392 return id;
393 }
394
395 @Override
396 public final State state() {
397 return state;
398 }
399
400 @Override
401 public boolean isResetSent() {
402 return (metaState & META_STATE_SENT_RST) != 0;
403 }
404
405 @Override
406 public Http2Stream resetSent() {
407 metaState |= META_STATE_SENT_RST;
408 return this;
409 }
410
411 @Override
412 public Http2Stream headersSent(boolean isInformational) {
413 if (!isInformational) {
414 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
415 }
416 return this;
417 }
418
419 @Override
420 public boolean isHeadersSent() {
421 return (metaState & META_STATE_SENT_HEADERS) != 0;
422 }
423
424 @Override
425 public boolean isTrailersSent() {
426 return (metaState & META_STATE_SENT_TRAILERS) != 0;
427 }
428
429 @Override
430 public Http2Stream headersReceived(boolean isInformational) {
431 if (!isInformational) {
432 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
433 }
434 return this;
435 }
436
437 @Override
438 public boolean isHeadersReceived() {
439 return (metaState & META_STATE_RECV_HEADERS) != 0;
440 }
441
442 @Override
443 public boolean isTrailersReceived() {
444 return (metaState & META_STATE_RECV_TRAILERS) != 0;
445 }
446
447 @Override
448 public Http2Stream pushPromiseSent() {
449 metaState |= META_STATE_SENT_PUSHPROMISE;
450 return this;
451 }
452
453 @Override
454 public boolean isPushPromiseSent() {
455 return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
456 }
457
458 @Override
459 public final <V> V setProperty(PropertyKey key, V value) {
460 return properties.add(verifyKey(key), value);
461 }
462
463 @Override
464 public final <V> V getProperty(PropertyKey key) {
465 return properties.get(verifyKey(key));
466 }
467
468 @Override
469 public final <V> V removeProperty(PropertyKey key) {
470 return properties.remove(verifyKey(key));
471 }
472
473 @Override
474 public Http2Stream open(boolean halfClosed) throws Http2Exception {
475 state = activeState(id, state, isLocal(), halfClosed);
476 final DefaultEndpoint<? extends Http2FlowController> endpoint = createdBy();
477 if (!endpoint.canOpenStream()) {
478 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint: " +
479 endpoint.maxActiveStreams());
480 }
481
482 activate();
483 return this;
484 }
485
486 void activate() {
487
488
489 if (state == HALF_CLOSED_LOCAL) {
490 headersSent( false);
491 } else if (state == HALF_CLOSED_REMOTE) {
492 headersReceived( false);
493 }
494 activeStreams.activate(this);
495 }
496
497 Http2Stream close(Iterator<?> itr) {
498 if (state == CLOSED) {
499 return this;
500 }
501
502 state = CLOSED;
503
504 --createdBy().numStreams;
505 activeStreams.deactivate(this, itr);
506 return this;
507 }
508
509 @Override
510 public Http2Stream close() {
511 return close(null);
512 }
513
514 @Override
515 public Http2Stream closeLocalSide() {
516 switch (state) {
517 case OPEN:
518 state = HALF_CLOSED_LOCAL;
519 notifyHalfClosed(this);
520 break;
521 case HALF_CLOSED_LOCAL:
522 break;
523 default:
524 close();
525 break;
526 }
527 return this;
528 }
529
530 @Override
531 public Http2Stream closeRemoteSide() {
532 switch (state) {
533 case OPEN:
534 state = HALF_CLOSED_REMOTE;
535 notifyHalfClosed(this);
536 break;
537 case HALF_CLOSED_REMOTE:
538 break;
539 default:
540 close();
541 break;
542 }
543 return this;
544 }
545
546 DefaultEndpoint<? extends Http2FlowController> createdBy() {
547 return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
548 }
549
550 final boolean isLocal() {
551 return localEndpoint.isValidStreamId(id);
552 }
553
554
555
556
557 private class PropertyMap {
558 Object[] values = EmptyArrays.EMPTY_OBJECTS;
559
560 <V> V add(DefaultPropertyKey key, V value) {
561 resizeIfNecessary(key.index);
562 @SuppressWarnings("unchecked")
563 V prevValue = (V) values[key.index];
564 values[key.index] = value;
565 return prevValue;
566 }
567
568 @SuppressWarnings("unchecked")
569 <V> V get(DefaultPropertyKey key) {
570 if (key.index >= values.length) {
571 return null;
572 }
573 return (V) values[key.index];
574 }
575
576 @SuppressWarnings("unchecked")
577 <V> V remove(DefaultPropertyKey key) {
578 V prevValue = null;
579 if (key.index < values.length) {
580 prevValue = (V) values[key.index];
581 values[key.index] = null;
582 }
583 return prevValue;
584 }
585
586 void resizeIfNecessary(int index) {
587 if (index >= values.length) {
588 values = Arrays.copyOf(values, propertyKeyRegistry.size());
589 }
590 }
591 }
592 }
593
594
595
596
597 private final class ConnectionStream extends DefaultStream {
598 ConnectionStream() {
599 super(CONNECTION_STREAM_ID, IDLE);
600 }
601
602 @Override
603 public boolean isResetSent() {
604 return false;
605 }
606
607 @Override
608 DefaultEndpoint<? extends Http2FlowController> createdBy() {
609 return null;
610 }
611
612 @Override
613 public Http2Stream resetSent() {
614 throw new UnsupportedOperationException();
615 }
616
617 @Override
618 public Http2Stream open(boolean halfClosed) {
619 throw new UnsupportedOperationException();
620 }
621
622 @Override
623 public Http2Stream close() {
624 throw new UnsupportedOperationException();
625 }
626
627 @Override
628 public Http2Stream closeLocalSide() {
629 throw new UnsupportedOperationException();
630 }
631
632 @Override
633 public Http2Stream closeRemoteSide() {
634 throw new UnsupportedOperationException();
635 }
636
637 @Override
638 public Http2Stream headersSent(boolean isInformational) {
639 throw new UnsupportedOperationException();
640 }
641
642 @Override
643 public boolean isHeadersSent() {
644 throw new UnsupportedOperationException();
645 }
646
647 @Override
648 public Http2Stream pushPromiseSent() {
649 throw new UnsupportedOperationException();
650 }
651
652 @Override
653 public boolean isPushPromiseSent() {
654 throw new UnsupportedOperationException();
655 }
656 }
657
658
659
660
661 private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
662 private final boolean server;
663
664
665
666
667
668 private int nextStreamIdToCreate;
669
670
671
672
673
674
675 private int nextReservationStreamId;
676 private int lastStreamKnownByPeer = -1;
677 private boolean pushToAllowed;
678 private F flowController;
679 private int maxStreams;
680 private int maxActiveStreams;
681 private final int maxReservedStreams;
682
683 int numActiveStreams;
684 int numStreams;
685
686 DefaultEndpoint(boolean server, int maxReservedStreams) {
687 this.server = server;
688
689
690
691
692
693 if (server) {
694 nextStreamIdToCreate = 2;
695 nextReservationStreamId = 0;
696 } else {
697 nextStreamIdToCreate = 1;
698
699 nextReservationStreamId = 1;
700 }
701
702
703 pushToAllowed = !server;
704 maxActiveStreams = MAX_VALUE;
705 this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
706 updateMaxStreams();
707 }
708
709 @Override
710 public int incrementAndGetNextStreamId() {
711 return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
712 }
713
714 private void incrementExpectedStreamId(int streamId) {
715 if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
716 nextReservationStreamId = streamId;
717 }
718 nextStreamIdToCreate = streamId + 2;
719 ++numStreams;
720 }
721
722 @Override
723 public boolean isValidStreamId(int streamId) {
724 return streamId > 0 && server == ((streamId & 1) == 0);
725 }
726
727 @Override
728 public boolean mayHaveCreatedStream(int streamId) {
729 return isValidStreamId(streamId) && streamId <= lastStreamCreated();
730 }
731
732 @Override
733 public boolean canOpenStream() {
734 return numActiveStreams < maxActiveStreams;
735 }
736
737 @Override
738 public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
739 State state = activeState(streamId, IDLE, isLocal(), halfClosed);
740
741 checkNewStreamAllowed(streamId, state);
742
743
744 DefaultStream stream = new DefaultStream(streamId, state);
745
746 incrementExpectedStreamId(streamId);
747
748 addStream(stream);
749
750 stream.activate();
751 return stream;
752 }
753
754 @Override
755 public boolean created(Http2Stream stream) {
756 return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
757 }
758
759 @Override
760 public boolean isServer() {
761 return server;
762 }
763
764 @Override
765 public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
766 if (parent == null) {
767 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
768 }
769 if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
770 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
771 }
772 if (!opposite().allowPushTo()) {
773 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
774 }
775 State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
776 checkNewStreamAllowed(streamId, state);
777
778
779 DefaultStream stream = new DefaultStream(streamId, state);
780
781 incrementExpectedStreamId(streamId);
782
783
784 addStream(stream);
785 return stream;
786 }
787
788 private void addStream(DefaultStream stream) {
789
790 streamMap.put(stream.id(), stream);
791
792
793 for (int i = 0; i < listeners.size(); i++) {
794 try {
795 listeners.get(i).onStreamAdded(stream);
796 } catch (Throwable cause) {
797 logger.error("Caught Throwable from listener onStreamAdded.", cause);
798 }
799 }
800 }
801
802 @Override
803 public void allowPushTo(boolean allow) {
804 if (allow && server) {
805 throw new IllegalArgumentException("Servers do not allow push");
806 }
807 pushToAllowed = allow;
808 }
809
810 @Override
811 public boolean allowPushTo() {
812 return pushToAllowed;
813 }
814
815 @Override
816 public int numActiveStreams() {
817 return numActiveStreams;
818 }
819
820 @Override
821 public int maxActiveStreams() {
822 return maxActiveStreams;
823 }
824
825 @Override
826 public void maxActiveStreams(int maxActiveStreams) {
827 this.maxActiveStreams = maxActiveStreams;
828 updateMaxStreams();
829 }
830
831 @Override
832 public int lastStreamCreated() {
833 return nextStreamIdToCreate > 1 ? nextStreamIdToCreate - 2 : 0;
834 }
835
836 @Override
837 public int lastStreamKnownByPeer() {
838 return lastStreamKnownByPeer;
839 }
840
841 private void lastStreamKnownByPeer(int lastKnownStream) {
842 lastStreamKnownByPeer = lastKnownStream;
843 }
844
845 @Override
846 public F flowController() {
847 return flowController;
848 }
849
850 @Override
851 public void flowController(F flowController) {
852 this.flowController = requireNonNull(flowController, "flowController");
853 }
854
855 @Override
856 public Endpoint<? extends Http2FlowController> opposite() {
857 return isLocal() ? remoteEndpoint : localEndpoint;
858 }
859
860 private void updateMaxStreams() {
861 maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
862 }
863
864 private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
865 assert state != IDLE;
866 if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
867 throw streamError(streamId, REFUSED_STREAM,
868 "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
869 streamId, lastStreamKnownByPeer);
870 }
871 if (!isValidStreamId(streamId)) {
872 if (streamId < 0) {
873 throw new Http2NoMoreStreamIdsException();
874 }
875 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
876 server ? "server" : "client");
877 }
878
879
880 if (streamId < nextStreamIdToCreate) {
881 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
882 streamId, nextStreamIdToCreate);
883 }
884 if (nextStreamIdToCreate <= 0) {
885
886
887 throw new Http2Exception(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.",
888 Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN);
889 }
890 boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
891 if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
892 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint: " +
893 (isReserved ? maxStreams : maxActiveStreams));
894 }
895 if (isClosed()) {
896 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
897 streamId);
898 }
899 }
900
901 private boolean isLocal() {
902 return this == localEndpoint;
903 }
904 }
905
906
907
908
909
/**
 * A deferred change to the active streams. Instances are queued while an iteration over the
 * active streams is in progress and run once the last iteration has finished.
 */
@FunctionalInterface
interface Event {
    /**
     * Applies the deferred change. Anything thrown from here is caught and logged by the code
     * that drains the queue, so one failing event does not prevent later ones from running.
     */
    void process();
}
919
920
921
922
923
924 private final class ActiveStreams {
925 private final List<Listener> listeners;
926 private final Queue<Event> pendingEvents = new ArrayDeque<>(4);
927 private final Set<Http2Stream> streams = new LinkedHashSet<>();
928 private int pendingIterations;
929
930 ActiveStreams(List<Listener> listeners) {
931 this.listeners = listeners;
932 }
933
934 public int size() {
935 return streams.size();
936 }
937
938 public void activate(final DefaultStream stream) {
939 if (allowModifications()) {
940 addToActiveStreams(stream);
941 } else {
942 pendingEvents.add(() -> addToActiveStreams(stream));
943 }
944 }
945
946 public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
947 if (allowModifications() || itr != null) {
948 removeFromActiveStreams(stream, itr);
949 } else {
950 pendingEvents.add(() -> removeFromActiveStreams(stream, itr));
951 }
952 }
953
954 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
955 incrementPendingIterations();
956 try {
957 for (Http2Stream stream : streams) {
958 if (!visitor.visit(stream)) {
959 return stream;
960 }
961 }
962 return null;
963 } finally {
964 decrementPendingIterations();
965 }
966 }
967
968 void addToActiveStreams(DefaultStream stream) {
969 if (streams.add(stream)) {
970
971 stream.createdBy().numActiveStreams++;
972
973 for (int i = 0; i < listeners.size(); i++) {
974 try {
975 listeners.get(i).onStreamActive(stream);
976 } catch (Throwable cause) {
977 logger.error("Caught Throwable from listener onStreamActive.", cause);
978 }
979 }
980 }
981 }
982
983 void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
984 if (streams.remove(stream)) {
985
986 stream.createdBy().numActiveStreams--;
987 notifyClosed(stream);
988 }
989 removeStream(stream, itr);
990 }
991
992 boolean allowModifications() {
993 return pendingIterations == 0;
994 }
995
996 void incrementPendingIterations() {
997 ++pendingIterations;
998 }
999
1000 void decrementPendingIterations() {
1001 --pendingIterations;
1002 if (allowModifications()) {
1003 for (;;) {
1004 Event event = pendingEvents.poll();
1005 if (event == null) {
1006 break;
1007 }
1008 try {
1009 event.process();
1010 } catch (Throwable cause) {
1011 logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1012 }
1013 }
1014 }
1015 }
1016 }
1017
1018
1019
1020
1021 final class DefaultPropertyKey implements PropertyKey {
1022 final int index;
1023
1024 DefaultPropertyKey(int index) {
1025 this.index = index;
1026 }
1027
1028 DefaultPropertyKey verifyConnection(Http2Connection connection) {
1029 if (connection != DefaultHttp2Connection.this) {
1030 throw new IllegalArgumentException("Using a key that was not created by this connection");
1031 }
1032 return this;
1033 }
1034 }
1035
1036
1037
1038
1039 private final class PropertyKeyRegistry {
1040
1041
1042
1043
1044
1045 final List<DefaultPropertyKey> keys = new ArrayList<>(4);
1046
1047
1048
1049
1050 DefaultPropertyKey newKey() {
1051 DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1052 keys.add(key);
1053 return key;
1054 }
1055
1056 int size() {
1057 return keys.size();
1058 }
1059 }
1060 }