View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty5.handler.codec.http2;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.handler.codec.http2.Http2Stream.State;
20  import io.netty5.util.collection.IntObjectHashMap;
21  import io.netty5.util.collection.IntObjectMap;
22  import io.netty5.util.collection.IntObjectMap.PrimitiveEntry;
23  import io.netty5.util.concurrent.Promise;
24  import io.netty5.util.internal.EmptyArrays;
25  import io.netty5.util.internal.UnstableApi;
26  import io.netty5.util.internal.logging.InternalLogger;
27  import io.netty5.util.internal.logging.InternalLoggerFactory;
28  
29  import java.util.ArrayDeque;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Iterator;
33  import java.util.LinkedHashSet;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.Set;
37  
38  import static io.netty5.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
39  import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
40  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
41  import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
42  import static io.netty5.handler.codec.http2.Http2Error.REFUSED_STREAM;
43  import static io.netty5.handler.codec.http2.Http2Exception.closedStreamError;
44  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
45  import static io.netty5.handler.codec.http2.Http2Exception.streamError;
46  import static io.netty5.handler.codec.http2.Http2Stream.State.CLOSED;
47  import static io.netty5.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
48  import static io.netty5.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
49  import static io.netty5.handler.codec.http2.Http2Stream.State.IDLE;
50  import static io.netty5.handler.codec.http2.Http2Stream.State.OPEN;
51  import static io.netty5.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
52  import static io.netty5.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
53  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
54  import static java.lang.Integer.MAX_VALUE;
55  import static java.util.Objects.requireNonNull;
56  
57  /**
58   * Simple implementation of {@link Http2Connection}.
59   */
60  @UnstableApi
61  public class DefaultHttp2Connection implements Http2Connection {
62      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
63      // Fields accessed by inner classes
64      final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
65      final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
66      final ConnectionStream connectionStream = new ConnectionStream();
67      final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
68      final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
69  
    /**
     * We chose a {@link List} over a {@link Set} to avoid allocating {@link Iterator} objects when iterating over
     * the listeners.
     * <p>
     * Initial size of 4 because the default configuration currently has 3 listeners
     * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
     * We could be more aggressive but the ArrayList will simply grow its backing array if we are too small.
     */
78      final List<Listener> listeners = new ArrayList<>(4);
79      final ActiveStreams activeStreams;
80      Promise<Void> closePromise;
81  
    /**
     * Creates a new connection, using {@code DEFAULT_MAX_RESERVED_STREAMS} as the limit on reserved streams.
     * Delegates to {@link #DefaultHttp2Connection(boolean, int)}.
     *
     * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
     */
    public DefaultHttp2Connection(boolean server) {
        this(server, DEFAULT_MAX_RESERVED_STREAMS);
    }
89  
90      /**
91       * Creates a new connection with the given settings.
92       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
93       * @param maxReservedStreams The maximum amount of streams which can exist in the reserved state for each endpoint.
94       */
95      public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
96          activeStreams = new ActiveStreams(listeners);
97          // Reserved streams are excluded from the SETTINGS_MAX_CONCURRENT_STREAMS limit according to [1] and the RFC
98          // doesn't define a way to communicate the limit on reserved streams. We rely upon the peer to send RST_STREAM
99          // in response to any locally enforced limits being exceeded [2].
100         // [1] https://tools.ietf.org/html/rfc7540#section-5.1.2
101         // [2] https://tools.ietf.org/html/rfc7540#section-8.2.2
102         localEndpoint = new DefaultEndpoint<>(server, server ? MAX_VALUE : maxReservedStreams);
103         remoteEndpoint = new DefaultEndpoint<>(!server, maxReservedStreams);
104 
105         // Add the connection stream to the map.
106         streamMap.put(connectionStream.id(), connectionStream);
107     }
108 
    /**
     * Determine if {@link #close(Promise)} has been called and no more streams are allowed to be created.
     *
     * @return {@code true} once a close promise has been recorded, i.e. {@code closePromise} is non-{@code null}.
     */
    final boolean isClosed() {
        return closePromise != null;
    }
115 
116     @Override
117     public void close(final Promise<Void> promise) {
118         requireNonNull(promise, "promise");
119         // Since we allow this method to be called multiple times, we must make sure that all the promises are notified
120         // when all streams are removed and the close operation completes.
121         if (closePromise != null) {
122             if (closePromise == promise) {
123                 // Do nothing
124             } else {
125                 closePromise.asFuture().cascadeTo(promise);
126             }
127         } else {
128             closePromise = promise;
129         }
130         if (isStreamMapEmpty()) {
131             promise.trySuccess(null);
132             return;
133         }
134 
135         Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
136         // We must take care while iterating the streamMap as to not modify while iterating in case there are other code
137         // paths iterating over the active streams.
138         if (activeStreams.allowModifications()) {
139             activeStreams.incrementPendingIterations();
140             try {
141                 while (itr.hasNext()) {
142                     DefaultStream stream = (DefaultStream) itr.next().value();
143                     if (stream.id() != CONNECTION_STREAM_ID) {
144                         // If modifications of the activeStream map is allowed, then a stream close operation will also
145                         // modify the streamMap. Pass the iterator in so that remove will be called to prevent
146                         // concurrent modification exceptions.
147                         stream.close(itr);
148                     }
149                 }
150             } finally {
151                 activeStreams.decrementPendingIterations();
152             }
153         } else {
154             while (itr.hasNext()) {
155                 Http2Stream stream = itr.next().value();
156                 if (stream.id() != CONNECTION_STREAM_ID) {
157                     // We are not allowed to make modifications, so the close calls will be executed after this
158                     // iteration completes.
159                     stream.close();
160                 }
161             }
162         }
163     }
164 
    /**
     * Appends {@code listener} to the list of listeners notified by this connection.
     */
    @Override
    public void addListener(Listener listener) {
        listeners.add(listener);
    }
169 
    /**
     * Removes the first occurrence of {@code listener} from the listener list; a listener that was never added is
     * silently ignored.
     */
    @Override
    public void removeListener(Listener listener) {
        listeners.remove(listener);
    }
174 
    /**
     * Reports whether the local endpoint was created as the server side of the connection.
     */
    @Override
    public boolean isServer() {
        return localEndpoint.isServer();
    }
179 
    /**
     * Returns the stream object that represents the connection itself (registered under
     * {@code CONNECTION_STREAM_ID}).
     */
    @Override
    public Http2Stream connectionStream() {
        return connectionStream;
    }
184 
    /**
     * Returns the stream registered in the stream map under {@code streamId}.
     */
    @Override
    public Http2Stream stream(int streamId) {
        return streamMap.get(streamId);
    }
189 
    /**
     * Reports whether either endpoint may have created a stream with the given ID. The remote endpoint is
     * consulted first, then the local one.
     */
    @Override
    public boolean streamMayHaveExisted(int streamId) {
        return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
    }
194 
    /**
     * Returns the size reported by the active-stream collection.
     */
    @Override
    public int numActiveStreams() {
        return activeStreams.size();
    }
199 
    /**
     * Delegates the visit to the active-stream collection and returns its result unchanged.
     *
     * @throws Http2Exception if the delegate throws it.
     */
    @Override
    public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
        return activeStreams.forEachActiveStream(visitor);
    }
204 
    /**
     * Returns the endpoint object describing the local side of the connection.
     */
    @Override
    public Endpoint<Http2LocalFlowController> local() {
        return localEndpoint;
    }
209 
    /**
     * Returns the endpoint object describing the remote side of the connection.
     */
    @Override
    public Endpoint<Http2RemoteFlowController> remote() {
        return remoteEndpoint;
    }
214 
215     @Override
216     public boolean goAwayReceived() {
217         return localEndpoint.lastStreamKnownByPeer >= 0;
218     }
219 
220     @Override
221     public void goAwayReceived(final int lastKnownStream, long errorCode, Buffer debugData) throws Http2Exception {
222         if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
223             throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
224                     localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
225         }
226 
227         localEndpoint.lastStreamKnownByPeer(lastKnownStream);
228         for (int i = 0; i < listeners.size(); ++i) {
229             try {
230                 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
231             } catch (Throwable cause) {
232                 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
233             }
234         }
235 
236         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
237     }
238 
239     @Override
240     public boolean goAwaySent() {
241         return remoteEndpoint.lastStreamKnownByPeer >= 0;
242     }
243 
244     @Override
245     public boolean goAwaySent(final int lastKnownStream, long errorCode, Buffer debugData) throws Http2Exception {
246         if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
247             // Protect against re-entrancy. Could happen if writing the frame fails, and error handling
248             // treating this is a connection handler and doing a graceful shutdown...
249             if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
250                 return false;
251             }
252             if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
253                 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
254                                 "sending multiple GOAWAY frames (was '%d', is '%d').",
255                         remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
256             }
257         }
258 
259         remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
260         for (int i = 0; i < listeners.size(); ++i) {
261             try {
262                 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
263             } catch (Throwable cause) {
264                 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
265             }
266         }
267 
268         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
269         return true;
270     }
271 
272     private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
273                                                           final DefaultEndpoint<?> endpoint) throws Http2Exception {
274         forEachActiveStream(stream -> {
275             if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
276                 stream.close();
277             }
278             return true;
279         });
280     }
281 
    /**
     * Determine if {@link #streamMap} only contains the connection stream.
     * <p>
     * The connection stream is put into the map by the constructor, so a size of exactly one is treated as
     * "no real streams left".
     */
    private boolean isStreamMapEmpty() {
        return streamMap.size() == 1;
    }
288 
289     /**
290      * Remove a stream from the {@link #streamMap}.
291      * @param stream the stream to remove.
292      * @param itr an iterator that may be pointing to the stream during iteration and {@link Iterator#remove()} will be
293      * used if non-{@code null}.
294      */
295     void removeStream(DefaultStream stream, Iterator<?> itr) {
296         final boolean removed;
297         if (itr == null) {
298             removed = streamMap.remove(stream.id()) != null;
299         } else {
300             itr.remove();
301             removed = true;
302         }
303 
304         if (removed) {
305             for (int i = 0; i < listeners.size(); i++) {
306                 try {
307                     listeners.get(i).onStreamRemoved(stream);
308                 } catch (Throwable cause) {
309                     logger.error("Caught Throwable from listener onStreamRemoved.", cause);
310                 }
311             }
312 
313             if (closePromise != null && isStreamMapEmpty()) {
314                 closePromise.trySuccess(null);
315             }
316         }
317     }
318 
319     static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
320             throws Http2Exception {
321         switch (initialState) {
322         case IDLE:
323             return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
324         case RESERVED_LOCAL:
325             return HALF_CLOSED_REMOTE;
326         case RESERVED_REMOTE:
327             return HALF_CLOSED_LOCAL;
328         default:
329             throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
330                     + initialState);
331         }
332     }
333 
334     void notifyHalfClosed(Http2Stream stream) {
335         for (int i = 0; i < listeners.size(); i++) {
336             try {
337                 listeners.get(i).onStreamHalfClosed(stream);
338             } catch (Throwable cause) {
339                 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
340             }
341         }
342     }
343 
344     void notifyClosed(Http2Stream stream) {
345         for (int i = 0; i < listeners.size(); i++) {
346             try {
347                 listeners.get(i).onStreamClosed(stream);
348             } catch (Throwable cause) {
349                 logger.error("Caught Throwable from listener onStreamClosed.", cause);
350             }
351         }
352     }
353 
    /**
     * Obtains a new property key from this connection's key registry.
     */
    @Override
    public PropertyKey newKey() {
        return propertyKeyRegistry.newKey();
    }
358 
359     /**
360      * Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type.
361      *
362      * @throws NullPointerException if the key is {@code null}.
363      * @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}.
364      * @throws IllegalArgumentException if the key was not created by this connection.
365      */
366     final DefaultPropertyKey verifyKey(PropertyKey key) {
367         return requireNonNull((DefaultPropertyKey) key, "key").verifyConnection(this);
368     }
369 
370     /**
371      * Simple stream implementation. Streams can be compared to each other by priority.
372      */
373     private class DefaultStream implements Http2Stream {
374         private static final byte META_STATE_SENT_RST = 1;
375         private static final byte META_STATE_SENT_HEADERS = 1 << 1;
376         private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
377         private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
378         private static final byte META_STATE_RECV_HEADERS = 1 << 4;
379         private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
380         private final int id;
381         private final PropertyMap properties = new PropertyMap();
382         private State state;
383         private byte metaState;
384 
385         DefaultStream(int id, State state) {
386             this.id = id;
387             this.state = state;
388         }
389 
390         @Override
391         public final int id() {
392             return id;
393         }
394 
395         @Override
396         public final State state() {
397             return state;
398         }
399 
400         @Override
401         public boolean isResetSent() {
402             return (metaState & META_STATE_SENT_RST) != 0;
403         }
404 
405         @Override
406         public Http2Stream resetSent() {
407             metaState |= META_STATE_SENT_RST;
408             return this;
409         }
410 
411         @Override
412         public Http2Stream headersSent(boolean isInformational) {
413             if (!isInformational) {
414                 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
415             }
416             return this;
417         }
418 
419         @Override
420         public boolean isHeadersSent() {
421             return (metaState & META_STATE_SENT_HEADERS) != 0;
422         }
423 
424         @Override
425         public boolean isTrailersSent() {
426             return (metaState & META_STATE_SENT_TRAILERS) != 0;
427         }
428 
429         @Override
430         public Http2Stream headersReceived(boolean isInformational) {
431             if (!isInformational) {
432                 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
433             }
434             return this;
435         }
436 
437         @Override
438         public boolean isHeadersReceived() {
439             return (metaState & META_STATE_RECV_HEADERS) != 0;
440         }
441 
442         @Override
443         public boolean isTrailersReceived() {
444             return (metaState & META_STATE_RECV_TRAILERS) != 0;
445         }
446 
447         @Override
448         public Http2Stream pushPromiseSent() {
449             metaState |= META_STATE_SENT_PUSHPROMISE;
450             return this;
451         }
452 
453         @Override
454         public boolean isPushPromiseSent() {
455             return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
456         }
457 
458         @Override
459         public final <V> V setProperty(PropertyKey key, V value) {
460             return properties.add(verifyKey(key), value);
461         }
462 
463         @Override
464         public final <V> V getProperty(PropertyKey key) {
465             return properties.get(verifyKey(key));
466         }
467 
468         @Override
469         public final <V> V removeProperty(PropertyKey key) {
470             return properties.remove(verifyKey(key));
471         }
472 
473         @Override
474         public Http2Stream open(boolean halfClosed) throws Http2Exception {
475             state = activeState(id, state, isLocal(), halfClosed);
476             final DefaultEndpoint<? extends Http2FlowController> endpoint = createdBy();
477             if (!endpoint.canOpenStream()) {
478                 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint: " +
479                         endpoint.maxActiveStreams());
480             }
481 
482             activate();
483             return this;
484         }
485 
486         void activate() {
487             // If the stream is opened in a half-closed state, the headers must have either
488             // been sent if this is a local stream, or received if it is a remote stream.
489             if (state == HALF_CLOSED_LOCAL) {
490                 headersSent(/*isInformational*/ false);
491             } else if (state == HALF_CLOSED_REMOTE) {
492                 headersReceived(/*isInformational*/ false);
493             }
494             activeStreams.activate(this);
495         }
496 
497         Http2Stream close(Iterator<?> itr) {
498             if (state == CLOSED) {
499                 return this;
500             }
501 
502             state = CLOSED;
503 
504             --createdBy().numStreams;
505             activeStreams.deactivate(this, itr);
506             return this;
507         }
508 
509         @Override
510         public Http2Stream close() {
511             return close(null);
512         }
513 
514         @Override
515         public Http2Stream closeLocalSide() {
516             switch (state) {
517             case OPEN:
518                 state = HALF_CLOSED_LOCAL;
519                 notifyHalfClosed(this);
520                 break;
521             case HALF_CLOSED_LOCAL:
522                 break;
523             default:
524                 close();
525                 break;
526             }
527             return this;
528         }
529 
530         @Override
531         public Http2Stream closeRemoteSide() {
532             switch (state) {
533             case OPEN:
534                 state = HALF_CLOSED_REMOTE;
535                 notifyHalfClosed(this);
536                 break;
537             case HALF_CLOSED_REMOTE:
538                 break;
539             default:
540                 close();
541                 break;
542             }
543             return this;
544         }
545 
546         DefaultEndpoint<? extends Http2FlowController> createdBy() {
547             return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
548         }
549 
550         final boolean isLocal() {
551             return localEndpoint.isValidStreamId(id);
552         }
553 
554         /**
555          * Provides the lazy initialization for the {@link DefaultStream} data map.
556          */
557         private class PropertyMap {
558             Object[] values = EmptyArrays.EMPTY_OBJECTS;
559 
560             <V> V add(DefaultPropertyKey key, V value) {
561                 resizeIfNecessary(key.index);
562                 @SuppressWarnings("unchecked")
563                 V prevValue = (V) values[key.index];
564                 values[key.index] = value;
565                 return prevValue;
566             }
567 
568             @SuppressWarnings("unchecked")
569             <V> V get(DefaultPropertyKey key) {
570                 if (key.index >= values.length) {
571                     return null;
572                 }
573                 return (V) values[key.index];
574             }
575 
576             @SuppressWarnings("unchecked")
577             <V> V remove(DefaultPropertyKey key) {
578                 V prevValue = null;
579                 if (key.index < values.length) {
580                     prevValue = (V) values[key.index];
581                     values[key.index] = null;
582                 }
583                 return prevValue;
584             }
585 
586             void resizeIfNecessary(int index) {
587                 if (index >= values.length) {
588                     values = Arrays.copyOf(values, propertyKeyRegistry.size());
589                 }
590             }
591         }
592     }
593 
594     /**
595      * Stream class representing the connection, itself.
596      */
597     private final class ConnectionStream extends DefaultStream {
598         ConnectionStream() {
599             super(CONNECTION_STREAM_ID, IDLE);
600         }
601 
602         @Override
603         public boolean isResetSent() {
604             return false;
605         }
606 
607         @Override
608         DefaultEndpoint<? extends Http2FlowController> createdBy() {
609             return null;
610         }
611 
612         @Override
613         public Http2Stream resetSent() {
614             throw new UnsupportedOperationException();
615         }
616 
617         @Override
618         public Http2Stream open(boolean halfClosed) {
619             throw new UnsupportedOperationException();
620         }
621 
622         @Override
623         public Http2Stream close() {
624             throw new UnsupportedOperationException();
625         }
626 
627         @Override
628         public Http2Stream closeLocalSide() {
629             throw new UnsupportedOperationException();
630         }
631 
632         @Override
633         public Http2Stream closeRemoteSide() {
634             throw new UnsupportedOperationException();
635         }
636 
637         @Override
638         public Http2Stream headersSent(boolean isInformational) {
639             throw new UnsupportedOperationException();
640         }
641 
642         @Override
643         public boolean isHeadersSent() {
644             throw new UnsupportedOperationException();
645         }
646 
647         @Override
648         public Http2Stream pushPromiseSent() {
649             throw new UnsupportedOperationException();
650         }
651 
652         @Override
653         public boolean isPushPromiseSent() {
654             throw new UnsupportedOperationException();
655         }
656     }
657 
658     /**
659      * Simple endpoint implementation.
660      */
661     private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
662         private final boolean server;
663         /**
664          * The minimum stream ID allowed when creating the next stream. This only applies at the time the stream is
665          * created. If the ID of the stream being created is less than this value, stream creation will fail. Upon
666          * successful creation of a stream, this value is incremented to the next valid stream ID.
667          */
668         private int nextStreamIdToCreate;
669         /**
670          * Used for reservation of stream IDs. Stream IDs can be reserved in advance by applications before the streams
671          * are actually created.  For example, applications may choose to buffer stream creation attempts as a way of
672          * working around {@code SETTINGS_MAX_CONCURRENT_STREAMS}, in which case they will reserve stream IDs for each
673          * buffered stream.
674          */
675         private int nextReservationStreamId;
676         private int lastStreamKnownByPeer = -1;
677         private boolean pushToAllowed;
678         private F flowController;
679         private int maxStreams;
680         private int maxActiveStreams;
681         private final int maxReservedStreams;
682         // Fields accessed by inner classes
683         int numActiveStreams;
684         int numStreams;
685 
686         DefaultEndpoint(boolean server, int maxReservedStreams) {
687             this.server = server;
688 
689             // Determine the starting stream ID for this endpoint. Client-initiated streams
690             // are odd and server-initiated streams are even. Zero is reserved for the
691             // connection. Stream 1 is reserved client-initiated stream for responding to an
692             // upgrade from HTTP 1.1.
693             if (server) {
694                 nextStreamIdToCreate = 2;
695                 nextReservationStreamId = 0;
696             } else {
697                 nextStreamIdToCreate = 1;
698                 // For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
699                 nextReservationStreamId = 1;
700             }
701 
702             // Push is disallowed by default for servers and allowed for clients.
703             pushToAllowed = !server;
704             maxActiveStreams = MAX_VALUE;
705             this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
706             updateMaxStreams();
707         }
708 
709         @Override
710         public int incrementAndGetNextStreamId() {
711             return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
712         }
713 
714         private void incrementExpectedStreamId(int streamId) {
715             if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
716                 nextReservationStreamId = streamId;
717             }
718             nextStreamIdToCreate = streamId + 2;
719             ++numStreams;
720         }
721 
722         @Override
723         public boolean isValidStreamId(int streamId) {
724             return streamId > 0 && server == ((streamId & 1) == 0);
725         }
726 
727         @Override
728         public boolean mayHaveCreatedStream(int streamId) {
729             return isValidStreamId(streamId) && streamId <= lastStreamCreated();
730         }
731 
        /**
         * Returns {@code true} while the number of active streams of this endpoint is below its configured maximum.
         */
        @Override
        public boolean canOpenStream() {
            return numActiveStreams < maxActiveStreams;
        }
736 
737         @Override
738         public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
739             State state = activeState(streamId, IDLE, isLocal(), halfClosed);
740 
741             checkNewStreamAllowed(streamId, state);
742 
743             // Create and initialize the stream.
744             DefaultStream stream = new DefaultStream(streamId, state);
745 
746             incrementExpectedStreamId(streamId);
747 
748             addStream(stream);
749 
750             stream.activate();
751             return stream;
752         }
753 
754         @Override
755         public boolean created(Http2Stream stream) {
756             return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
757         }
758 
        /**
         * Returns the {@code server} flag this endpoint was constructed with.
         */
        @Override
        public boolean isServer() {
            return server;
        }
763 
764         @Override
765         public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
766             if (parent == null) {
767                 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
768             }
769             if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
770                 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
771             }
772             if (!opposite().allowPushTo()) {
773                 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
774             }
775             State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
776             checkNewStreamAllowed(streamId, state);
777 
778             // Create and initialize the stream.
779             DefaultStream stream = new DefaultStream(streamId, state);
780 
781             incrementExpectedStreamId(streamId);
782 
783             // Register the stream.
784             addStream(stream);
785             return stream;
786         }
787 
788         private void addStream(DefaultStream stream) {
789             // Add the stream to the map and priority tree.
790             streamMap.put(stream.id(), stream);
791 
792             // Notify the listeners of the event.
793             for (int i = 0; i < listeners.size(); i++) {
794                 try {
795                     listeners.get(i).onStreamAdded(stream);
796                 } catch (Throwable cause) {
797                     logger.error("Caught Throwable from listener onStreamAdded.", cause);
798                 }
799             }
800         }
801 
802         @Override
803         public void allowPushTo(boolean allow) {
804             if (allow && server) {
805                 throw new IllegalArgumentException("Servers do not allow push");
806             }
807             pushToAllowed = allow;
808         }
809 
810         @Override
811         public boolean allowPushTo() {
812             return pushToAllowed;
813         }
814 
815         @Override
816         public int numActiveStreams() {
817             return numActiveStreams;
818         }
819 
820         @Override
821         public int maxActiveStreams() {
822             return maxActiveStreams;
823         }
824 
825         @Override
826         public void maxActiveStreams(int maxActiveStreams) {
827             this.maxActiveStreams = maxActiveStreams;
828             updateMaxStreams();
829         }
830 
831         @Override
832         public int lastStreamCreated() {
833             return nextStreamIdToCreate > 1 ? nextStreamIdToCreate - 2 : 0;
834         }
835 
836         @Override
837         public int lastStreamKnownByPeer() {
838             return lastStreamKnownByPeer;
839         }
840 
841         private void lastStreamKnownByPeer(int lastKnownStream) {
842             lastStreamKnownByPeer = lastKnownStream;
843         }
844 
845         @Override
846         public F flowController() {
847             return flowController;
848         }
849 
850         @Override
851         public void flowController(F flowController) {
852             this.flowController = requireNonNull(flowController, "flowController");
853         }
854 
855         @Override
856         public Endpoint<? extends Http2FlowController> opposite() {
857             return isLocal() ? remoteEndpoint : localEndpoint;
858         }
859 
860         private void updateMaxStreams() {
861             maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
862         }
863 
864         private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
865             assert state != IDLE;
866             if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
867                 throw streamError(streamId, REFUSED_STREAM,
868                         "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
869                         streamId, lastStreamKnownByPeer);
870             }
871             if (!isValidStreamId(streamId)) {
872                 if (streamId < 0) {
873                     throw new Http2NoMoreStreamIdsException();
874                 }
875                 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
876                         server ? "server" : "client");
877             }
878             // This check must be after all id validated checks, but before the max streams check because it may be
879             // recoverable to some degree for handling frames which can be sent on closed streams.
880             if (streamId < nextStreamIdToCreate) {
881                 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
882                         streamId, nextStreamIdToCreate);
883             }
884             if (nextStreamIdToCreate <= 0) {
885                 // We exhausted the stream id space that we  can use. Let's signal this back but also signal that
886                 // we still may want to process active streams.
887                 throw new Http2Exception(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.",
888                         Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN);
889             }
890             boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
891             if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
892                 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint: " +
893                         (isReserved ? maxStreams : maxActiveStreams));
894             }
895             if (isClosed()) {
896                 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
897                                       streamId);
898             }
899         }
900 
901         private boolean isLocal() {
902             return this == localEndpoint;
903         }
904     }
905 
906     /**
907      * Allows events which would modify the collection of active streams to be queued while iterating via {@link
908      * #forEachActiveStream(Http2StreamVisitor)}.
909      */
910     interface Event {
911         /**
912          * Trigger the original intention of this event. Expect to modify the active streams list.
913          * <p/>
914          * If a {@link RuntimeException} object is thrown it will be logged and <strong>not propagated</strong>.
915          * Throwing from this method is not supported and is considered a programming error.
916          */
917         void process();
918     }
919 
920     /**
921      * Manages the list of currently active streams.  Queues any {@link Event}s that would modify the list of
922      * active streams in order to prevent modification while iterating.
923      */
924     private final class ActiveStreams {
925         private final List<Listener> listeners;
926         private final Queue<Event> pendingEvents = new ArrayDeque<>(4);
927         private final Set<Http2Stream> streams = new LinkedHashSet<>();
928         private int pendingIterations;
929 
930         ActiveStreams(List<Listener> listeners) {
931             this.listeners = listeners;
932         }
933 
934         public int size() {
935             return streams.size();
936         }
937 
938         public void activate(final DefaultStream stream) {
939             if (allowModifications()) {
940                 addToActiveStreams(stream);
941             } else {
942                 pendingEvents.add(() -> addToActiveStreams(stream));
943             }
944         }
945 
946         public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
947             if (allowModifications() || itr != null) {
948                 removeFromActiveStreams(stream, itr);
949             } else {
950                 pendingEvents.add(() -> removeFromActiveStreams(stream, itr));
951             }
952         }
953 
954         public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
955             incrementPendingIterations();
956             try {
957                 for (Http2Stream stream : streams) {
958                     if (!visitor.visit(stream)) {
959                         return stream;
960                     }
961                 }
962                 return null;
963             } finally {
964                 decrementPendingIterations();
965             }
966         }
967 
968         void addToActiveStreams(DefaultStream stream) {
969             if (streams.add(stream)) {
970                 // Update the number of active streams initiated by the endpoint.
971                 stream.createdBy().numActiveStreams++;
972 
973                 for (int i = 0; i < listeners.size(); i++) {
974                     try {
975                         listeners.get(i).onStreamActive(stream);
976                     } catch (Throwable cause) {
977                         logger.error("Caught Throwable from listener onStreamActive.", cause);
978                     }
979                 }
980             }
981         }
982 
983         void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
984             if (streams.remove(stream)) {
985                 // Update the number of active streams initiated by the endpoint.
986                 stream.createdBy().numActiveStreams--;
987                 notifyClosed(stream);
988             }
989             removeStream(stream, itr);
990         }
991 
992         boolean allowModifications() {
993             return pendingIterations == 0;
994         }
995 
996         void incrementPendingIterations() {
997             ++pendingIterations;
998         }
999 
1000         void decrementPendingIterations() {
1001             --pendingIterations;
1002             if (allowModifications()) {
1003                 for (;;) {
1004                     Event event = pendingEvents.poll();
1005                     if (event == null) {
1006                         break;
1007                     }
1008                     try {
1009                         event.process();
1010                     } catch (Throwable cause) {
1011                         logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1012                     }
1013                 }
1014             }
1015         }
1016     }
1017 
1018     /**
1019      * Implementation of {@link PropertyKey} that specifies the index position of the property.
1020      */
1021     final class DefaultPropertyKey implements PropertyKey {
1022         final int index;
1023 
1024         DefaultPropertyKey(int index) {
1025             this.index = index;
1026         }
1027 
1028         DefaultPropertyKey verifyConnection(Http2Connection connection) {
1029             if (connection != DefaultHttp2Connection.this) {
1030                 throw new IllegalArgumentException("Using a key that was not created by this connection");
1031             }
1032             return this;
1033         }
1034     }
1035 
1036     /**
1037      * A registry of all stream property keys known by this connection.
1038      */
1039     private final class PropertyKeyRegistry {
1040         /**
1041          * Initial size of 4 because the default configuration currently has 3 listeners
1042          * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
1043          * We could be more aggressive but the ArrayList resize will double the size if we are too small.
1044          */
1045         final List<DefaultPropertyKey> keys = new ArrayList<>(4);
1046 
1047         /**
1048          * Registers a new property key.
1049          */
1050         DefaultPropertyKey newKey() {
1051             DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1052             keys.add(key);
1053             return key;
1054         }
1055 
1056         int size() {
1057             return keys.size();
1058         }
1059     }
1060 }