1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.handler.codec.http2.Http2Stream.State;
22 import io.netty.util.collection.IntObjectHashMap;
23 import io.netty.util.collection.IntObjectMap;
24 import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.Promise;
27 import io.netty.util.concurrent.PromiseNotifier;
28 import io.netty.util.internal.EmptyArrays;
29 import io.netty.util.internal.logging.InternalLogger;
30 import io.netty.util.internal.logging.InternalLoggerFactory;
31
32 import java.util.ArrayDeque;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Iterator;
36 import java.util.LinkedHashSet;
37 import java.util.List;
38 import java.util.Queue;
39 import java.util.Set;
40
41 import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
46 import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
47 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48 import static io.netty.handler.codec.http2.Http2Exception.streamError;
49 import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
50 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
51 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
52 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
53 import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
54 import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
55 import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
56 import static io.netty.util.internal.ObjectUtil.checkNotNull;
57 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
58 import static java.lang.Integer.MAX_VALUE;
59
60
61
62
63 public class DefaultHttp2Connection implements Http2Connection {
64 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
65
66 final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
67 final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
68 final ConnectionStream connectionStream = new ConnectionStream();
69 final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
70 final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
71
72
73
74
75
76
77
78
79
80 final List<Listener> listeners = new ArrayList<Listener>(4);
81 final ActiveStreams activeStreams;
82 Promise<Void> closePromise;
83
84
85
86
87
88 public DefaultHttp2Connection(boolean server) {
89 this(server, DEFAULT_MAX_RESERVED_STREAMS);
90 }
91
92
93
94
95
96
97 public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
98 activeStreams = new ActiveStreams(listeners);
99
100
101
102
103
104 localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server, server ? MAX_VALUE : maxReservedStreams);
105 remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server, maxReservedStreams);
106
107
108 streamMap.put(connectionStream.id(), connectionStream);
109 }
110
111
112
113
114 final boolean isClosed() {
115 return closePromise != null;
116 }
117
118 @Override
119 public Future<Void> close(final Promise<Void> promise) {
120 checkNotNull(promise, "promise");
121
122
123 if (closePromise != null) {
124 if (closePromise == promise) {
125
126 } else if (promise instanceof ChannelPromise && ((ChannelFuture) closePromise).isVoid()) {
127 closePromise = promise;
128 } else {
129 PromiseNotifier.cascade(closePromise, promise);
130 }
131 } else {
132 closePromise = promise;
133 }
134 if (isStreamMapEmpty()) {
135 promise.trySuccess(null);
136 return promise;
137 }
138
139 Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
140
141
142 if (activeStreams.allowModifications()) {
143 activeStreams.incrementPendingIterations();
144 try {
145 while (itr.hasNext()) {
146 DefaultStream stream = (DefaultStream) itr.next().value();
147 if (stream.id() != CONNECTION_STREAM_ID) {
148
149
150
151 stream.close(itr);
152 }
153 }
154 } finally {
155 activeStreams.decrementPendingIterations();
156 }
157 } else {
158 while (itr.hasNext()) {
159 Http2Stream stream = itr.next().value();
160 if (stream.id() != CONNECTION_STREAM_ID) {
161
162
163 stream.close();
164 }
165 }
166 }
167 return closePromise;
168 }
169
170 @Override
171 public void addListener(Listener listener) {
172 listeners.add(listener);
173 }
174
175 @Override
176 public void removeListener(Listener listener) {
177 listeners.remove(listener);
178 }
179
180 @Override
181 public boolean isServer() {
182 return localEndpoint.isServer();
183 }
184
185 @Override
186 public Http2Stream connectionStream() {
187 return connectionStream;
188 }
189
190 @Override
191 public Http2Stream stream(int streamId) {
192 return streamMap.get(streamId);
193 }
194
195 @Override
196 public boolean streamMayHaveExisted(int streamId) {
197 return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
198 }
199
200 @Override
201 public int numActiveStreams() {
202 return activeStreams.size();
203 }
204
205 @Override
206 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
207 return activeStreams.forEachActiveStream(visitor);
208 }
209
210 @Override
211 public Endpoint<Http2LocalFlowController> local() {
212 return localEndpoint;
213 }
214
215 @Override
216 public Endpoint<Http2RemoteFlowController> remote() {
217 return remoteEndpoint;
218 }
219
220 @Override
221 public boolean goAwayReceived() {
222 return localEndpoint.lastStreamKnownByPeer >= 0;
223 }
224
225 @Override
226 public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
227 if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
228 throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
229 localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
230 }
231
232 localEndpoint.lastStreamKnownByPeer(lastKnownStream);
233 for (int i = 0; i < listeners.size(); ++i) {
234 try {
235 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
236 } catch (Throwable cause) {
237 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
238 }
239 }
240
241 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
242 }
243
244 @Override
245 public boolean goAwaySent() {
246 return remoteEndpoint.lastStreamKnownByPeer >= 0;
247 }
248
249 @Override
250 public boolean goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
251 if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
252
253
254 if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
255 return false;
256 }
257 if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
258 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
259 "sending multiple GOAWAY frames (was '%d', is '%d').",
260 remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
261 }
262 }
263
264 remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
265 for (int i = 0; i < listeners.size(); ++i) {
266 try {
267 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
268 } catch (Throwable cause) {
269 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
270 }
271 }
272
273 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
274 return true;
275 }
276
277 private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
278 final DefaultEndpoint<?> endpoint) throws Http2Exception {
279 forEachActiveStream(new Http2StreamVisitor() {
280 @Override
281 public boolean visit(Http2Stream stream) {
282 if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
283 stream.close();
284 }
285 return true;
286 }
287 });
288 }
289
290
291
292
293 private boolean isStreamMapEmpty() {
294 return streamMap.size() == 1;
295 }
296
297
298
299
300
301
302
303 void removeStream(DefaultStream stream, Iterator<?> itr) {
304 final boolean removed;
305 if (itr == null) {
306 removed = streamMap.remove(stream.id()) != null;
307 } else {
308 itr.remove();
309 removed = true;
310 }
311
312 if (removed) {
313 for (int i = 0; i < listeners.size(); i++) {
314 try {
315 listeners.get(i).onStreamRemoved(stream);
316 } catch (Throwable cause) {
317 logger.error("Caught Throwable from listener onStreamRemoved.", cause);
318 }
319 }
320
321 if (closePromise != null && isStreamMapEmpty()) {
322 closePromise.trySuccess(null);
323 }
324 }
325 }
326
327 static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
328 throws Http2Exception {
329 switch (initialState) {
330 case IDLE:
331 return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
332 case RESERVED_LOCAL:
333 return HALF_CLOSED_REMOTE;
334 case RESERVED_REMOTE:
335 return HALF_CLOSED_LOCAL;
336 default:
337 throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
338 + initialState);
339 }
340 }
341
342 void notifyHalfClosed(Http2Stream stream) {
343 for (int i = 0; i < listeners.size(); i++) {
344 try {
345 listeners.get(i).onStreamHalfClosed(stream);
346 } catch (Throwable cause) {
347 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
348 }
349 }
350 }
351
352 void notifyClosed(Http2Stream stream) {
353 for (int i = 0; i < listeners.size(); i++) {
354 try {
355 listeners.get(i).onStreamClosed(stream);
356 } catch (Throwable cause) {
357 logger.error("Caught Throwable from listener onStreamClosed.", cause);
358 }
359 }
360 }
361
362 @Override
363 public PropertyKey newKey() {
364 return propertyKeyRegistry.newKey();
365 }
366
367
368
369
370
371
372
373
374 final DefaultPropertyKey verifyKey(PropertyKey key) {
375 return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
376 }
377
378
379
380
381 private class DefaultStream implements Http2Stream {
382 private static final byte META_STATE_SENT_RST = 1;
383 private static final byte META_STATE_SENT_HEADERS = 1 << 1;
384 private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
385 private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
386 private static final byte META_STATE_RECV_HEADERS = 1 << 4;
387 private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
388 private final int id;
389 private final long identity;
390 private final PropertyMap properties = new PropertyMap();
391 private State state;
392 private byte metaState;
393
394 DefaultStream(long identity, int id, State state) {
395 this.identity = identity;
396 this.id = id;
397 this.state = state;
398 }
399
400 @Override
401 public final int id() {
402 return id;
403 }
404
405 @Override
406 public final State state() {
407 return state;
408 }
409
410 @Override
411 public boolean isResetSent() {
412 return (metaState & META_STATE_SENT_RST) != 0;
413 }
414
415 @Override
416 public Http2Stream resetSent() {
417 metaState |= META_STATE_SENT_RST;
418 return this;
419 }
420
421 @Override
422 public Http2Stream headersSent(boolean isInformational) {
423 if (!isInformational) {
424 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
425 }
426 return this;
427 }
428
429 @Override
430 public boolean isHeadersSent() {
431 return (metaState & META_STATE_SENT_HEADERS) != 0;
432 }
433
434 @Override
435 public boolean isTrailersSent() {
436 return (metaState & META_STATE_SENT_TRAILERS) != 0;
437 }
438
439 @Override
440 public Http2Stream headersReceived(boolean isInformational) {
441 if (!isInformational) {
442 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
443 }
444 return this;
445 }
446
447 @Override
448 public boolean isHeadersReceived() {
449 return (metaState & META_STATE_RECV_HEADERS) != 0;
450 }
451
452 @Override
453 public boolean isTrailersReceived() {
454 return (metaState & META_STATE_RECV_TRAILERS) != 0;
455 }
456
457 @Override
458 public Http2Stream pushPromiseSent() {
459 metaState |= META_STATE_SENT_PUSHPROMISE;
460 return this;
461 }
462
463 @Override
464 public boolean isPushPromiseSent() {
465 return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
466 }
467
468 @Override
469 public final <V> V setProperty(PropertyKey key, V value) {
470 return properties.add(verifyKey(key), value);
471 }
472
473 @Override
474 public final <V> V getProperty(PropertyKey key) {
475 return properties.get(verifyKey(key));
476 }
477
478 @Override
479 public final <V> V removeProperty(PropertyKey key) {
480 return properties.remove(verifyKey(key));
481 }
482
483 @Override
484 public Http2Stream open(boolean halfClosed) throws Http2Exception {
485 state = activeState(id, state, isLocal(), halfClosed);
486 final DefaultEndpoint<? extends Http2FlowController> endpoint = createdBy();
487 if (!endpoint.canOpenStream()) {
488 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint: " +
489 endpoint.maxActiveStreams());
490 }
491
492 activate();
493 return this;
494 }
495
496 void activate() {
497
498
499 if (state == HALF_CLOSED_LOCAL) {
500 headersSent( false);
501 } else if (state == HALF_CLOSED_REMOTE) {
502 headersReceived( false);
503 }
504 activeStreams.activate(this);
505 }
506
507 Http2Stream close(Iterator<?> itr) {
508 if (state == CLOSED) {
509 return this;
510 }
511
512 state = CLOSED;
513
514 --createdBy().numStreams;
515 activeStreams.deactivate(this, itr);
516 return this;
517 }
518
519 @Override
520 public Http2Stream close() {
521 return close(null);
522 }
523
524 @Override
525 public Http2Stream closeLocalSide() {
526 switch (state) {
527 case OPEN:
528 state = HALF_CLOSED_LOCAL;
529 notifyHalfClosed(this);
530 break;
531 case HALF_CLOSED_LOCAL:
532 break;
533 default:
534 close();
535 break;
536 }
537 return this;
538 }
539
540 @Override
541 public Http2Stream closeRemoteSide() {
542 switch (state) {
543 case OPEN:
544 state = HALF_CLOSED_REMOTE;
545 notifyHalfClosed(this);
546 break;
547 case HALF_CLOSED_REMOTE:
548 break;
549 default:
550 close();
551 break;
552 }
553 return this;
554 }
555
556 DefaultEndpoint<? extends Http2FlowController> createdBy() {
557 return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
558 }
559
560 final boolean isLocal() {
561 return localEndpoint.isValidStreamId(id);
562 }
563
564
565
566
567 private class PropertyMap {
568 Object[] values = EmptyArrays.EMPTY_OBJECTS;
569
570 <V> V add(DefaultPropertyKey key, V value) {
571 resizeIfNecessary(key.index);
572 @SuppressWarnings("unchecked")
573 V prevValue = (V) values[key.index];
574 values[key.index] = value;
575 return prevValue;
576 }
577
578 @SuppressWarnings("unchecked")
579 <V> V get(DefaultPropertyKey key) {
580 if (key.index >= values.length) {
581 return null;
582 }
583 return (V) values[key.index];
584 }
585
586 @SuppressWarnings("unchecked")
587 <V> V remove(DefaultPropertyKey key) {
588 V prevValue = null;
589 if (key.index < values.length) {
590 prevValue = (V) values[key.index];
591 values[key.index] = null;
592 }
593 return prevValue;
594 }
595
596 void resizeIfNecessary(int index) {
597 if (index >= values.length) {
598 values = Arrays.copyOf(values, propertyKeyRegistry.size());
599 }
600 }
601 }
602
603 @Override
604 public boolean equals(final Object obj) {
605 return super.equals(obj);
606 }
607
608 @Override
609 public int hashCode() {
610 long value = identity;
611 if (value == 0) {
612 return System.identityHashCode(this);
613 }
614 return (int) (value ^ (value >>> 32));
615 }
616
617 @Override
618 public String toString() {
619 return getClass().getSimpleName() +
620 "{id=" + id +
621 ", state=" + state +
622 ", metaState=" + metaState +
623 '}';
624 }
625 }
626
627
628
629
630 private final class ConnectionStream extends DefaultStream {
631 ConnectionStream() {
632 super(0, CONNECTION_STREAM_ID, IDLE);
633 }
634
635 @Override
636 public boolean isResetSent() {
637 return false;
638 }
639
640 @Override
641 DefaultEndpoint<? extends Http2FlowController> createdBy() {
642 return null;
643 }
644
645 @Override
646 public Http2Stream resetSent() {
647 throw new UnsupportedOperationException();
648 }
649
650 @Override
651 public Http2Stream open(boolean halfClosed) {
652 throw new UnsupportedOperationException();
653 }
654
655 @Override
656 public Http2Stream close() {
657 throw new UnsupportedOperationException();
658 }
659
660 @Override
661 public Http2Stream closeLocalSide() {
662 throw new UnsupportedOperationException();
663 }
664
665 @Override
666 public Http2Stream closeRemoteSide() {
667 throw new UnsupportedOperationException();
668 }
669
670 @Override
671 public Http2Stream headersSent(boolean isInformational) {
672 throw new UnsupportedOperationException();
673 }
674
675 @Override
676 public boolean isHeadersSent() {
677 throw new UnsupportedOperationException();
678 }
679
680 @Override
681 public Http2Stream pushPromiseSent() {
682 throw new UnsupportedOperationException();
683 }
684
685 @Override
686 public boolean isPushPromiseSent() {
687 throw new UnsupportedOperationException();
688 }
689 }
690
691
692
693
694 private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
695 private final boolean server;
696
697
698
699 private long lastCreatedStreamIdentity;
700
701
702
703
704
705 private int nextStreamIdToCreate;
706
707
708
709
710
711
712 private int nextReservationStreamId;
713 private int lastStreamKnownByPeer = -1;
714 private boolean pushToAllowed;
715 private F flowController;
716 private int maxStreams;
717 private int maxActiveStreams;
718 private final int maxReservedStreams;
719
720 int numActiveStreams;
721 int numStreams;
722
723 DefaultEndpoint(boolean server, int maxReservedStreams) {
724 this.lastCreatedStreamIdentity = 0;
725 this.server = server;
726
727
728
729
730
731 if (server) {
732 nextStreamIdToCreate = 2;
733 nextReservationStreamId = 0;
734 } else {
735 nextStreamIdToCreate = 1;
736
737 nextReservationStreamId = 1;
738 }
739
740
741 pushToAllowed = !server;
742 maxActiveStreams = MAX_VALUE;
743 this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
744 updateMaxStreams();
745 }
746
747 @Override
748 public int incrementAndGetNextStreamId() {
749 return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
750 }
751
752 private void incrementExpectedStreamId(int streamId) {
753 if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
754 nextReservationStreamId = streamId;
755 }
756 nextStreamIdToCreate = streamId + 2;
757 ++numStreams;
758 }
759
760 @Override
761 public boolean isValidStreamId(int streamId) {
762 return streamId > 0 && server == ((streamId & 1) == 0);
763 }
764
765 @Override
766 public boolean mayHaveCreatedStream(int streamId) {
767 return isValidStreamId(streamId) && streamId <= lastStreamCreated();
768 }
769
770 @Override
771 public boolean canOpenStream() {
772 return numActiveStreams < maxActiveStreams;
773 }
774
775 @Override
776 public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
777 State state = activeState(streamId, IDLE, isLocal(), halfClosed);
778
779 checkNewStreamAllowed(streamId, state);
780
781 lastCreatedStreamIdentity++;
782
783
784 DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
785
786 incrementExpectedStreamId(streamId);
787
788 addStream(stream);
789
790 stream.activate();
791 return stream;
792 }
793
794 @Override
795 public boolean created(Http2Stream stream) {
796 return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
797 }
798
799 @Override
800 public boolean isServer() {
801 return server;
802 }
803
804 @Override
805 public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
806 if (parent == null) {
807 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
808 }
809 if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
810 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
811 }
812 if (!opposite().allowPushTo()) {
813 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
814 }
815 State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
816 checkNewStreamAllowed(streamId, state);
817
818 lastCreatedStreamIdentity++;
819
820
821 DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
822
823 incrementExpectedStreamId(streamId);
824
825
826 addStream(stream);
827 return stream;
828 }
829
830 private void addStream(DefaultStream stream) {
831
832 streamMap.put(stream.id(), stream);
833
834
835 for (int i = 0; i < listeners.size(); i++) {
836 try {
837 listeners.get(i).onStreamAdded(stream);
838 } catch (Throwable cause) {
839 logger.error("Caught Throwable from listener onStreamAdded.", cause);
840 }
841 }
842 }
843
844 @Override
845 public void allowPushTo(boolean allow) {
846 if (allow && server) {
847 throw new IllegalArgumentException("Servers do not allow push");
848 }
849 pushToAllowed = allow;
850 }
851
852 @Override
853 public boolean allowPushTo() {
854 return pushToAllowed;
855 }
856
857 @Override
858 public int numActiveStreams() {
859 return numActiveStreams;
860 }
861
862 @Override
863 public int maxActiveStreams() {
864 return maxActiveStreams;
865 }
866
867 @Override
868 public void maxActiveStreams(int maxActiveStreams) {
869 this.maxActiveStreams = maxActiveStreams;
870 updateMaxStreams();
871 }
872
873 @Override
874 public int lastStreamCreated() {
875
876
877
878
879 return Math.max(0, nextStreamIdToCreate - 2);
880 }
881
882 @Override
883 public int lastStreamKnownByPeer() {
884 return lastStreamKnownByPeer;
885 }
886
887 private void lastStreamKnownByPeer(int lastKnownStream) {
888 lastStreamKnownByPeer = lastKnownStream;
889 }
890
891 @Override
892 public F flowController() {
893 return flowController;
894 }
895
896 @Override
897 public void flowController(F flowController) {
898 this.flowController = checkNotNull(flowController, "flowController");
899 }
900
901 @Override
902 public Endpoint<? extends Http2FlowController> opposite() {
903 return isLocal() ? remoteEndpoint : localEndpoint;
904 }
905
906 private void updateMaxStreams() {
907 maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
908 }
909
910 private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
911 assert state != IDLE;
912 if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
913 throw streamError(streamId, REFUSED_STREAM,
914 "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
915 streamId, lastStreamKnownByPeer);
916 }
917 if (!isValidStreamId(streamId)) {
918 if (streamId < 0) {
919 throw new Http2NoMoreStreamIdsException();
920 }
921 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
922 server ? "server" : "client");
923 }
924
925
926 if (streamId < nextStreamIdToCreate) {
927 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
928 streamId, nextStreamIdToCreate);
929 }
930 if (nextStreamIdToCreate <= 0) {
931
932
933 throw new Http2Exception(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.",
934 Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN);
935 }
936 boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
937 if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
938 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint: " +
939 (isReserved ? maxStreams : maxActiveStreams));
940 }
941 if (isClosed()) {
942 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
943 streamId);
944 }
945 }
946
947 private boolean isLocal() {
948 return this == localEndpoint;
949 }
950 }
951
952
953
954
955
956 interface Event {
957
958
959
960
961
962
963 void process();
964 }
965
966
967
968
969
970 private final class ActiveStreams {
971 private final List<Listener> listeners;
972 private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
973 private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
974 private int pendingIterations;
975
976 ActiveStreams(List<Listener> listeners) {
977 this.listeners = listeners;
978 }
979
980 public int size() {
981 return streams.size();
982 }
983
984 public void activate(final DefaultStream stream) {
985 if (allowModifications()) {
986 addToActiveStreams(stream);
987 } else {
988 pendingEvents.add(new Event() {
989 @Override
990 public void process() {
991 addToActiveStreams(stream);
992 }
993 });
994 }
995 }
996
997 public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
998 if (allowModifications() || itr != null) {
999 removeFromActiveStreams(stream, itr);
1000 } else {
1001 pendingEvents.add(new Event() {
1002 @Override
1003 public void process() {
1004 removeFromActiveStreams(stream, itr);
1005 }
1006 });
1007 }
1008 }
1009
1010 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
1011 incrementPendingIterations();
1012 try {
1013 for (Http2Stream stream : streams) {
1014 if (!visitor.visit(stream)) {
1015 return stream;
1016 }
1017 }
1018 return null;
1019 } finally {
1020 decrementPendingIterations();
1021 }
1022 }
1023
1024 void addToActiveStreams(DefaultStream stream) {
1025 if (streams.add(stream)) {
1026
1027 stream.createdBy().numActiveStreams++;
1028
1029 for (int i = 0; i < listeners.size(); i++) {
1030 try {
1031 listeners.get(i).onStreamActive(stream);
1032 } catch (Throwable cause) {
1033 logger.error("Caught Throwable from listener onStreamActive.", cause);
1034 }
1035 }
1036 }
1037 }
1038
1039 void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
1040 if (streams.remove(stream)) {
1041
1042 stream.createdBy().numActiveStreams--;
1043 notifyClosed(stream);
1044 }
1045 removeStream(stream, itr);
1046 }
1047
1048 boolean allowModifications() {
1049 return pendingIterations == 0;
1050 }
1051
1052 void incrementPendingIterations() {
1053 ++pendingIterations;
1054 }
1055
1056 void decrementPendingIterations() {
1057 --pendingIterations;
1058 if (allowModifications()) {
1059 for (;;) {
1060 Event event = pendingEvents.poll();
1061 if (event == null) {
1062 break;
1063 }
1064 try {
1065 event.process();
1066 } catch (Throwable cause) {
1067 logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1068 }
1069 }
1070 }
1071 }
1072 }
1073
1074
1075
1076
1077 final class DefaultPropertyKey implements PropertyKey {
1078 final int index;
1079
1080 DefaultPropertyKey(int index) {
1081 this.index = index;
1082 }
1083
1084 DefaultPropertyKey verifyConnection(Http2Connection connection) {
1085 if (connection != DefaultHttp2Connection.this) {
1086 throw new IllegalArgumentException("Using a key that was not created by this connection");
1087 }
1088 return this;
1089 }
1090 }
1091
1092
1093
1094
1095 private final class PropertyKeyRegistry {
1096
1097
1098
1099
1100
1101 final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
1102
1103
1104
1105
1106 DefaultPropertyKey newKey() {
1107 DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1108 keys.add(key);
1109 return key;
1110 }
1111
1112 int size() {
1113 return keys.size();
1114 }
1115 }
1116 }