View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelPromise;
20  import io.netty.handler.codec.http2.Http2Stream.State;
21  import io.netty.util.collection.IntObjectHashMap;
22  import io.netty.util.collection.IntObjectMap;
23  import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
24  import io.netty.util.concurrent.Future;
25  import io.netty.util.concurrent.Promise;
26  import io.netty.util.concurrent.UnaryPromiseNotifier;
27  import io.netty.util.internal.EmptyArrays;
28  import io.netty.util.internal.UnstableApi;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.util.ArrayDeque;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.Iterator;
36  import java.util.LinkedHashSet;
37  import java.util.List;
38  import java.util.Queue;
39  import java.util.Set;
40  
41  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
43  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
46  import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
47  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48  import static io.netty.handler.codec.http2.Http2Exception.streamError;
49  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
50  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
51  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
52  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
53  import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
54  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
55  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
58  import static java.lang.Integer.MAX_VALUE;
59  
60  /**
61   * Simple implementation of {@link Http2Connection}.
62   */
63  @UnstableApi
64  public class DefaultHttp2Connection implements Http2Connection {
65      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
66      // Fields accessed by inner classes
67      final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
68      final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
69      final ConnectionStream connectionStream = new ConnectionStream();
70      final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
71      final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
72  
73      /**
74       * We chose a {@link List} over a {@link Set} to avoid allocating {@link Iterator} objects when iterating over
75       * the listeners.
76       * <p>
77       * Initial size of 4 because the default configuration currently has 3 listeners
78       * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
79       * We could be more aggressive but the ArrayList resize will double the size if we are too small.
80       */
81      final List<Listener> listeners = new ArrayList<Listener>(4);
82      final ActiveStreams activeStreams;
83      Promise<Void> closePromise;
84  
85      /**
86       * Creates a new connection with the given settings.
87       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
88       */
89      public DefaultHttp2Connection(boolean server) {
90          this(server, DEFAULT_MAX_RESERVED_STREAMS);
91      }
92  
93      /**
94       * Creates a new connection with the given settings.
95       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
96       * @param maxReservedStreams The maximum amount of streams which can exist in the reserved state for each endpoint.
97       */
98      public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
99          activeStreams = new ActiveStreams(listeners);
100         // Reserved streams are excluded from the SETTINGS_MAX_CONCURRENT_STREAMS limit according to [1] and the RFC
101         // doesn't define a way to communicate the limit on reserved streams. We rely upon the peer to send RST_STREAM
102         // in response to any locally enforced limits being exceeded [2].
103         // [1] https://tools.ietf.org/html/rfc7540#section-5.1.2
104         // [2] https://tools.ietf.org/html/rfc7540#section-8.2.2
105         localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server, server ? MAX_VALUE : maxReservedStreams);
106         remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server, maxReservedStreams);
107 
108         // Add the connection stream to the map.
109         streamMap.put(connectionStream.id(), connectionStream);
110     }
111 
112     /**
113      * Determine if {@link #close(Promise)} has been called and no more streams are allowed to be created.
114      */
115     final boolean isClosed() {
116         return closePromise != null;
117     }
118 
119     @Override
120     public Future<Void> close(final Promise<Void> promise) {
121         checkNotNull(promise, "promise");
122         // Since we allow this method to be called multiple times, we must make sure that all the promises are notified
123         // when all streams are removed and the close operation completes.
124         if (closePromise != null) {
125             if (closePromise == promise) {
126                 // Do nothing
127             } else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) {
128                 closePromise = promise;
129             } else {
130                 closePromise.addListener(new UnaryPromiseNotifier<Void>(promise));
131             }
132         } else {
133             closePromise = promise;
134         }
135         if (isStreamMapEmpty()) {
136             promise.trySuccess(null);
137             return promise;
138         }
139 
140         Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
141         // We must take care while iterating the streamMap as to not modify while iterating in case there are other code
142         // paths iterating over the active streams.
143         if (activeStreams.allowModifications()) {
144             activeStreams.incrementPendingIterations();
145             try {
146                 while (itr.hasNext()) {
147                     DefaultStream stream = (DefaultStream) itr.next().value();
148                     if (stream.id() != CONNECTION_STREAM_ID) {
149                         // If modifications of the activeStream map is allowed, then a stream close operation will also
150                         // modify the streamMap. Pass the iterator in so that remove will be called to prevent
151                         // concurrent modification exceptions.
152                         stream.close(itr);
153                     }
154                 }
155             } finally {
156                 activeStreams.decrementPendingIterations();
157             }
158         } else {
159             while (itr.hasNext()) {
160                 Http2Stream stream = itr.next().value();
161                 if (stream.id() != CONNECTION_STREAM_ID) {
162                     // We are not allowed to make modifications, so the close calls will be executed after this
163                     // iteration completes.
164                     stream.close();
165                 }
166             }
167         }
168         return closePromise;
169     }
170 
171     @Override
172     public void addListener(Listener listener) {
173         listeners.add(listener);
174     }
175 
176     @Override
177     public void removeListener(Listener listener) {
178         listeners.remove(listener);
179     }
180 
181     @Override
182     public boolean isServer() {
183         return localEndpoint.isServer();
184     }
185 
186     @Override
187     public Http2Stream connectionStream() {
188         return connectionStream;
189     }
190 
191     @Override
192     public Http2Stream stream(int streamId) {
193         return streamMap.get(streamId);
194     }
195 
196     @Override
197     public boolean streamMayHaveExisted(int streamId) {
198         return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
199     }
200 
201     @Override
202     public int numActiveStreams() {
203         return activeStreams.size();
204     }
205 
206     @Override
207     public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
208         return activeStreams.forEachActiveStream(visitor);
209     }
210 
211     @Override
212     public Endpoint<Http2LocalFlowController> local() {
213         return localEndpoint;
214     }
215 
216     @Override
217     public Endpoint<Http2RemoteFlowController> remote() {
218         return remoteEndpoint;
219     }
220 
221     @Override
222     public boolean goAwayReceived() {
223         return localEndpoint.lastStreamKnownByPeer >= 0;
224     }
225 
226     @Override
227     public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
228         if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
229             throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
230                     localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
231         }
232 
233         localEndpoint.lastStreamKnownByPeer(lastKnownStream);
234         for (int i = 0; i < listeners.size(); ++i) {
235             try {
236                 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
237             } catch (Throwable cause) {
238                 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
239             }
240         }
241 
242         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
243     }
244 
245     @Override
246     public boolean goAwaySent() {
247         return remoteEndpoint.lastStreamKnownByPeer >= 0;
248     }
249 
250     @Override
251     public boolean goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
252         if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
253             // Protect against re-entrancy. Could happen if writing the frame fails, and error handling
254             // treating this is a connection handler and doing a graceful shutdown...
255             if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
256                 return false;
257             }
258             if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
259                 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
260                                 "sending multiple GOAWAY frames (was '%d', is '%d').",
261                         remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
262             }
263         }
264 
265         remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
266         for (int i = 0; i < listeners.size(); ++i) {
267             try {
268                 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
269             } catch (Throwable cause) {
270                 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
271             }
272         }
273 
274         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
275         return true;
276     }
277 
278     private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
279                                                           final DefaultEndpoint<?> endpoint) throws Http2Exception {
280         forEachActiveStream(new Http2StreamVisitor() {
281             @Override
282             public boolean visit(Http2Stream stream) {
283                 if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
284                     stream.close();
285                 }
286                 return true;
287             }
288         });
289     }
290 
291     /**
292      * Determine if {@link #streamMap} only contains the connection stream.
293      */
294     private boolean isStreamMapEmpty() {
295         return streamMap.size() == 1;
296     }
297 
298     /**
299      * Remove a stream from the {@link #streamMap}.
300      * @param stream the stream to remove.
301      * @param itr an iterator that may be pointing to the stream during iteration and {@link Iterator#remove()} will be
302      * used if non-{@code null}.
303      */
304     void removeStream(DefaultStream stream, Iterator<?> itr) {
305         final boolean removed;
306         if (itr == null) {
307             removed = streamMap.remove(stream.id()) != null;
308         } else {
309             itr.remove();
310             removed = true;
311         }
312 
313         if (removed) {
314             for (int i = 0; i < listeners.size(); i++) {
315                 try {
316                     listeners.get(i).onStreamRemoved(stream);
317                 } catch (Throwable cause) {
318                     logger.error("Caught Throwable from listener onStreamRemoved.", cause);
319                 }
320             }
321 
322             if (closePromise != null && isStreamMapEmpty()) {
323                 closePromise.trySuccess(null);
324             }
325         }
326     }
327 
328     static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
329             throws Http2Exception {
330         switch (initialState) {
331         case IDLE:
332             return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
333         case RESERVED_LOCAL:
334             return HALF_CLOSED_REMOTE;
335         case RESERVED_REMOTE:
336             return HALF_CLOSED_LOCAL;
337         default:
338             throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
339                     + initialState);
340         }
341     }
342 
343     void notifyHalfClosed(Http2Stream stream) {
344         for (int i = 0; i < listeners.size(); i++) {
345             try {
346                 listeners.get(i).onStreamHalfClosed(stream);
347             } catch (Throwable cause) {
348                 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
349             }
350         }
351     }
352 
353     void notifyClosed(Http2Stream stream) {
354         for (int i = 0; i < listeners.size(); i++) {
355             try {
356                 listeners.get(i).onStreamClosed(stream);
357             } catch (Throwable cause) {
358                 logger.error("Caught Throwable from listener onStreamClosed.", cause);
359             }
360         }
361     }
362 
363     @Override
364     public PropertyKey newKey() {
365         return propertyKeyRegistry.newKey();
366     }
367 
368     /**
369      * Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type.
370      *
371      * @throws NullPointerException if the key is {@code null}.
372      * @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}.
373      * @throws IllegalArgumentException if the key was not created by this connection.
374      */
375     final DefaultPropertyKey verifyKey(PropertyKey key) {
376         return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
377     }
378 
379     /**
380      * Simple stream implementation. Streams can be compared to each other by priority.
381      */
382     private class DefaultStream implements Http2Stream {
383         private static final byte META_STATE_SENT_RST = 1;
384         private static final byte META_STATE_SENT_HEADERS = 1 << 1;
385         private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
386         private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
387         private static final byte META_STATE_RECV_HEADERS = 1 << 4;
388         private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
389         private final int id;
390         private final PropertyMap properties = new PropertyMap();
391         private State state;
392         private byte metaState;
393 
394         DefaultStream(int id, State state) {
395             this.id = id;
396             this.state = state;
397         }
398 
399         @Override
400         public final int id() {
401             return id;
402         }
403 
404         @Override
405         public final State state() {
406             return state;
407         }
408 
409         @Override
410         public boolean isResetSent() {
411             return (metaState & META_STATE_SENT_RST) != 0;
412         }
413 
414         @Override
415         public Http2Stream resetSent() {
416             metaState |= META_STATE_SENT_RST;
417             return this;
418         }
419 
420         @Override
421         public Http2Stream headersSent(boolean isInformational) {
422             if (!isInformational) {
423                 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
424             }
425             return this;
426         }
427 
428         @Override
429         public boolean isHeadersSent() {
430             return (metaState & META_STATE_SENT_HEADERS) != 0;
431         }
432 
433         @Override
434         public boolean isTrailersSent() {
435             return (metaState & META_STATE_SENT_TRAILERS) != 0;
436         }
437 
438         @Override
439         public Http2Stream headersReceived(boolean isInformational) {
440             if (!isInformational) {
441                 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
442             }
443             return this;
444         }
445 
446         @Override
447         public boolean isHeadersReceived() {
448             return (metaState & META_STATE_RECV_HEADERS) != 0;
449         }
450 
451         @Override
452         public boolean isTrailersReceived() {
453             return (metaState & META_STATE_RECV_TRAILERS) != 0;
454         }
455 
456         @Override
457         public Http2Stream pushPromiseSent() {
458             metaState |= META_STATE_SENT_PUSHPROMISE;
459             return this;
460         }
461 
462         @Override
463         public boolean isPushPromiseSent() {
464             return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
465         }
466 
467         @Override
468         public final <V> V setProperty(PropertyKey key, V value) {
469             return properties.add(verifyKey(key), value);
470         }
471 
472         @Override
473         public final <V> V getProperty(PropertyKey key) {
474             return properties.get(verifyKey(key));
475         }
476 
477         @Override
478         public final <V> V removeProperty(PropertyKey key) {
479             return properties.remove(verifyKey(key));
480         }
481 
482         @Override
483         public Http2Stream open(boolean halfClosed) throws Http2Exception {
484             state = activeState(id, state, isLocal(), halfClosed);
485             if (!createdBy().canOpenStream()) {
486                 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint.");
487             }
488 
489             activate();
490             return this;
491         }
492 
493         void activate() {
494             // If the stream is opened in a half-closed state, the headers must have either
495             // been sent if this is a local stream, or received if it is a remote stream.
496             if (state == HALF_CLOSED_LOCAL) {
497                 headersSent(/*isInformational*/ false);
498             } else if (state == HALF_CLOSED_REMOTE) {
499                 headersReceived(/*isInformational*/ false);
500             }
501             activeStreams.activate(this);
502         }
503 
504         Http2Stream close(Iterator<?> itr) {
505             if (state == CLOSED) {
506                 return this;
507             }
508 
509             state = CLOSED;
510 
511             --createdBy().numStreams;
512             activeStreams.deactivate(this, itr);
513             return this;
514         }
515 
516         @Override
517         public Http2Stream close() {
518             return close(null);
519         }
520 
521         @Override
522         public Http2Stream closeLocalSide() {
523             switch (state) {
524             case OPEN:
525                 state = HALF_CLOSED_LOCAL;
526                 notifyHalfClosed(this);
527                 break;
528             case HALF_CLOSED_LOCAL:
529                 break;
530             default:
531                 close();
532                 break;
533             }
534             return this;
535         }
536 
537         @Override
538         public Http2Stream closeRemoteSide() {
539             switch (state) {
540             case OPEN:
541                 state = HALF_CLOSED_REMOTE;
542                 notifyHalfClosed(this);
543                 break;
544             case HALF_CLOSED_REMOTE:
545                 break;
546             default:
547                 close();
548                 break;
549             }
550             return this;
551         }
552 
553         DefaultEndpoint<? extends Http2FlowController> createdBy() {
554             return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
555         }
556 
557         final boolean isLocal() {
558             return localEndpoint.isValidStreamId(id);
559         }
560 
561         /**
562          * Provides the lazy initialization for the {@link DefaultStream} data map.
563          */
564         private class PropertyMap {
565             Object[] values = EmptyArrays.EMPTY_OBJECTS;
566 
567             <V> V add(DefaultPropertyKey key, V value) {
568                 resizeIfNecessary(key.index);
569                 @SuppressWarnings("unchecked")
570                 V prevValue = (V) values[key.index];
571                 values[key.index] = value;
572                 return prevValue;
573             }
574 
575             @SuppressWarnings("unchecked")
576             <V> V get(DefaultPropertyKey key) {
577                 if (key.index >= values.length) {
578                     return null;
579                 }
580                 return (V) values[key.index];
581             }
582 
583             @SuppressWarnings("unchecked")
584             <V> V remove(DefaultPropertyKey key) {
585                 V prevValue = null;
586                 if (key.index < values.length) {
587                     prevValue = (V) values[key.index];
588                     values[key.index] = null;
589                 }
590                 return prevValue;
591             }
592 
593             void resizeIfNecessary(int index) {
594                 if (index >= values.length) {
595                     values = Arrays.copyOf(values, propertyKeyRegistry.size());
596                 }
597             }
598         }
599     }
600 
601     /**
602      * Stream class representing the connection, itself.
603      */
604     private final class ConnectionStream extends DefaultStream {
605         ConnectionStream() {
606             super(CONNECTION_STREAM_ID, IDLE);
607         }
608 
609         @Override
610         public boolean isResetSent() {
611             return false;
612         }
613 
614         @Override
615         DefaultEndpoint<? extends Http2FlowController> createdBy() {
616             return null;
617         }
618 
619         @Override
620         public Http2Stream resetSent() {
621             throw new UnsupportedOperationException();
622         }
623 
624         @Override
625         public Http2Stream open(boolean halfClosed) {
626             throw new UnsupportedOperationException();
627         }
628 
629         @Override
630         public Http2Stream close() {
631             throw new UnsupportedOperationException();
632         }
633 
634         @Override
635         public Http2Stream closeLocalSide() {
636             throw new UnsupportedOperationException();
637         }
638 
639         @Override
640         public Http2Stream closeRemoteSide() {
641             throw new UnsupportedOperationException();
642         }
643 
644         @Override
645         public Http2Stream headersSent(boolean isInformational) {
646             throw new UnsupportedOperationException();
647         }
648 
649         @Override
650         public boolean isHeadersSent() {
651             throw new UnsupportedOperationException();
652         }
653 
654         @Override
655         public Http2Stream pushPromiseSent() {
656             throw new UnsupportedOperationException();
657         }
658 
659         @Override
660         public boolean isPushPromiseSent() {
661             throw new UnsupportedOperationException();
662         }
663     }
664 
665     /**
666      * Simple endpoint implementation.
667      */
668     private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
669         private final boolean server;
670         /**
671          * The minimum stream ID allowed when creating the next stream. This only applies at the time the stream is
672          * created. If the ID of the stream being created is less than this value, stream creation will fail. Upon
673          * successful creation of a stream, this value is incremented to the next valid stream ID.
674          */
675         private int nextStreamIdToCreate;
676         /**
677          * Used for reservation of stream IDs. Stream IDs can be reserved in advance by applications before the streams
678          * are actually created.  For example, applications may choose to buffer stream creation attempts as a way of
679          * working around {@code SETTINGS_MAX_CONCURRENT_STREAMS}, in which case they will reserve stream IDs for each
680          * buffered stream.
681          */
682         private int nextReservationStreamId;
683         private int lastStreamKnownByPeer = -1;
684         private boolean pushToAllowed = true;
685         private F flowController;
686         private int maxStreams;
687         private int maxActiveStreams;
688         private final int maxReservedStreams;
689         // Fields accessed by inner classes
690         int numActiveStreams;
691         int numStreams;
692 
693         DefaultEndpoint(boolean server, int maxReservedStreams) {
694             this.server = server;
695 
696             // Determine the starting stream ID for this endpoint. Client-initiated streams
697             // are odd and server-initiated streams are even. Zero is reserved for the
698             // connection. Stream 1 is reserved client-initiated stream for responding to an
699             // upgrade from HTTP 1.1.
700             if (server) {
701                 nextStreamIdToCreate = 2;
702                 nextReservationStreamId = 0;
703             } else {
704                 nextStreamIdToCreate = 1;
705                 // For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
706                 nextReservationStreamId = 1;
707             }
708 
709             // Push is disallowed by default for servers and allowed for clients.
710             pushToAllowed = !server;
711             maxActiveStreams = MAX_VALUE;
712             this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
713             updateMaxStreams();
714         }
715 
716         @Override
717         public int incrementAndGetNextStreamId() {
718             return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
719         }
720 
721         private void incrementExpectedStreamId(int streamId) {
722             if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
723                 nextReservationStreamId = streamId;
724             }
725             nextStreamIdToCreate = streamId + 2;
726             ++numStreams;
727         }
728 
729         @Override
730         public boolean isValidStreamId(int streamId) {
731             return streamId > 0 && server == ((streamId & 1) == 0);
732         }
733 
734         @Override
735         public boolean mayHaveCreatedStream(int streamId) {
736             return isValidStreamId(streamId) && streamId <= lastStreamCreated();
737         }
738 
739         @Override
740         public boolean canOpenStream() {
741             return numActiveStreams < maxActiveStreams;
742         }
743 
744         @Override
745         public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
746             State state = activeState(streamId, IDLE, isLocal(), halfClosed);
747 
748             checkNewStreamAllowed(streamId, state);
749 
750             // Create and initialize the stream.
751             DefaultStream stream = new DefaultStream(streamId, state);
752 
753             incrementExpectedStreamId(streamId);
754 
755             addStream(stream);
756 
757             stream.activate();
758             return stream;
759         }
760 
761         @Override
762         public boolean created(Http2Stream stream) {
763             return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
764         }
765 
766         @Override
767         public boolean isServer() {
768             return server;
769         }
770 
771         @Override
772         public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
773             if (parent == null) {
774                 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
775             }
776             if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
777                 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
778             }
779             if (!opposite().allowPushTo()) {
780                 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
781             }
782             State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
783             checkNewStreamAllowed(streamId, state);
784 
785             // Create and initialize the stream.
786             DefaultStream stream = new DefaultStream(streamId, state);
787 
788             incrementExpectedStreamId(streamId);
789 
790             // Register the stream.
791             addStream(stream);
792             return stream;
793         }
794 
795         private void addStream(DefaultStream stream) {
796             // Add the stream to the map and priority tree.
797             streamMap.put(stream.id(), stream);
798 
799             // Notify the listeners of the event.
800             for (int i = 0; i < listeners.size(); i++) {
801                 try {
802                     listeners.get(i).onStreamAdded(stream);
803                 } catch (Throwable cause) {
804                     logger.error("Caught Throwable from listener onStreamAdded.", cause);
805                 }
806             }
807         }
808 
809         @Override
810         public void allowPushTo(boolean allow) {
811             if (allow && server) {
812                 throw new IllegalArgumentException("Servers do not allow push");
813             }
814             pushToAllowed = allow;
815         }
816 
817         @Override
818         public boolean allowPushTo() {
819             return pushToAllowed;
820         }
821 
822         @Override
823         public int numActiveStreams() {
824             return numActiveStreams;
825         }
826 
827         @Override
828         public int maxActiveStreams() {
829             return maxActiveStreams;
830         }
831 
832         @Override
833         public void maxActiveStreams(int maxActiveStreams) {
834             this.maxActiveStreams = maxActiveStreams;
835             updateMaxStreams();
836         }
837 
838         @Override
839         public int lastStreamCreated() {
840             return nextStreamIdToCreate > 1 ? nextStreamIdToCreate - 2 : 0;
841         }
842 
843         @Override
844         public int lastStreamKnownByPeer() {
845             return lastStreamKnownByPeer;
846         }
847 
848         private void lastStreamKnownByPeer(int lastKnownStream) {
849             this.lastStreamKnownByPeer = lastKnownStream;
850         }
851 
852         @Override
853         public F flowController() {
854             return flowController;
855         }
856 
857         @Override
858         public void flowController(F flowController) {
859             this.flowController = checkNotNull(flowController, "flowController");
860         }
861 
862         @Override
863         public Endpoint<? extends Http2FlowController> opposite() {
864             return isLocal() ? remoteEndpoint : localEndpoint;
865         }
866 
867         private void updateMaxStreams() {
868             maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
869         }
870 
871         private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
872             assert state != IDLE;
873             if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
874                 throw streamError(streamId, REFUSED_STREAM,
875                         "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
876                         streamId, lastStreamKnownByPeer);
877             }
878             if (!isValidStreamId(streamId)) {
879                 if (streamId < 0) {
880                     throw new Http2NoMoreStreamIdsException();
881                 }
882                 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
883                         server ? "server" : "client");
884             }
885             // This check must be after all id validated checks, but before the max streams check because it may be
886             // recoverable to some degree for handling frames which can be sent on closed streams.
887             if (streamId < nextStreamIdToCreate) {
888                 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
889                         streamId, nextStreamIdToCreate);
890             }
891             if (nextStreamIdToCreate <= 0) {
892                 throw connectionError(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.");
893             }
894             boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
895             if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
896                 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint.");
897             }
898             if (isClosed()) {
899                 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
900                                       streamId);
901             }
902         }
903 
904         private boolean isLocal() {
905             return this == localEndpoint;
906         }
907     }
908 
909     /**
910      * Allows events which would modify the collection of active streams to be queued while iterating via {@link
911      * #forEachActiveStream(Http2StreamVisitor)}.
912      */
913     interface Event {
914         /**
915          * Trigger the original intention of this event. Expect to modify the active streams list.
916          * <p/>
917          * If a {@link RuntimeException} object is thrown it will be logged and <strong>not propagated</strong>.
918          * Throwing from this method is not supported and is considered a programming error.
919          */
920         void process();
921     }
922 
923     /**
924      * Manages the list of currently active streams.  Queues any {@link Event}s that would modify the list of
925      * active streams in order to prevent modification while iterating.
926      */
927     private final class ActiveStreams {
928         private final List<Listener> listeners;
929         private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
930         private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
931         private int pendingIterations;
932 
933         public ActiveStreams(List<Listener> listeners) {
934             this.listeners = listeners;
935         }
936 
937         public int size() {
938             return streams.size();
939         }
940 
941         public void activate(final DefaultStream stream) {
942             if (allowModifications()) {
943                 addToActiveStreams(stream);
944             } else {
945                 pendingEvents.add(new Event() {
946                     @Override
947                     public void process() {
948                         addToActiveStreams(stream);
949                     }
950                 });
951             }
952         }
953 
954         public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
955             if (allowModifications() || itr != null) {
956                 removeFromActiveStreams(stream, itr);
957             } else {
958                 pendingEvents.add(new Event() {
959                     @Override
960                     public void process() {
961                         removeFromActiveStreams(stream, itr);
962                     }
963                 });
964             }
965         }
966 
967         public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
968             incrementPendingIterations();
969             try {
970                 for (Http2Stream stream : streams) {
971                     if (!visitor.visit(stream)) {
972                         return stream;
973                     }
974                 }
975                 return null;
976             } finally {
977                 decrementPendingIterations();
978             }
979         }
980 
981         void addToActiveStreams(DefaultStream stream) {
982             if (streams.add(stream)) {
983                 // Update the number of active streams initiated by the endpoint.
984                 stream.createdBy().numActiveStreams++;
985 
986                 for (int i = 0; i < listeners.size(); i++) {
987                     try {
988                         listeners.get(i).onStreamActive(stream);
989                     } catch (Throwable cause) {
990                         logger.error("Caught Throwable from listener onStreamActive.", cause);
991                     }
992                 }
993             }
994         }
995 
996         void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
997             if (streams.remove(stream)) {
998                 // Update the number of active streams initiated by the endpoint.
999                 stream.createdBy().numActiveStreams--;
1000                 notifyClosed(stream);
1001             }
1002             removeStream(stream, itr);
1003         }
1004 
1005         boolean allowModifications() {
1006             return pendingIterations == 0;
1007         }
1008 
1009         void incrementPendingIterations() {
1010             ++pendingIterations;
1011         }
1012 
1013         void decrementPendingIterations() {
1014             --pendingIterations;
1015             if (allowModifications()) {
1016                 for (;;) {
1017                     Event event = pendingEvents.poll();
1018                     if (event == null) {
1019                         break;
1020                     }
1021                     try {
1022                         event.process();
1023                     } catch (Throwable cause) {
1024                         logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1025                     }
1026                 }
1027             }
1028         }
1029     }
1030 
1031     /**
1032      * Implementation of {@link PropertyKey} that specifies the index position of the property.
1033      */
1034     final class DefaultPropertyKey implements PropertyKey {
1035         final int index;
1036 
1037         DefaultPropertyKey(int index) {
1038             this.index = index;
1039         }
1040 
1041         DefaultPropertyKey verifyConnection(Http2Connection connection) {
1042             if (connection != DefaultHttp2Connection.this) {
1043                 throw new IllegalArgumentException("Using a key that was not created by this connection");
1044             }
1045             return this;
1046         }
1047     }
1048 
1049     /**
1050      * A registry of all stream property keys known by this connection.
1051      */
1052     private final class PropertyKeyRegistry {
1053         /**
1054          * Initial size of 4 because the default configuration currently has 3 listeners
1055          * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
1056          * We could be more aggressive but the ArrayList resize will double the size if we are too small.
1057          */
1058         final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
1059 
1060         /**
1061          * Registers a new property key.
1062          */
1063         DefaultPropertyKey newKey() {
1064             DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1065             keys.add(key);
1066             return key;
1067         }
1068 
1069         int size() {
1070             return keys.size();
1071         }
1072     }
1073 }