View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.handler.codec.http2.Http2Stream.State;
22  import io.netty.util.collection.IntObjectHashMap;
23  import io.netty.util.collection.IntObjectMap;
24  import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.Promise;
27  import io.netty.util.concurrent.PromiseNotifier;
28  import io.netty.util.internal.EmptyArrays;
29  import io.netty.util.internal.UnstableApi;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  import java.util.ArrayDeque;
34  import java.util.ArrayList;
35  import java.util.Arrays;
36  import java.util.Iterator;
37  import java.util.LinkedHashSet;
38  import java.util.List;
39  import java.util.Queue;
40  import java.util.Set;
41  
42  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
43  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
44  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46  import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
47  import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
48  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
49  import static io.netty.handler.codec.http2.Http2Exception.streamError;
50  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
51  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
52  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
53  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
54  import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
55  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
56  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
57  import static io.netty.util.internal.ObjectUtil.checkNotNull;
58  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
59  import static java.lang.Integer.MAX_VALUE;
60  
61  /**
62   * Simple implementation of {@link Http2Connection}.
63   */
64  @UnstableApi
65  public class DefaultHttp2Connection implements Http2Connection {
66      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
67      // Fields accessed by inner classes
68      final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
69      final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
70      final ConnectionStream connectionStream = new ConnectionStream();
71      final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
72      final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
73  
74      /**
75       * We chose a {@link List} over a {@link Set} to avoid allocating an {@link Iterator} objects when iterating over
76       * the listeners.
77       * <p>
78       * Initial size of 4 because the default configuration currently has 3 listeners
79       * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
80       * We could be more aggressive but the ArrayList resize will double the size if we are too small.
81       */
82      final List<Listener> listeners = new ArrayList<Listener>(4);
83      final ActiveStreams activeStreams;
84      Promise<Void> closePromise;
85  
86      /**
87       * Creates a new connection with the given settings.
88       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
89       */
90      public DefaultHttp2Connection(boolean server) {
91          this(server, DEFAULT_MAX_RESERVED_STREAMS);
92      }
93  
94      /**
95       * Creates a new connection with the given settings.
96       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
97       * @param maxReservedStreams The maximum amount of streams which can exist in the reserved state for each endpoint.
98       */
99      public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
100         activeStreams = new ActiveStreams(listeners);
101         // Reserved streams are excluded from the SETTINGS_MAX_CONCURRENT_STREAMS limit according to [1] and the RFC
102         // doesn't define a way to communicate the limit on reserved streams. We rely upon the peer to send RST_STREAM
103         // in response to any locally enforced limits being exceeded [2].
104         // [1] https://tools.ietf.org/html/rfc7540#section-5.1.2
105         // [2] https://tools.ietf.org/html/rfc7540#section-8.2.2
106         localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server, server ? MAX_VALUE : maxReservedStreams);
107         remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server, maxReservedStreams);
108 
109         // Add the connection stream to the map.
110         streamMap.put(connectionStream.id(), connectionStream);
111     }
112 
113     /**
114      * Determine if {@link #close(Promise)} has been called and no more streams are allowed to be created.
115      */
116     final boolean isClosed() {
117         return closePromise != null;
118     }
119 
120     @Override
121     public Future<Void> close(final Promise<Void> promise) {
122         checkNotNull(promise, "promise");
123         // Since we allow this method to be called multiple times, we must make sure that all the promises are notified
124         // when all streams are removed and the close operation completes.
125         if (closePromise != null) {
126             if (closePromise == promise) {
127                 // Do nothing
128             } else if (promise instanceof ChannelPromise && ((ChannelFuture) closePromise).isVoid()) {
129                 closePromise = promise;
130             } else {
131                 PromiseNotifier.cascade(closePromise, promise);
132             }
133         } else {
134             closePromise = promise;
135         }
136         if (isStreamMapEmpty()) {
137             promise.trySuccess(null);
138             return promise;
139         }
140 
141         Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
142         // We must take care while iterating the streamMap as to not modify while iterating in case there are other code
143         // paths iterating over the active streams.
144         if (activeStreams.allowModifications()) {
145             activeStreams.incrementPendingIterations();
146             try {
147                 while (itr.hasNext()) {
148                     DefaultStream stream = (DefaultStream) itr.next().value();
149                     if (stream.id() != CONNECTION_STREAM_ID) {
150                         // If modifications of the activeStream map is allowed, then a stream close operation will also
151                         // modify the streamMap. Pass the iterator in so that remove will be called to prevent
152                         // concurrent modification exceptions.
153                         stream.close(itr);
154                     }
155                 }
156             } finally {
157                 activeStreams.decrementPendingIterations();
158             }
159         } else {
160             while (itr.hasNext()) {
161                 Http2Stream stream = itr.next().value();
162                 if (stream.id() != CONNECTION_STREAM_ID) {
163                     // We are not allowed to make modifications, so the close calls will be executed after this
164                     // iteration completes.
165                     stream.close();
166                 }
167             }
168         }
169         return closePromise;
170     }
171 
172     @Override
173     public void addListener(Listener listener) {
174         listeners.add(listener);
175     }
176 
177     @Override
178     public void removeListener(Listener listener) {
179         listeners.remove(listener);
180     }
181 
182     @Override
183     public boolean isServer() {
184         return localEndpoint.isServer();
185     }
186 
187     @Override
188     public Http2Stream connectionStream() {
189         return connectionStream;
190     }
191 
192     @Override
193     public Http2Stream stream(int streamId) {
194         return streamMap.get(streamId);
195     }
196 
197     @Override
198     public boolean streamMayHaveExisted(int streamId) {
199         return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
200     }
201 
202     @Override
203     public int numActiveStreams() {
204         return activeStreams.size();
205     }
206 
207     @Override
208     public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
209         return activeStreams.forEachActiveStream(visitor);
210     }
211 
212     @Override
213     public Endpoint<Http2LocalFlowController> local() {
214         return localEndpoint;
215     }
216 
217     @Override
218     public Endpoint<Http2RemoteFlowController> remote() {
219         return remoteEndpoint;
220     }
221 
222     @Override
223     public boolean goAwayReceived() {
224         return localEndpoint.lastStreamKnownByPeer >= 0;
225     }
226 
227     @Override
228     public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
229         if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
230             throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
231                     localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
232         }
233 
234         localEndpoint.lastStreamKnownByPeer(lastKnownStream);
235         for (int i = 0; i < listeners.size(); ++i) {
236             try {
237                 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
238             } catch (Throwable cause) {
239                 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
240             }
241         }
242 
243         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
244     }
245 
246     @Override
247     public boolean goAwaySent() {
248         return remoteEndpoint.lastStreamKnownByPeer >= 0;
249     }
250 
251     @Override
252     public boolean goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
253         if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
254             // Protect against re-entrancy. Could happen if writing the frame fails, and error handling
255             // treating this is a connection handler and doing a graceful shutdown...
256             if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
257                 return false;
258             }
259             if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
260                 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
261                                 "sending multiple GOAWAY frames (was '%d', is '%d').",
262                         remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
263             }
264         }
265 
266         remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
267         for (int i = 0; i < listeners.size(); ++i) {
268             try {
269                 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
270             } catch (Throwable cause) {
271                 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
272             }
273         }
274 
275         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
276         return true;
277     }
278 
279     private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
280                                                           final DefaultEndpoint<?> endpoint) throws Http2Exception {
281         forEachActiveStream(new Http2StreamVisitor() {
282             @Override
283             public boolean visit(Http2Stream stream) {
284                 if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
285                     stream.close();
286                 }
287                 return true;
288             }
289         });
290     }
291 
292     /**
293      * Determine if {@link #streamMap} only contains the connection stream.
294      */
295     private boolean isStreamMapEmpty() {
296         return streamMap.size() == 1;
297     }
298 
299     /**
300      * Remove a stream from the {@link #streamMap}.
301      * @param stream the stream to remove.
302      * @param itr an iterator that may be pointing to the stream during iteration and {@link Iterator#remove()} will be
303      * used if non-{@code null}.
304      */
305     void removeStream(DefaultStream stream, Iterator<?> itr) {
306         final boolean removed;
307         if (itr == null) {
308             removed = streamMap.remove(stream.id()) != null;
309         } else {
310             itr.remove();
311             removed = true;
312         }
313 
314         if (removed) {
315             for (int i = 0; i < listeners.size(); i++) {
316                 try {
317                     listeners.get(i).onStreamRemoved(stream);
318                 } catch (Throwable cause) {
319                     logger.error("Caught Throwable from listener onStreamRemoved.", cause);
320                 }
321             }
322 
323             if (closePromise != null && isStreamMapEmpty()) {
324                 closePromise.trySuccess(null);
325             }
326         }
327     }
328 
329     static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
330             throws Http2Exception {
331         switch (initialState) {
332         case IDLE:
333             return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
334         case RESERVED_LOCAL:
335             return HALF_CLOSED_REMOTE;
336         case RESERVED_REMOTE:
337             return HALF_CLOSED_LOCAL;
338         default:
339             throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
340                     + initialState);
341         }
342     }
343 
344     void notifyHalfClosed(Http2Stream stream) {
345         for (int i = 0; i < listeners.size(); i++) {
346             try {
347                 listeners.get(i).onStreamHalfClosed(stream);
348             } catch (Throwable cause) {
349                 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
350             }
351         }
352     }
353 
354     void notifyClosed(Http2Stream stream) {
355         for (int i = 0; i < listeners.size(); i++) {
356             try {
357                 listeners.get(i).onStreamClosed(stream);
358             } catch (Throwable cause) {
359                 logger.error("Caught Throwable from listener onStreamClosed.", cause);
360             }
361         }
362     }
363 
364     @Override
365     public PropertyKey newKey() {
366         return propertyKeyRegistry.newKey();
367     }
368 
369     /**
370      * Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type.
371      *
372      * @throws NullPointerException if the key is {@code null}.
373      * @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}.
374      * @throws IllegalArgumentException if the key was not created by this connection.
375      */
376     final DefaultPropertyKey verifyKey(PropertyKey key) {
377         return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
378     }
379 
380     /**
381      * Simple stream implementation. Streams can be compared to each other by priority.
382      */
383     private class DefaultStream implements Http2Stream {
384         private static final byte META_STATE_SENT_RST = 1;
385         private static final byte META_STATE_SENT_HEADERS = 1 << 1;
386         private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
387         private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
388         private static final byte META_STATE_RECV_HEADERS = 1 << 4;
389         private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
390         private final int id;
391         private final long identity;
392         private final PropertyMap properties = new PropertyMap();
393         private State state;
394         private byte metaState;
395 
396         DefaultStream(long identity, int id, State state) {
397             this.identity = identity;
398             this.id = id;
399             this.state = state;
400         }
401 
402         @Override
403         public final int id() {
404             return id;
405         }
406 
407         @Override
408         public final State state() {
409             return state;
410         }
411 
412         @Override
413         public boolean isResetSent() {
414             return (metaState & META_STATE_SENT_RST) != 0;
415         }
416 
417         @Override
418         public Http2Stream resetSent() {
419             metaState |= META_STATE_SENT_RST;
420             return this;
421         }
422 
423         @Override
424         public Http2Stream headersSent(boolean isInformational) {
425             if (!isInformational) {
426                 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
427             }
428             return this;
429         }
430 
431         @Override
432         public boolean isHeadersSent() {
433             return (metaState & META_STATE_SENT_HEADERS) != 0;
434         }
435 
436         @Override
437         public boolean isTrailersSent() {
438             return (metaState & META_STATE_SENT_TRAILERS) != 0;
439         }
440 
441         @Override
442         public Http2Stream headersReceived(boolean isInformational) {
443             if (!isInformational) {
444                 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
445             }
446             return this;
447         }
448 
449         @Override
450         public boolean isHeadersReceived() {
451             return (metaState & META_STATE_RECV_HEADERS) != 0;
452         }
453 
454         @Override
455         public boolean isTrailersReceived() {
456             return (metaState & META_STATE_RECV_TRAILERS) != 0;
457         }
458 
459         @Override
460         public Http2Stream pushPromiseSent() {
461             metaState |= META_STATE_SENT_PUSHPROMISE;
462             return this;
463         }
464 
465         @Override
466         public boolean isPushPromiseSent() {
467             return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
468         }
469 
470         @Override
471         public final <V> V setProperty(PropertyKey key, V value) {
472             return properties.add(verifyKey(key), value);
473         }
474 
475         @Override
476         public final <V> V getProperty(PropertyKey key) {
477             return properties.get(verifyKey(key));
478         }
479 
480         @Override
481         public final <V> V removeProperty(PropertyKey key) {
482             return properties.remove(verifyKey(key));
483         }
484 
485         @Override
486         public Http2Stream open(boolean halfClosed) throws Http2Exception {
487             state = activeState(id, state, isLocal(), halfClosed);
488             final DefaultEndpoint<? extends Http2FlowController> endpoint = createdBy();
489             if (!endpoint.canOpenStream()) {
490                 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint: " +
491                         endpoint.maxActiveStreams());
492             }
493 
494             activate();
495             return this;
496         }
497 
498         void activate() {
499             // If the stream is opened in a half-closed state, the headers must have either
500             // been sent if this is a local stream, or received if it is a remote stream.
501             if (state == HALF_CLOSED_LOCAL) {
502                 headersSent(/*isInformational*/ false);
503             } else if (state == HALF_CLOSED_REMOTE) {
504                 headersReceived(/*isInformational*/ false);
505             }
506             activeStreams.activate(this);
507         }
508 
509         Http2Stream close(Iterator<?> itr) {
510             if (state == CLOSED) {
511                 return this;
512             }
513 
514             state = CLOSED;
515 
516             --createdBy().numStreams;
517             activeStreams.deactivate(this, itr);
518             return this;
519         }
520 
521         @Override
522         public Http2Stream close() {
523             return close(null);
524         }
525 
526         @Override
527         public Http2Stream closeLocalSide() {
528             switch (state) {
529             case OPEN:
530                 state = HALF_CLOSED_LOCAL;
531                 notifyHalfClosed(this);
532                 break;
533             case HALF_CLOSED_LOCAL:
534                 break;
535             default:
536                 close();
537                 break;
538             }
539             return this;
540         }
541 
542         @Override
543         public Http2Stream closeRemoteSide() {
544             switch (state) {
545             case OPEN:
546                 state = HALF_CLOSED_REMOTE;
547                 notifyHalfClosed(this);
548                 break;
549             case HALF_CLOSED_REMOTE:
550                 break;
551             default:
552                 close();
553                 break;
554             }
555             return this;
556         }
557 
558         DefaultEndpoint<? extends Http2FlowController> createdBy() {
559             return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
560         }
561 
562         final boolean isLocal() {
563             return localEndpoint.isValidStreamId(id);
564         }
565 
566         /**
567          * Provides the lazy initialization for the {@link DefaultStream} data map.
568          */
569         private class PropertyMap {
570             Object[] values = EmptyArrays.EMPTY_OBJECTS;
571 
572             <V> V add(DefaultPropertyKey key, V value) {
573                 resizeIfNecessary(key.index);
574                 @SuppressWarnings("unchecked")
575                 V prevValue = (V) values[key.index];
576                 values[key.index] = value;
577                 return prevValue;
578             }
579 
580             @SuppressWarnings("unchecked")
581             <V> V get(DefaultPropertyKey key) {
582                 if (key.index >= values.length) {
583                     return null;
584                 }
585                 return (V) values[key.index];
586             }
587 
588             @SuppressWarnings("unchecked")
589             <V> V remove(DefaultPropertyKey key) {
590                 V prevValue = null;
591                 if (key.index < values.length) {
592                     prevValue = (V) values[key.index];
593                     values[key.index] = null;
594                 }
595                 return prevValue;
596             }
597 
598             void resizeIfNecessary(int index) {
599                 if (index >= values.length) {
600                     values = Arrays.copyOf(values, propertyKeyRegistry.size());
601                 }
602             }
603         }
604 
605         @Override
606         public boolean equals(final Object obj) {
607             return super.equals(obj);
608         }
609 
610         @Override
611         public int hashCode() {
612             long value = identity;
613             if (value == 0) {
614                 return System.identityHashCode(this);
615             }
616             return (int) (value ^ (value >>> 32));
617         }
618     }
619 
620     /**
621      * Stream class representing the connection, itself.
622      */
623     private final class ConnectionStream extends DefaultStream {
624         ConnectionStream() {
625             super(0, CONNECTION_STREAM_ID, IDLE);
626         }
627 
628         @Override
629         public boolean isResetSent() {
630             return false;
631         }
632 
633         @Override
634         DefaultEndpoint<? extends Http2FlowController> createdBy() {
635             return null;
636         }
637 
638         @Override
639         public Http2Stream resetSent() {
640             throw new UnsupportedOperationException();
641         }
642 
643         @Override
644         public Http2Stream open(boolean halfClosed) {
645             throw new UnsupportedOperationException();
646         }
647 
648         @Override
649         public Http2Stream close() {
650             throw new UnsupportedOperationException();
651         }
652 
653         @Override
654         public Http2Stream closeLocalSide() {
655             throw new UnsupportedOperationException();
656         }
657 
658         @Override
659         public Http2Stream closeRemoteSide() {
660             throw new UnsupportedOperationException();
661         }
662 
663         @Override
664         public Http2Stream headersSent(boolean isInformational) {
665             throw new UnsupportedOperationException();
666         }
667 
668         @Override
669         public boolean isHeadersSent() {
670             throw new UnsupportedOperationException();
671         }
672 
673         @Override
674         public Http2Stream pushPromiseSent() {
675             throw new UnsupportedOperationException();
676         }
677 
678         @Override
679         public boolean isPushPromiseSent() {
680             throw new UnsupportedOperationException();
681         }
682     }
683 
684     /**
685      * Simple endpoint implementation.
686      */
687     private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
688         private final boolean server;
689         /**
690          * This is an always increasing sequence number used to hash {@link DefaultStream} instances.
691          */
692         private long lastCreatedStreamIdentity;
693         /**
694          * The minimum stream ID allowed when creating the next stream. This only applies at the time the stream is
695          * created. If the ID of the stream being created is less than this value, stream creation will fail. Upon
696          * successful creation of a stream, this value is incremented to the next valid stream ID.
697          */
698         private int nextStreamIdToCreate;
699         /**
700          * Used for reservation of stream IDs. Stream IDs can be reserved in advance by applications before the streams
701          * are actually created.  For example, applications may choose to buffer stream creation attempts as a way of
702          * working around {@code SETTINGS_MAX_CONCURRENT_STREAMS}, in which case they will reserve stream IDs for each
703          * buffered stream.
704          */
705         private int nextReservationStreamId;
706         private int lastStreamKnownByPeer = -1;
707         private boolean pushToAllowed;
708         private F flowController;
709         private int maxStreams;
710         private int maxActiveStreams;
711         private final int maxReservedStreams;
712         // Fields accessed by inner classes
713         int numActiveStreams;
714         int numStreams;
715 
716         DefaultEndpoint(boolean server, int maxReservedStreams) {
717             this.lastCreatedStreamIdentity = 0;
718             this.server = server;
719 
720             // Determine the starting stream ID for this endpoint. Client-initiated streams
721             // are odd and server-initiated streams are even. Zero is reserved for the
722             // connection. Stream 1 is reserved client-initiated stream for responding to an
723             // upgrade from HTTP 1.1.
724             if (server) {
725                 nextStreamIdToCreate = 2;
726                 nextReservationStreamId = 0;
727             } else {
728                 nextStreamIdToCreate = 1;
729                 // For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
730                 nextReservationStreamId = 1;
731             }
732 
733             // Push is disallowed by default for servers and allowed for clients.
734             pushToAllowed = !server;
735             maxActiveStreams = MAX_VALUE;
736             this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
737             updateMaxStreams();
738         }
739 
740         @Override
741         public int incrementAndGetNextStreamId() {
742             return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
743         }
744 
745         private void incrementExpectedStreamId(int streamId) {
746             if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
747                 nextReservationStreamId = streamId;
748             }
749             nextStreamIdToCreate = streamId + 2;
750             ++numStreams;
751         }
752 
753         @Override
754         public boolean isValidStreamId(int streamId) {
755             return streamId > 0 && server == ((streamId & 1) == 0);
756         }
757 
758         @Override
759         public boolean mayHaveCreatedStream(int streamId) {
760             return isValidStreamId(streamId) && streamId <= lastStreamCreated();
761         }
762 
763         @Override
764         public boolean canOpenStream() {
765             return numActiveStreams < maxActiveStreams;
766         }
767 
768         @Override
769         public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
770             State state = activeState(streamId, IDLE, isLocal(), halfClosed);
771 
772             checkNewStreamAllowed(streamId, state);
773 
774             lastCreatedStreamIdentity++;
775 
776             // Create and initialize the stream.
777             DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
778 
779             incrementExpectedStreamId(streamId);
780 
781             addStream(stream);
782 
783             stream.activate();
784             return stream;
785         }
786 
787         @Override
788         public boolean created(Http2Stream stream) {
789             return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
790         }
791 
792         @Override
793         public boolean isServer() {
794             return server;
795         }
796 
797         @Override
798         public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
799             if (parent == null) {
800                 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
801             }
802             if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
803                 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
804             }
805             if (!opposite().allowPushTo()) {
806                 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
807             }
808             State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
809             checkNewStreamAllowed(streamId, state);
810 
811             lastCreatedStreamIdentity++;
812 
813             // Create and initialize the stream.
814             DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
815 
816             incrementExpectedStreamId(streamId);
817 
818             // Register the stream.
819             addStream(stream);
820             return stream;
821         }
822 
823         private void addStream(DefaultStream stream) {
824             // Add the stream to the map and priority tree.
825             streamMap.put(stream.id(), stream);
826 
827             // Notify the listeners of the event.
828             for (int i = 0; i < listeners.size(); i++) {
829                 try {
830                     listeners.get(i).onStreamAdded(stream);
831                 } catch (Throwable cause) {
832                     logger.error("Caught Throwable from listener onStreamAdded.", cause);
833                 }
834             }
835         }
836 
837         @Override
838         public void allowPushTo(boolean allow) {
839             if (allow && server) {
840                 throw new IllegalArgumentException("Servers do not allow push");
841             }
842             pushToAllowed = allow;
843         }
844 
845         @Override
846         public boolean allowPushTo() {
847             return pushToAllowed;
848         }
849 
850         @Override
851         public int numActiveStreams() {
852             return numActiveStreams;
853         }
854 
855         @Override
856         public int maxActiveStreams() {
857             return maxActiveStreams;
858         }
859 
860         @Override
861         public void maxActiveStreams(int maxActiveStreams) {
862             this.maxActiveStreams = maxActiveStreams;
863             updateMaxStreams();
864         }
865 
866         @Override
867         public int lastStreamCreated() {
868             // Stream ids are always incremented by 2 so just subtract it. This is even ok in the case
869             // of nextStreamIdToCreate overflown as it will just return the correct positive number.
870             // Use max(...) to ensure we return the correct value for the case when its a client and no stream
871             // was created yet.
872             return Math.max(0, nextStreamIdToCreate - 2);
873         }
874 
875         @Override
876         public int lastStreamKnownByPeer() {
877             return lastStreamKnownByPeer;
878         }
879 
880         private void lastStreamKnownByPeer(int lastKnownStream) {
881             lastStreamKnownByPeer = lastKnownStream;
882         }
883 
884         @Override
885         public F flowController() {
886             return flowController;
887         }
888 
889         @Override
890         public void flowController(F flowController) {
891             this.flowController = checkNotNull(flowController, "flowController");
892         }
893 
894         @Override
895         public Endpoint<? extends Http2FlowController> opposite() {
896             return isLocal() ? remoteEndpoint : localEndpoint;
897         }
898 
899         private void updateMaxStreams() {
900             maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
901         }
902 
903         private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
904             assert state != IDLE;
905             if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
906                 throw streamError(streamId, REFUSED_STREAM,
907                         "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
908                         streamId, lastStreamKnownByPeer);
909             }
910             if (!isValidStreamId(streamId)) {
911                 if (streamId < 0) {
912                     throw new Http2NoMoreStreamIdsException();
913                 }
914                 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
915                         server ? "server" : "client");
916             }
917             // This check must be after all id validated checks, but before the max streams check because it may be
918             // recoverable to some degree for handling frames which can be sent on closed streams.
919             if (streamId < nextStreamIdToCreate) {
920                 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
921                         streamId, nextStreamIdToCreate);
922             }
923             if (nextStreamIdToCreate <= 0) {
924                 // We exhausted the stream id space that we can use. Let's signal this back but also signal that
925                 // we still may want to process active streams.
926                 throw new Http2Exception(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.",
927                         Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN);
928             }
929             boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
930             if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
931                 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint: " +
932                         (isReserved ? maxStreams : maxActiveStreams));
933             }
934             if (isClosed()) {
935                 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
936                                       streamId);
937             }
938         }
939 
940         private boolean isLocal() {
941             return this == localEndpoint;
942         }
943     }
944 
945     /**
946      * Allows events which would modify the collection of active streams to be queued while iterating via {@link
947      * #forEachActiveStream(Http2StreamVisitor)}.
948      */
949     interface Event {
950         /**
951          * Trigger the original intention of this event. Expect to modify the active streams list.
952          * <p/>
953          * If a {@link RuntimeException} object is thrown it will be logged and <strong>not propagated</strong>.
954          * Throwing from this method is not supported and is considered a programming error.
955          */
956         void process();
957     }
958 
959     /**
960      * Manages the list of currently active streams.  Queues any {@link Event}s that would modify the list of
961      * active streams in order to prevent modification while iterating.
962      */
963     private final class ActiveStreams {
964         private final List<Listener> listeners;
965         private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
966         private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
967         private int pendingIterations;
968 
969         ActiveStreams(List<Listener> listeners) {
970             this.listeners = listeners;
971         }
972 
973         public int size() {
974             return streams.size();
975         }
976 
977         public void activate(final DefaultStream stream) {
978             if (allowModifications()) {
979                 addToActiveStreams(stream);
980             } else {
981                 pendingEvents.add(new Event() {
982                     @Override
983                     public void process() {
984                         addToActiveStreams(stream);
985                     }
986                 });
987             }
988         }
989 
990         public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
991             if (allowModifications() || itr != null) {
992                 removeFromActiveStreams(stream, itr);
993             } else {
994                 pendingEvents.add(new Event() {
995                     @Override
996                     public void process() {
997                         removeFromActiveStreams(stream, itr);
998                     }
999                 });
1000             }
1001         }
1002 
1003         public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
1004             incrementPendingIterations();
1005             try {
1006                 for (Http2Stream stream : streams) {
1007                     if (!visitor.visit(stream)) {
1008                         return stream;
1009                     }
1010                 }
1011                 return null;
1012             } finally {
1013                 decrementPendingIterations();
1014             }
1015         }
1016 
1017         void addToActiveStreams(DefaultStream stream) {
1018             if (streams.add(stream)) {
1019                 // Update the number of active streams initiated by the endpoint.
1020                 stream.createdBy().numActiveStreams++;
1021 
1022                 for (int i = 0; i < listeners.size(); i++) {
1023                     try {
1024                         listeners.get(i).onStreamActive(stream);
1025                     } catch (Throwable cause) {
1026                         logger.error("Caught Throwable from listener onStreamActive.", cause);
1027                     }
1028                 }
1029             }
1030         }
1031 
1032         void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
1033             if (streams.remove(stream)) {
1034                 // Update the number of active streams initiated by the endpoint.
1035                 stream.createdBy().numActiveStreams--;
1036                 notifyClosed(stream);
1037             }
1038             removeStream(stream, itr);
1039         }
1040 
1041         boolean allowModifications() {
1042             return pendingIterations == 0;
1043         }
1044 
1045         void incrementPendingIterations() {
1046             ++pendingIterations;
1047         }
1048 
1049         void decrementPendingIterations() {
1050             --pendingIterations;
1051             if (allowModifications()) {
1052                 for (;;) {
1053                     Event event = pendingEvents.poll();
1054                     if (event == null) {
1055                         break;
1056                     }
1057                     try {
1058                         event.process();
1059                     } catch (Throwable cause) {
1060                         logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1061                     }
1062                 }
1063             }
1064         }
1065     }
1066 
1067     /**
1068      * Implementation of {@link PropertyKey} that specifies the index position of the property.
1069      */
1070     final class DefaultPropertyKey implements PropertyKey {
1071         final int index;
1072 
1073         DefaultPropertyKey(int index) {
1074             this.index = index;
1075         }
1076 
1077         DefaultPropertyKey verifyConnection(Http2Connection connection) {
1078             if (connection != DefaultHttp2Connection.this) {
1079                 throw new IllegalArgumentException("Using a key that was not created by this connection");
1080             }
1081             return this;
1082         }
1083     }
1084 
1085     /**
1086      * A registry of all stream property keys known by this connection.
1087      */
1088     private final class PropertyKeyRegistry {
1089         /**
1090          * Initial size of 4 because the default configuration currently has 3 listeners
1091          * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
1092          * We could be more aggressive but the ArrayList resize will double the size if we are too small.
1093          */
1094         final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
1095 
1096         /**
1097          * Registers a new property key.
1098          */
1099         DefaultPropertyKey newKey() {
1100             DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1101             keys.add(key);
1102             return key;
1103         }
1104 
1105         int size() {
1106             return keys.size();
1107         }
1108     }
1109 }