1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.handler.codec.http2.Http2Stream.State;
22 import io.netty.util.collection.IntObjectHashMap;
23 import io.netty.util.collection.IntObjectMap;
24 import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.Promise;
27 import io.netty.util.concurrent.PromiseNotifier;
28 import io.netty.util.internal.EmptyArrays;
29 import io.netty.util.internal.logging.InternalLogger;
30 import io.netty.util.internal.logging.InternalLoggerFactory;
31
32 import java.util.ArrayDeque;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Iterator;
36 import java.util.LinkedHashSet;
37 import java.util.List;
38 import java.util.Queue;
39 import java.util.Set;
40
41 import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
46 import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
47 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48 import static io.netty.handler.codec.http2.Http2Exception.streamError;
49 import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
50 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
51 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
52 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
53 import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
54 import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
55 import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
56 import static io.netty.util.internal.ObjectUtil.checkNotNull;
57 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
58 import static java.lang.Integer.MAX_VALUE;
59
60
61
62
63 public class DefaultHttp2Connection implements Http2Connection {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);

// The fields below are not private because the nested classes of this connection read and write them directly.

// Every stream known to this connection, keyed by stream id. The constructor registers the connection stream
// here, so the map is never truly empty.
final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
// Hands out the property keys that are valid for streams of this connection.
final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
// The stream with id CONNECTION_STREAM_ID that represents the connection itself.
final ConnectionStream connectionStream = new ConnectionStream();
final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;

// Listeners are kept in an index-addressable list (initial capacity 4); the notification loops in this class
// walk it by index rather than through an Iterator.
final List<Listener> listeners = new ArrayList<Listener>(4);
final ActiveStreams activeStreams;
// Assigned by close(Promise); a non-null value is what isClosed() reports as "closed".
Promise<Void> closePromise;
83
84
85
86
87
/**
 * Creates a new connection using {@code DEFAULT_MAX_RESERVED_STREAMS} as the reserved stream limit.
 *
 * @param server {@code true} if the local endpoint is the server side of the connection.
 */
public DefaultHttp2Connection(boolean server) {
this(server, DEFAULT_MAX_RESERVED_STREAMS);
}
91
92
93
94
95
96
97 public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
98 activeStreams = new ActiveStreams(listeners);
99
100
101
102
103
104 localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server, server ? MAX_VALUE : maxReservedStreams);
105 remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server, maxReservedStreams);
106
107
108 streamMap.put(connectionStream.id(), connectionStream);
109 }
110
111
112
113
114 final boolean isClosed() {
115 return closePromise != null;
116 }
117
118 @Override
119 public Future<Void> close(final Promise<Void> promise) {
120 checkNotNull(promise, "promise");
121
122
123 if (closePromise != null) {
124 if (closePromise == promise) {
125
126 } else if (promise instanceof ChannelPromise && ((ChannelFuture) closePromise).isVoid()) {
127 closePromise = promise;
128 } else {
129 PromiseNotifier.cascade(closePromise, promise);
130 }
131 } else {
132 closePromise = promise;
133 }
134 if (isStreamMapEmpty()) {
135 promise.trySuccess(null);
136 return promise;
137 }
138
139 Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
140
141
142 if (activeStreams.allowModifications()) {
143 activeStreams.incrementPendingIterations();
144 try {
145 while (itr.hasNext()) {
146 DefaultStream stream = (DefaultStream) itr.next().value();
147 if (stream.id() != CONNECTION_STREAM_ID) {
148
149
150
151 stream.close(itr);
152 }
153 }
154 } finally {
155 activeStreams.decrementPendingIterations();
156 }
157 } else {
158 while (itr.hasNext()) {
159 Http2Stream stream = itr.next().value();
160 if (stream.id() != CONNECTION_STREAM_ID) {
161
162
163 stream.close();
164 }
165 }
166 }
167 return closePromise;
168 }
169
170 @Override
171 public void addListener(Listener listener) {
172 listeners.add(listener);
173 }
174
175 @Override
176 public void removeListener(Listener listener) {
177 listeners.remove(listener);
178 }
179
180 @Override
181 public boolean isServer() {
182 return localEndpoint.isServer();
183 }
184
185 @Override
186 public Http2Stream connectionStream() {
187 return connectionStream;
188 }
189
190 @Override
191 public Http2Stream stream(int streamId) {
192 return streamMap.get(streamId);
193 }
194
195 @Override
196 public boolean streamMayHaveExisted(int streamId) {
197 return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
198 }
199
200 @Override
201 public int numActiveStreams() {
202 return activeStreams.size();
203 }
204
205 @Override
206 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
207 return activeStreams.forEachActiveStream(visitor);
208 }
209
210 @Override
211 public Endpoint<Http2LocalFlowController> local() {
212 return localEndpoint;
213 }
214
215 @Override
216 public Endpoint<Http2RemoteFlowController> remote() {
217 return remoteEndpoint;
218 }
219
220 @Override
221 public boolean goAwayReceived() {
222 return localEndpoint.lastStreamKnownByPeer >= 0;
223 }
224
225 @Override
226 public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
227 if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
228 throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
229 localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
230 }
231
232 localEndpoint.lastStreamKnownByPeer(lastKnownStream);
233 for (int i = 0; i < listeners.size(); ++i) {
234 try {
235 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
236 } catch (Throwable cause) {
237 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
238 }
239 }
240
241 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
242 }
243
244 @Override
245 public boolean goAwaySent() {
246 return remoteEndpoint.lastStreamKnownByPeer >= 0;
247 }
248
249 @Override
250 public boolean goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
251 if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
252
253
254 if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
255 return false;
256 }
257 if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
258 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
259 "sending multiple GOAWAY frames (was '%d', is '%d').",
260 remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
261 }
262 }
263
264 remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
265 for (int i = 0; i < listeners.size(); ++i) {
266 try {
267 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
268 } catch (Throwable cause) {
269 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
270 }
271 }
272
273 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
274 return true;
275 }
276
277 private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
278 final DefaultEndpoint<?> endpoint) throws Http2Exception {
279 forEachActiveStream(new Http2StreamVisitor() {
280 @Override
281 public boolean visit(Http2Stream stream) {
282 if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
283 stream.close();
284 }
285 return true;
286 }
287 });
288 }
289
290
291
292
293 private boolean isStreamMapEmpty() {
294 return streamMap.size() == 1;
295 }
296
297
298
299
300
301
302
303 void removeStream(DefaultStream stream, Iterator<?> itr) {
304 final boolean removed;
305 if (itr == null) {
306 removed = streamMap.remove(stream.id()) != null;
307 } else {
308 itr.remove();
309 removed = true;
310 }
311
312 if (removed) {
313 for (int i = 0; i < listeners.size(); i++) {
314 try {
315 listeners.get(i).onStreamRemoved(stream);
316 } catch (Throwable cause) {
317 logger.error("Caught Throwable from listener onStreamRemoved.", cause);
318 }
319 }
320
321 if (closePromise != null && isStreamMapEmpty()) {
322 closePromise.trySuccess(null);
323 }
324 }
325 }
326
327 static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
328 throws Http2Exception {
329 switch (initialState) {
330 case IDLE:
331 return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
332 case RESERVED_LOCAL:
333 return HALF_CLOSED_REMOTE;
334 case RESERVED_REMOTE:
335 return HALF_CLOSED_LOCAL;
336 default:
337 throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
338 + initialState);
339 }
340 }
341
342 void notifyHalfClosed(Http2Stream stream) {
343 for (int i = 0; i < listeners.size(); i++) {
344 try {
345 listeners.get(i).onStreamHalfClosed(stream);
346 } catch (Throwable cause) {
347 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
348 }
349 }
350 }
351
352 void notifyClosed(Http2Stream stream) {
353 for (int i = 0; i < listeners.size(); i++) {
354 try {
355 listeners.get(i).onStreamClosed(stream);
356 } catch (Throwable cause) {
357 logger.error("Caught Throwable from listener onStreamClosed.", cause);
358 }
359 }
360 }
361
362 @Override
363 public PropertyKey newKey() {
364 return propertyKeyRegistry.newKey();
365 }
366
367
368
369
370
371
372
373
374 final DefaultPropertyKey verifyKey(PropertyKey key) {
375 return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
376 }
377
378
379
380
381 private class DefaultStream implements Http2Stream {
382 private static final byte META_STATE_SENT_RST = 1;
383 private static final byte META_STATE_SENT_HEADERS = 1 << 1;
384 private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
385 private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
386 private static final byte META_STATE_RECV_HEADERS = 1 << 4;
387 private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
388 private final int id;
389 private final long identity;
390 private final PropertyMap properties = new PropertyMap();
391 private State state;
392 private byte metaState;
393
394 DefaultStream(long identity, int id, State state) {
395 this.identity = identity;
396 this.id = id;
397 this.state = state;
398 }
399
400 @Override
401 public final int id() {
402 return id;
403 }
404
405 @Override
406 public final State state() {
407 return state;
408 }
409
410 @Override
411 public boolean isResetSent() {
412 return (metaState & META_STATE_SENT_RST) != 0;
413 }
414
415 @Override
416 public Http2Stream resetSent() {
417 metaState |= META_STATE_SENT_RST;
418 return this;
419 }
420
421 @Override
422 public Http2Stream headersSent(boolean isInformational) {
423 if (!isInformational) {
424 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
425 }
426 return this;
427 }
428
429 @Override
430 public boolean isHeadersSent() {
431 return (metaState & META_STATE_SENT_HEADERS) != 0;
432 }
433
434 @Override
435 public boolean isTrailersSent() {
436 return (metaState & META_STATE_SENT_TRAILERS) != 0;
437 }
438
439 @Override
440 public Http2Stream headersReceived(boolean isInformational) {
441 if (!isInformational) {
442 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
443 }
444 return this;
445 }
446
447 @Override
448 public boolean isHeadersReceived() {
449 return (metaState & META_STATE_RECV_HEADERS) != 0;
450 }
451
452 @Override
453 public boolean isTrailersReceived() {
454 return (metaState & META_STATE_RECV_TRAILERS) != 0;
455 }
456
457 @Override
458 public Http2Stream pushPromiseSent() {
459 metaState |= META_STATE_SENT_PUSHPROMISE;
460 return this;
461 }
462
463 @Override
464 public boolean isPushPromiseSent() {
465 return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
466 }
467
468 @Override
469 public final <V> V setProperty(PropertyKey key, V value) {
470 return properties.add(verifyKey(key), value);
471 }
472
473 @Override
474 public final <V> V getProperty(PropertyKey key) {
475 return properties.get(verifyKey(key));
476 }
477
478 @Override
479 public final <V> V removeProperty(PropertyKey key) {
480 return properties.remove(verifyKey(key));
481 }
482
483 @Override
484 public Http2Stream open(boolean halfClosed) throws Http2Exception {
485 state = activeState(id, state, isLocal(), halfClosed);
486 final DefaultEndpoint<? extends Http2FlowController> endpoint = createdBy();
487 if (!endpoint.canOpenStream()) {
488 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint: " +
489 endpoint.maxActiveStreams());
490 }
491
492 activate();
493 return this;
494 }
495
496 void activate() {
497
498
499 if (state == HALF_CLOSED_LOCAL) {
500 headersSent( false);
501 } else if (state == HALF_CLOSED_REMOTE) {
502 headersReceived( false);
503 }
504 activeStreams.activate(this);
505 }
506
507 Http2Stream close(Iterator<?> itr) {
508 if (state == CLOSED) {
509 return this;
510 }
511
512 state = CLOSED;
513
514 --createdBy().numStreams;
515 activeStreams.deactivate(this, itr);
516 return this;
517 }
518
519 @Override
520 public Http2Stream close() {
521 return close(null);
522 }
523
524 @Override
525 public Http2Stream closeLocalSide() {
526 switch (state) {
527 case OPEN:
528 state = HALF_CLOSED_LOCAL;
529 notifyHalfClosed(this);
530 break;
531 case HALF_CLOSED_LOCAL:
532 break;
533 default:
534 close();
535 break;
536 }
537 return this;
538 }
539
540 @Override
541 public Http2Stream closeRemoteSide() {
542 switch (state) {
543 case OPEN:
544 state = HALF_CLOSED_REMOTE;
545 notifyHalfClosed(this);
546 break;
547 case HALF_CLOSED_REMOTE:
548 break;
549 default:
550 close();
551 break;
552 }
553 return this;
554 }
555
556 DefaultEndpoint<? extends Http2FlowController> createdBy() {
557 return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
558 }
559
560 final boolean isLocal() {
561 return localEndpoint.isValidStreamId(id);
562 }
563
564
565
566
567 private class PropertyMap {
568 Object[] values = EmptyArrays.EMPTY_OBJECTS;
569
570 <V> V add(DefaultPropertyKey key, V value) {
571 resizeIfNecessary(key.index);
572 @SuppressWarnings("unchecked")
573 V prevValue = (V) values[key.index];
574 values[key.index] = value;
575 return prevValue;
576 }
577
578 @SuppressWarnings("unchecked")
579 <V> V get(DefaultPropertyKey key) {
580 if (key.index >= values.length) {
581 return null;
582 }
583 return (V) values[key.index];
584 }
585
586 @SuppressWarnings("unchecked")
587 <V> V remove(DefaultPropertyKey key) {
588 V prevValue = null;
589 if (key.index < values.length) {
590 prevValue = (V) values[key.index];
591 values[key.index] = null;
592 }
593 return prevValue;
594 }
595
596 void resizeIfNecessary(int index) {
597 if (index >= values.length) {
598 values = Arrays.copyOf(values, propertyKeyRegistry.size());
599 }
600 }
601 }
602
603 @Override
604 public boolean equals(final Object obj) {
605 return super.equals(obj);
606 }
607
608 @Override
609 public int hashCode() {
610 long value = identity;
611 if (value == 0) {
612 return System.identityHashCode(this);
613 }
614 return (int) (value ^ (value >>> 32));
615 }
616 }
617
618
619
620
621 private final class ConnectionStream extends DefaultStream {
622 ConnectionStream() {
623 super(0, CONNECTION_STREAM_ID, IDLE);
624 }
625
626 @Override
627 public boolean isResetSent() {
628 return false;
629 }
630
631 @Override
632 DefaultEndpoint<? extends Http2FlowController> createdBy() {
633 return null;
634 }
635
636 @Override
637 public Http2Stream resetSent() {
638 throw new UnsupportedOperationException();
639 }
640
641 @Override
642 public Http2Stream open(boolean halfClosed) {
643 throw new UnsupportedOperationException();
644 }
645
646 @Override
647 public Http2Stream close() {
648 throw new UnsupportedOperationException();
649 }
650
651 @Override
652 public Http2Stream closeLocalSide() {
653 throw new UnsupportedOperationException();
654 }
655
656 @Override
657 public Http2Stream closeRemoteSide() {
658 throw new UnsupportedOperationException();
659 }
660
661 @Override
662 public Http2Stream headersSent(boolean isInformational) {
663 throw new UnsupportedOperationException();
664 }
665
666 @Override
667 public boolean isHeadersSent() {
668 throw new UnsupportedOperationException();
669 }
670
671 @Override
672 public Http2Stream pushPromiseSent() {
673 throw new UnsupportedOperationException();
674 }
675
676 @Override
677 public boolean isPushPromiseSent() {
678 throw new UnsupportedOperationException();
679 }
680 }
681
682
683
684
685 private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
686 private final boolean server;
687
688
689
690 private long lastCreatedStreamIdentity;
691
692
693
694
695
696 private int nextStreamIdToCreate;
697
698
699
700
701
702
703 private int nextReservationStreamId;
704 private int lastStreamKnownByPeer = -1;
705 private boolean pushToAllowed;
706 private F flowController;
707 private int maxStreams;
708 private int maxActiveStreams;
709 private final int maxReservedStreams;
710
711 int numActiveStreams;
712 int numStreams;
713
714 DefaultEndpoint(boolean server, int maxReservedStreams) {
715 this.lastCreatedStreamIdentity = 0;
716 this.server = server;
717
718
719
720
721
722 if (server) {
723 nextStreamIdToCreate = 2;
724 nextReservationStreamId = 0;
725 } else {
726 nextStreamIdToCreate = 1;
727
728 nextReservationStreamId = 1;
729 }
730
731
732 pushToAllowed = !server;
733 maxActiveStreams = MAX_VALUE;
734 this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
735 updateMaxStreams();
736 }
737
738 @Override
739 public int incrementAndGetNextStreamId() {
740 return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
741 }
742
743 private void incrementExpectedStreamId(int streamId) {
744 if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
745 nextReservationStreamId = streamId;
746 }
747 nextStreamIdToCreate = streamId + 2;
748 ++numStreams;
749 }
750
751 @Override
752 public boolean isValidStreamId(int streamId) {
753 return streamId > 0 && server == ((streamId & 1) == 0);
754 }
755
756 @Override
757 public boolean mayHaveCreatedStream(int streamId) {
758 return isValidStreamId(streamId) && streamId <= lastStreamCreated();
759 }
760
761 @Override
762 public boolean canOpenStream() {
763 return numActiveStreams < maxActiveStreams;
764 }
765
766 @Override
767 public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
768 State state = activeState(streamId, IDLE, isLocal(), halfClosed);
769
770 checkNewStreamAllowed(streamId, state);
771
772 lastCreatedStreamIdentity++;
773
774
775 DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
776
777 incrementExpectedStreamId(streamId);
778
779 addStream(stream);
780
781 stream.activate();
782 return stream;
783 }
784
785 @Override
786 public boolean created(Http2Stream stream) {
787 return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
788 }
789
790 @Override
791 public boolean isServer() {
792 return server;
793 }
794
795 @Override
796 public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
797 if (parent == null) {
798 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
799 }
800 if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
801 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
802 }
803 if (!opposite().allowPushTo()) {
804 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
805 }
806 State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
807 checkNewStreamAllowed(streamId, state);
808
809 lastCreatedStreamIdentity++;
810
811
812 DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
813
814 incrementExpectedStreamId(streamId);
815
816
817 addStream(stream);
818 return stream;
819 }
820
821 private void addStream(DefaultStream stream) {
822
823 streamMap.put(stream.id(), stream);
824
825
826 for (int i = 0; i < listeners.size(); i++) {
827 try {
828 listeners.get(i).onStreamAdded(stream);
829 } catch (Throwable cause) {
830 logger.error("Caught Throwable from listener onStreamAdded.", cause);
831 }
832 }
833 }
834
835 @Override
836 public void allowPushTo(boolean allow) {
837 if (allow && server) {
838 throw new IllegalArgumentException("Servers do not allow push");
839 }
840 pushToAllowed = allow;
841 }
842
843 @Override
844 public boolean allowPushTo() {
845 return pushToAllowed;
846 }
847
848 @Override
849 public int numActiveStreams() {
850 return numActiveStreams;
851 }
852
853 @Override
854 public int maxActiveStreams() {
855 return maxActiveStreams;
856 }
857
858 @Override
859 public void maxActiveStreams(int maxActiveStreams) {
860 this.maxActiveStreams = maxActiveStreams;
861 updateMaxStreams();
862 }
863
864 @Override
865 public int lastStreamCreated() {
866
867
868
869
870 return Math.max(0, nextStreamIdToCreate - 2);
871 }
872
873 @Override
874 public int lastStreamKnownByPeer() {
875 return lastStreamKnownByPeer;
876 }
877
878 private void lastStreamKnownByPeer(int lastKnownStream) {
879 lastStreamKnownByPeer = lastKnownStream;
880 }
881
882 @Override
883 public F flowController() {
884 return flowController;
885 }
886
887 @Override
888 public void flowController(F flowController) {
889 this.flowController = checkNotNull(flowController, "flowController");
890 }
891
892 @Override
893 public Endpoint<? extends Http2FlowController> opposite() {
894 return isLocal() ? remoteEndpoint : localEndpoint;
895 }
896
897 private void updateMaxStreams() {
898 maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
899 }
900
901 private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
902 assert state != IDLE;
903 if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
904 throw streamError(streamId, REFUSED_STREAM,
905 "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
906 streamId, lastStreamKnownByPeer);
907 }
908 if (!isValidStreamId(streamId)) {
909 if (streamId < 0) {
910 throw new Http2NoMoreStreamIdsException();
911 }
912 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
913 server ? "server" : "client");
914 }
915
916
917 if (streamId < nextStreamIdToCreate) {
918 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
919 streamId, nextStreamIdToCreate);
920 }
921 if (nextStreamIdToCreate <= 0) {
922
923
924 throw new Http2Exception(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.",
925 Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN);
926 }
927 boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
928 if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
929 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint: " +
930 (isReserved ? maxStreams : maxActiveStreams));
931 }
932 if (isClosed()) {
933 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
934 streamId);
935 }
936 }
937
938 private boolean isLocal() {
939 return this == localEndpoint;
940 }
941 }
942
943
944
945
946
/**
 * A unit of work whose execution is postponed. Within this file it is used by {@code ActiveStreams} to queue
 * changes to the set of active streams while that set is being iterated.
 */
interface Event {
/**
 * Performs the postponed work. A {@link Throwable} escaping from this method is caught and logged by the
 * code that drains the queue in this file.
 */
void process();
}
956
957
958
959
960
961 private final class ActiveStreams {
962 private final List<Listener> listeners;
963 private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
964 private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
965 private int pendingIterations;
966
967 ActiveStreams(List<Listener> listeners) {
968 this.listeners = listeners;
969 }
970
971 public int size() {
972 return streams.size();
973 }
974
975 public void activate(final DefaultStream stream) {
976 if (allowModifications()) {
977 addToActiveStreams(stream);
978 } else {
979 pendingEvents.add(new Event() {
980 @Override
981 public void process() {
982 addToActiveStreams(stream);
983 }
984 });
985 }
986 }
987
988 public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
989 if (allowModifications() || itr != null) {
990 removeFromActiveStreams(stream, itr);
991 } else {
992 pendingEvents.add(new Event() {
993 @Override
994 public void process() {
995 removeFromActiveStreams(stream, itr);
996 }
997 });
998 }
999 }
1000
1001 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
1002 incrementPendingIterations();
1003 try {
1004 for (Http2Stream stream : streams) {
1005 if (!visitor.visit(stream)) {
1006 return stream;
1007 }
1008 }
1009 return null;
1010 } finally {
1011 decrementPendingIterations();
1012 }
1013 }
1014
1015 void addToActiveStreams(DefaultStream stream) {
1016 if (streams.add(stream)) {
1017
1018 stream.createdBy().numActiveStreams++;
1019
1020 for (int i = 0; i < listeners.size(); i++) {
1021 try {
1022 listeners.get(i).onStreamActive(stream);
1023 } catch (Throwable cause) {
1024 logger.error("Caught Throwable from listener onStreamActive.", cause);
1025 }
1026 }
1027 }
1028 }
1029
1030 void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
1031 if (streams.remove(stream)) {
1032
1033 stream.createdBy().numActiveStreams--;
1034 notifyClosed(stream);
1035 }
1036 removeStream(stream, itr);
1037 }
1038
1039 boolean allowModifications() {
1040 return pendingIterations == 0;
1041 }
1042
1043 void incrementPendingIterations() {
1044 ++pendingIterations;
1045 }
1046
1047 void decrementPendingIterations() {
1048 --pendingIterations;
1049 if (allowModifications()) {
1050 for (;;) {
1051 Event event = pendingEvents.poll();
1052 if (event == null) {
1053 break;
1054 }
1055 try {
1056 event.process();
1057 } catch (Throwable cause) {
1058 logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1059 }
1060 }
1061 }
1062 }
1063 }
1064
1065
1066
1067
1068 final class DefaultPropertyKey implements PropertyKey {
1069 final int index;
1070
1071 DefaultPropertyKey(int index) {
1072 this.index = index;
1073 }
1074
1075 DefaultPropertyKey verifyConnection(Http2Connection connection) {
1076 if (connection != DefaultHttp2Connection.this) {
1077 throw new IllegalArgumentException("Using a key that was not created by this connection");
1078 }
1079 return this;
1080 }
1081 }
1082
1083
1084
1085
1086 private final class PropertyKeyRegistry {
1087
1088
1089
1090
1091
1092 final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
1093
1094
1095
1096
1097 DefaultPropertyKey newKey() {
1098 DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1099 keys.add(key);
1100 return key;
1101 }
1102
1103 int size() {
1104 return keys.size();
1105 }
1106 }
1107 }