View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.handler.codec.http2.Http2Stream.State;
22  import io.netty.util.collection.IntObjectHashMap;
23  import io.netty.util.collection.IntObjectMap;
24  import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.Promise;
27  import io.netty.util.concurrent.PromiseNotifier;
28  import io.netty.util.internal.EmptyArrays;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.util.ArrayDeque;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.Iterator;
36  import java.util.LinkedHashSet;
37  import java.util.List;
38  import java.util.Queue;
39  import java.util.Set;
40  
41  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
43  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
46  import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
47  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48  import static io.netty.handler.codec.http2.Http2Exception.streamError;
49  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
50  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
51  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
52  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
53  import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
54  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
55  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
58  import static java.lang.Integer.MAX_VALUE;
59  
60  /**
61   * Simple implementation of {@link Http2Connection}.
62   */
63  public class DefaultHttp2Connection implements Http2Connection {
64      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
65      // Fields accessed by inner classes
66      final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
67      final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
68      final ConnectionStream connectionStream = new ConnectionStream();
69      final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
70      final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
71  
72      /**
73       * We chose a {@link List} over a {@link Set} to avoid allocating {@link Iterator} objects when iterating over
74       * the listeners.
75       * <p>
76       * Initial size of 4 because the default configuration currently has 3 listeners
77       * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
78       * We could be more aggressive but the ArrayList resize will double the size if we are too small.
79       */
80      final List<Listener> listeners = new ArrayList<Listener>(4);
81      final ActiveStreams activeStreams;
82      Promise<Void> closePromise;
83  
84      /**
85       * Creates a new connection with the given settings.
86       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
87       */
88      public DefaultHttp2Connection(boolean server) {
89          this(server, DEFAULT_MAX_RESERVED_STREAMS);
90      }
91  
92      /**
93       * Creates a new connection with the given settings.
94       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
95       * @param maxReservedStreams The maximum amount of streams which can exist in the reserved state for each endpoint.
96       */
97      public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
98          activeStreams = new ActiveStreams(listeners);
99          // Reserved streams are excluded from the SETTINGS_MAX_CONCURRENT_STREAMS limit according to [1] and the RFC
100         // doesn't define a way to communicate the limit on reserved streams. We rely upon the peer to send RST_STREAM
101         // in response to any locally enforced limits being exceeded [2].
102         // [1] https://tools.ietf.org/html/rfc7540#section-5.1.2
103         // [2] https://tools.ietf.org/html/rfc7540#section-8.2.2
104         localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server, server ? MAX_VALUE : maxReservedStreams);
105         remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server, maxReservedStreams);
106 
107         // Add the connection stream to the map.
108         streamMap.put(connectionStream.id(), connectionStream);
109     }
110 
111     /**
112      * Determine if {@link #close(Promise)} has been called and no more streams are allowed to be created.
113      */
114     final boolean isClosed() {
115         return closePromise != null;
116     }
117 
118     @Override
119     public Future<Void> close(final Promise<Void> promise) {
120         checkNotNull(promise, "promise");
121         // Since we allow this method to be called multiple times, we must make sure that all the promises are notified
122         // when all streams are removed and the close operation completes.
123         if (closePromise != null) {
124             if (closePromise == promise) {
125                 // Do nothing
126             } else if (promise instanceof ChannelPromise && ((ChannelFuture) closePromise).isVoid()) {
127                 closePromise = promise;
128             } else {
129                 PromiseNotifier.cascade(closePromise, promise);
130             }
131         } else {
132             closePromise = promise;
133         }
134         if (isStreamMapEmpty()) {
135             promise.trySuccess(null);
136             return promise;
137         }
138 
139         Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
140         // We must take care while iterating the streamMap as to not modify while iterating in case there are other code
141         // paths iterating over the active streams.
142         if (activeStreams.allowModifications()) {
143             activeStreams.incrementPendingIterations();
144             try {
145                 while (itr.hasNext()) {
146                     DefaultStream stream = (DefaultStream) itr.next().value();
147                     if (stream.id() != CONNECTION_STREAM_ID) {
148                         // If modifications of the activeStream map is allowed, then a stream close operation will also
149                         // modify the streamMap. Pass the iterator in so that remove will be called to prevent
150                         // concurrent modification exceptions.
151                         stream.close(itr);
152                     }
153                 }
154             } finally {
155                 activeStreams.decrementPendingIterations();
156             }
157         } else {
158             while (itr.hasNext()) {
159                 Http2Stream stream = itr.next().value();
160                 if (stream.id() != CONNECTION_STREAM_ID) {
161                     // We are not allowed to make modifications, so the close calls will be executed after this
162                     // iteration completes.
163                     stream.close();
164                 }
165             }
166         }
167         return closePromise;
168     }
169 
170     @Override
171     public void addListener(Listener listener) {
172         listeners.add(listener);
173     }
174 
175     @Override
176     public void removeListener(Listener listener) {
177         listeners.remove(listener);
178     }
179 
180     @Override
181     public boolean isServer() {
182         return localEndpoint.isServer();
183     }
184 
185     @Override
186     public Http2Stream connectionStream() {
187         return connectionStream;
188     }
189 
190     @Override
191     public Http2Stream stream(int streamId) {
192         return streamMap.get(streamId);
193     }
194 
195     @Override
196     public boolean streamMayHaveExisted(int streamId) {
197         return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
198     }
199 
200     @Override
201     public int numActiveStreams() {
202         return activeStreams.size();
203     }
204 
205     @Override
206     public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
207         return activeStreams.forEachActiveStream(visitor);
208     }
209 
210     @Override
211     public Endpoint<Http2LocalFlowController> local() {
212         return localEndpoint;
213     }
214 
215     @Override
216     public Endpoint<Http2RemoteFlowController> remote() {
217         return remoteEndpoint;
218     }
219 
220     @Override
221     public boolean goAwayReceived() {
222         return localEndpoint.lastStreamKnownByPeer >= 0;
223     }
224 
225     @Override
226     public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
227         if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
228             throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
229                     localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
230         }
231 
232         localEndpoint.lastStreamKnownByPeer(lastKnownStream);
233         for (int i = 0; i < listeners.size(); ++i) {
234             try {
235                 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
236             } catch (Throwable cause) {
237                 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
238             }
239         }
240 
241         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
242     }
243 
244     @Override
245     public boolean goAwaySent() {
246         return remoteEndpoint.lastStreamKnownByPeer >= 0;
247     }
248 
249     @Override
250     public boolean goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
251         if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
252             // Protect against re-entrancy. Could happen if writing the frame fails, and error handling
253             // treating this is a connection handler and doing a graceful shutdown...
254             if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
255                 return false;
256             }
257             if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
258                 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
259                                 "sending multiple GOAWAY frames (was '%d', is '%d').",
260                         remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
261             }
262         }
263 
264         remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
265         for (int i = 0; i < listeners.size(); ++i) {
266             try {
267                 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
268             } catch (Throwable cause) {
269                 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
270             }
271         }
272 
273         closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
274         return true;
275     }
276 
277     private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
278                                                           final DefaultEndpoint<?> endpoint) throws Http2Exception {
279         forEachActiveStream(new Http2StreamVisitor() {
280             @Override
281             public boolean visit(Http2Stream stream) {
282                 if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
283                     stream.close();
284                 }
285                 return true;
286             }
287         });
288     }
289 
290     /**
291      * Determine if {@link #streamMap} only contains the connection stream.
292      */
293     private boolean isStreamMapEmpty() {
294         return streamMap.size() == 1;
295     }
296 
297     /**
298      * Remove a stream from the {@link #streamMap}.
299      * @param stream the stream to remove.
300      * @param itr an iterator that may be pointing to the stream during iteration and {@link Iterator#remove()} will be
301      * used if non-{@code null}.
302      */
303     void removeStream(DefaultStream stream, Iterator<?> itr) {
304         final boolean removed;
305         if (itr == null) {
306             removed = streamMap.remove(stream.id()) != null;
307         } else {
308             itr.remove();
309             removed = true;
310         }
311 
312         if (removed) {
313             for (int i = 0; i < listeners.size(); i++) {
314                 try {
315                     listeners.get(i).onStreamRemoved(stream);
316                 } catch (Throwable cause) {
317                     logger.error("Caught Throwable from listener onStreamRemoved.", cause);
318                 }
319             }
320 
321             if (closePromise != null && isStreamMapEmpty()) {
322                 closePromise.trySuccess(null);
323             }
324         }
325     }
326 
327     static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
328             throws Http2Exception {
329         switch (initialState) {
330         case IDLE:
331             return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
332         case RESERVED_LOCAL:
333             return HALF_CLOSED_REMOTE;
334         case RESERVED_REMOTE:
335             return HALF_CLOSED_LOCAL;
336         default:
337             throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
338                     + initialState);
339         }
340     }
341 
342     void notifyHalfClosed(Http2Stream stream) {
343         for (int i = 0; i < listeners.size(); i++) {
344             try {
345                 listeners.get(i).onStreamHalfClosed(stream);
346             } catch (Throwable cause) {
347                 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
348             }
349         }
350     }
351 
352     void notifyClosed(Http2Stream stream) {
353         for (int i = 0; i < listeners.size(); i++) {
354             try {
355                 listeners.get(i).onStreamClosed(stream);
356             } catch (Throwable cause) {
357                 logger.error("Caught Throwable from listener onStreamClosed.", cause);
358             }
359         }
360     }
361 
362     @Override
363     public PropertyKey newKey() {
364         return propertyKeyRegistry.newKey();
365     }
366 
367     /**
368      * Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type.
369      *
370      * @throws NullPointerException if the key is {@code null}.
371      * @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}.
372      * @throws IllegalArgumentException if the key was not created by this connection.
373      */
374     final DefaultPropertyKey verifyKey(PropertyKey key) {
375         return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
376     }
377 
378     /**
379      * Simple stream implementation. Streams can be compared to each other by priority.
380      */
381     private class DefaultStream implements Http2Stream {
382         private static final byte META_STATE_SENT_RST = 1;
383         private static final byte META_STATE_SENT_HEADERS = 1 << 1;
384         private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
385         private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
386         private static final byte META_STATE_RECV_HEADERS = 1 << 4;
387         private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
388         private final int id;
389         private final long identity;
390         private final PropertyMap properties = new PropertyMap();
391         private State state;
392         private byte metaState;
393 
394         DefaultStream(long identity, int id, State state) {
395             this.identity = identity;
396             this.id = id;
397             this.state = state;
398         }
399 
400         @Override
401         public final int id() {
402             return id;
403         }
404 
405         @Override
406         public final State state() {
407             return state;
408         }
409 
410         @Override
411         public boolean isResetSent() {
412             return (metaState & META_STATE_SENT_RST) != 0;
413         }
414 
415         @Override
416         public Http2Stream resetSent() {
417             metaState |= META_STATE_SENT_RST;
418             return this;
419         }
420 
421         @Override
422         public Http2Stream headersSent(boolean isInformational) {
423             if (!isInformational) {
424                 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
425             }
426             return this;
427         }
428 
429         @Override
430         public boolean isHeadersSent() {
431             return (metaState & META_STATE_SENT_HEADERS) != 0;
432         }
433 
434         @Override
435         public boolean isTrailersSent() {
436             return (metaState & META_STATE_SENT_TRAILERS) != 0;
437         }
438 
439         @Override
440         public Http2Stream headersReceived(boolean isInformational) {
441             if (!isInformational) {
442                 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
443             }
444             return this;
445         }
446 
447         @Override
448         public boolean isHeadersReceived() {
449             return (metaState & META_STATE_RECV_HEADERS) != 0;
450         }
451 
452         @Override
453         public boolean isTrailersReceived() {
454             return (metaState & META_STATE_RECV_TRAILERS) != 0;
455         }
456 
457         @Override
458         public Http2Stream pushPromiseSent() {
459             metaState |= META_STATE_SENT_PUSHPROMISE;
460             return this;
461         }
462 
463         @Override
464         public boolean isPushPromiseSent() {
465             return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
466         }
467 
468         @Override
469         public final <V> V setProperty(PropertyKey key, V value) {
470             return properties.add(verifyKey(key), value);
471         }
472 
473         @Override
474         public final <V> V getProperty(PropertyKey key) {
475             return properties.get(verifyKey(key));
476         }
477 
478         @Override
479         public final <V> V removeProperty(PropertyKey key) {
480             return properties.remove(verifyKey(key));
481         }
482 
483         @Override
484         public Http2Stream open(boolean halfClosed) throws Http2Exception {
485             state = activeState(id, state, isLocal(), halfClosed);
486             final DefaultEndpoint<? extends Http2FlowController> endpoint = createdBy();
487             if (!endpoint.canOpenStream()) {
488                 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint: " +
489                         endpoint.maxActiveStreams());
490             }
491 
492             activate();
493             return this;
494         }
495 
496         void activate() {
497             // If the stream is opened in a half-closed state, the headers must have either
498             // been sent if this is a local stream, or received if it is a remote stream.
499             if (state == HALF_CLOSED_LOCAL) {
500                 headersSent(/*isInformational*/ false);
501             } else if (state == HALF_CLOSED_REMOTE) {
502                 headersReceived(/*isInformational*/ false);
503             }
504             activeStreams.activate(this);
505         }
506 
507         Http2Stream close(Iterator<?> itr) {
508             if (state == CLOSED) {
509                 return this;
510             }
511 
512             state = CLOSED;
513 
514             --createdBy().numStreams;
515             activeStreams.deactivate(this, itr);
516             return this;
517         }
518 
519         @Override
520         public Http2Stream close() {
521             return close(null);
522         }
523 
524         @Override
525         public Http2Stream closeLocalSide() {
526             switch (state) {
527             case OPEN:
528                 state = HALF_CLOSED_LOCAL;
529                 notifyHalfClosed(this);
530                 break;
531             case HALF_CLOSED_LOCAL:
532                 break;
533             default:
534                 close();
535                 break;
536             }
537             return this;
538         }
539 
540         @Override
541         public Http2Stream closeRemoteSide() {
542             switch (state) {
543             case OPEN:
544                 state = HALF_CLOSED_REMOTE;
545                 notifyHalfClosed(this);
546                 break;
547             case HALF_CLOSED_REMOTE:
548                 break;
549             default:
550                 close();
551                 break;
552             }
553             return this;
554         }
555 
556         DefaultEndpoint<? extends Http2FlowController> createdBy() {
557             return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
558         }
559 
560         final boolean isLocal() {
561             return localEndpoint.isValidStreamId(id);
562         }
563 
564         /**
565          * Provides the lazy initialization for the {@link DefaultStream} data map.
566          */
567         private class PropertyMap {
568             Object[] values = EmptyArrays.EMPTY_OBJECTS;
569 
570             <V> V add(DefaultPropertyKey key, V value) {
571                 resizeIfNecessary(key.index);
572                 @SuppressWarnings("unchecked")
573                 V prevValue = (V) values[key.index];
574                 values[key.index] = value;
575                 return prevValue;
576             }
577 
578             @SuppressWarnings("unchecked")
579             <V> V get(DefaultPropertyKey key) {
580                 if (key.index >= values.length) {
581                     return null;
582                 }
583                 return (V) values[key.index];
584             }
585 
586             @SuppressWarnings("unchecked")
587             <V> V remove(DefaultPropertyKey key) {
588                 V prevValue = null;
589                 if (key.index < values.length) {
590                     prevValue = (V) values[key.index];
591                     values[key.index] = null;
592                 }
593                 return prevValue;
594             }
595 
596             void resizeIfNecessary(int index) {
597                 if (index >= values.length) {
598                     values = Arrays.copyOf(values, propertyKeyRegistry.size());
599                 }
600             }
601         }
602 
603         @Override
604         public boolean equals(final Object obj) {
605             return super.equals(obj);
606         }
607 
608         @Override
609         public int hashCode() {
610             long value = identity;
611             if (value == 0) {
612                 return System.identityHashCode(this);
613             }
614             return (int) (value ^ (value >>> 32));
615         }
616 
617         @Override
618         public String toString() {
619             return getClass().getSimpleName() +
620                     "{id=" + id +
621                     ", state=" + state +
622                     ", metaState=" + metaState +
623                     '}';
624         }
625     }
626 
627     /**
628      * Stream class representing the connection, itself.
629      */
630     private final class ConnectionStream extends DefaultStream {
631         ConnectionStream() {
632             super(0, CONNECTION_STREAM_ID, IDLE);
633         }
634 
635         @Override
636         public boolean isResetSent() {
637             return false;
638         }
639 
640         @Override
641         DefaultEndpoint<? extends Http2FlowController> createdBy() {
642             return null;
643         }
644 
645         @Override
646         public Http2Stream resetSent() {
647             throw new UnsupportedOperationException();
648         }
649 
650         @Override
651         public Http2Stream open(boolean halfClosed) {
652             throw new UnsupportedOperationException();
653         }
654 
655         @Override
656         public Http2Stream close() {
657             throw new UnsupportedOperationException();
658         }
659 
660         @Override
661         public Http2Stream closeLocalSide() {
662             throw new UnsupportedOperationException();
663         }
664 
665         @Override
666         public Http2Stream closeRemoteSide() {
667             throw new UnsupportedOperationException();
668         }
669 
670         @Override
671         public Http2Stream headersSent(boolean isInformational) {
672             throw new UnsupportedOperationException();
673         }
674 
675         @Override
676         public boolean isHeadersSent() {
677             throw new UnsupportedOperationException();
678         }
679 
680         @Override
681         public Http2Stream pushPromiseSent() {
682             throw new UnsupportedOperationException();
683         }
684 
685         @Override
686         public boolean isPushPromiseSent() {
687             throw new UnsupportedOperationException();
688         }
689     }
690 
691     /**
692      * Simple endpoint implementation.
693      */
694     private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
695         private final boolean server;
696         /**
697          * This is an always increasing sequence number used to hash {@link DefaultStream} instances.
698          */
699         private long lastCreatedStreamIdentity;
700         /**
701          * The minimum stream ID allowed when creating the next stream. This only applies at the time the stream is
702          * created. If the ID of the stream being created is less than this value, stream creation will fail. Upon
703          * successful creation of a stream, this value is incremented to the next valid stream ID.
704          */
705         private int nextStreamIdToCreate;
706         /**
707          * Used for reservation of stream IDs. Stream IDs can be reserved in advance by applications before the streams
708          * are actually created.  For example, applications may choose to buffer stream creation attempts as a way of
709          * working around {@code SETTINGS_MAX_CONCURRENT_STREAMS}, in which case they will reserve stream IDs for each
710          * buffered stream.
711          */
712         private int nextReservationStreamId;
713         private int lastStreamKnownByPeer = -1;
714         private boolean pushToAllowed;
715         private F flowController;
716         private int maxStreams;
717         private int maxActiveStreams;
718         private final int maxReservedStreams;
719         // Fields accessed by inner classes
720         int numActiveStreams;
721         int numStreams;
722 
723         DefaultEndpoint(boolean server, int maxReservedStreams) {
724             this.lastCreatedStreamIdentity = 0;
725             this.server = server;
726 
727             // Determine the starting stream ID for this endpoint. Client-initiated streams
728             // are odd and server-initiated streams are even. Zero is reserved for the
729             // connection. Stream 1 is reserved client-initiated stream for responding to an
730             // upgrade from HTTP 1.1.
731             if (server) {
732                 nextStreamIdToCreate = 2;
733                 nextReservationStreamId = 0;
734             } else {
735                 nextStreamIdToCreate = 1;
736                 // For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
737                 nextReservationStreamId = 1;
738             }
739 
740             // Push is disallowed by default for servers and allowed for clients.
741             pushToAllowed = !server;
742             maxActiveStreams = MAX_VALUE;
743             this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
744             updateMaxStreams();
745         }
746 
747         @Override
748         public int incrementAndGetNextStreamId() {
749             return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
750         }
751 
752         private void incrementExpectedStreamId(int streamId) {
753             if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
754                 nextReservationStreamId = streamId;
755             }
756             nextStreamIdToCreate = streamId + 2;
757             ++numStreams;
758         }
759 
760         @Override
761         public boolean isValidStreamId(int streamId) {
762             return streamId > 0 && server == ((streamId & 1) == 0);
763         }
764 
765         @Override
766         public boolean mayHaveCreatedStream(int streamId) {
767             return isValidStreamId(streamId) && streamId <= lastStreamCreated();
768         }
769 
770         @Override
771         public boolean canOpenStream() {
772             return numActiveStreams < maxActiveStreams;
773         }
774 
775         @Override
776         public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
777             State state = activeState(streamId, IDLE, isLocal(), halfClosed);
778 
779             checkNewStreamAllowed(streamId, state);
780 
781             lastCreatedStreamIdentity++;
782 
783             // Create and initialize the stream.
784             DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
785 
786             incrementExpectedStreamId(streamId);
787 
788             addStream(stream);
789 
790             stream.activate();
791             return stream;
792         }
793 
794         @Override
795         public boolean created(Http2Stream stream) {
796             return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
797         }
798 
799         @Override
800         public boolean isServer() {
801             return server;
802         }
803 
804         @Override
805         public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
806             if (parent == null) {
807                 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
808             }
809             if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
810                 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
811             }
812             if (!opposite().allowPushTo()) {
813                 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
814             }
815             State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
816             checkNewStreamAllowed(streamId, state);
817 
818             lastCreatedStreamIdentity++;
819 
820             // Create and initialize the stream.
821             DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
822 
823             incrementExpectedStreamId(streamId);
824 
825             // Register the stream.
826             addStream(stream);
827             return stream;
828         }
829 
830         private void addStream(DefaultStream stream) {
831             // Add the stream to the map and priority tree.
832             streamMap.put(stream.id(), stream);
833 
834             // Notify the listeners of the event.
835             for (int i = 0; i < listeners.size(); i++) {
836                 try {
837                     listeners.get(i).onStreamAdded(stream);
838                 } catch (Throwable cause) {
839                     logger.error("Caught Throwable from listener onStreamAdded.", cause);
840                 }
841             }
842         }
843 
844         @Override
845         public void allowPushTo(boolean allow) {
846             if (allow && server) {
847                 throw new IllegalArgumentException("Servers do not allow push");
848             }
849             pushToAllowed = allow;
850         }
851 
852         @Override
853         public boolean allowPushTo() {
854             return pushToAllowed;
855         }
856 
857         @Override
858         public int numActiveStreams() {
859             return numActiveStreams;
860         }
861 
862         @Override
863         public int maxActiveStreams() {
864             return maxActiveStreams;
865         }
866 
867         @Override
868         public void maxActiveStreams(int maxActiveStreams) {
869             this.maxActiveStreams = maxActiveStreams;
870             updateMaxStreams();
871         }
872 
873         @Override
874         public int lastStreamCreated() {
875             // Stream ids are always incremented by 2 so just subtract it. This is even ok in the case
876             // of nextStreamIdToCreate overflown as it will just return the correct positive number.
877             // Use max(...) to ensure we return the correct value for the case when its a client and no stream
878             // was created yet.
879             return Math.max(0, nextStreamIdToCreate - 2);
880         }
881 
882         @Override
883         public int lastStreamKnownByPeer() {
884             return lastStreamKnownByPeer;
885         }
886 
887         private void lastStreamKnownByPeer(int lastKnownStream) {
888             lastStreamKnownByPeer = lastKnownStream;
889         }
890 
891         @Override
892         public F flowController() {
893             return flowController;
894         }
895 
896         @Override
897         public void flowController(F flowController) {
898             this.flowController = checkNotNull(flowController, "flowController");
899         }
900 
901         @Override
902         public Endpoint<? extends Http2FlowController> opposite() {
903             return isLocal() ? remoteEndpoint : localEndpoint;
904         }
905 
906         private void updateMaxStreams() {
907             maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
908         }
909 
910         private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
911             assert state != IDLE;
912             if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
913                 throw streamError(streamId, REFUSED_STREAM,
914                         "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
915                         streamId, lastStreamKnownByPeer);
916             }
917             if (!isValidStreamId(streamId)) {
918                 if (streamId < 0) {
919                     throw new Http2NoMoreStreamIdsException();
920                 }
921                 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
922                         server ? "server" : "client");
923             }
924             // This check must be after all id validated checks, but before the max streams check because it may be
925             // recoverable to some degree for handling frames which can be sent on closed streams.
926             if (streamId < nextStreamIdToCreate) {
927                 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
928                         streamId, nextStreamIdToCreate);
929             }
930             if (nextStreamIdToCreate <= 0) {
931                 // We exhausted the stream id space that we can use. Let's signal this back but also signal that
932                 // we still may want to process active streams.
933                 throw new Http2Exception(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.",
934                         Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN);
935             }
936             boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
937             if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
938                 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint: " +
939                         (isReserved ? maxStreams : maxActiveStreams));
940             }
941             if (isClosed()) {
942                 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
943                                       streamId);
944             }
945         }
946 
947         private boolean isLocal() {
948             return this == localEndpoint;
949         }
950     }
951 
952     /**
953      * Allows events which would modify the collection of active streams to be queued while iterating via {@link
954      * #forEachActiveStream(Http2StreamVisitor)}.
955      */
956     interface Event {
957         /**
958          * Trigger the original intention of this event. Expect to modify the active streams list.
959          * <p/>
960          * If a {@link RuntimeException} object is thrown it will be logged and <strong>not propagated</strong>.
961          * Throwing from this method is not supported and is considered a programming error.
962          */
963         void process();
964     }
965 
966     /**
967      * Manages the list of currently active streams.  Queues any {@link Event}s that would modify the list of
968      * active streams in order to prevent modification while iterating.
969      */
970     private final class ActiveStreams {
971         private final List<Listener> listeners;
972         private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
973         private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
974         private int pendingIterations;
975 
976         ActiveStreams(List<Listener> listeners) {
977             this.listeners = listeners;
978         }
979 
980         public int size() {
981             return streams.size();
982         }
983 
984         public void activate(final DefaultStream stream) {
985             if (allowModifications()) {
986                 addToActiveStreams(stream);
987             } else {
988                 pendingEvents.add(new Event() {
989                     @Override
990                     public void process() {
991                         addToActiveStreams(stream);
992                     }
993                 });
994             }
995         }
996 
997         public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
998             if (allowModifications() || itr != null) {
999                 removeFromActiveStreams(stream, itr);
1000             } else {
1001                 pendingEvents.add(new Event() {
1002                     @Override
1003                     public void process() {
1004                         removeFromActiveStreams(stream, itr);
1005                     }
1006                 });
1007             }
1008         }
1009 
1010         public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
1011             incrementPendingIterations();
1012             try {
1013                 for (Http2Stream stream : streams) {
1014                     if (!visitor.visit(stream)) {
1015                         return stream;
1016                     }
1017                 }
1018                 return null;
1019             } finally {
1020                 decrementPendingIterations();
1021             }
1022         }
1023 
1024         void addToActiveStreams(DefaultStream stream) {
1025             if (streams.add(stream)) {
1026                 // Update the number of active streams initiated by the endpoint.
1027                 stream.createdBy().numActiveStreams++;
1028 
1029                 for (int i = 0; i < listeners.size(); i++) {
1030                     try {
1031                         listeners.get(i).onStreamActive(stream);
1032                     } catch (Throwable cause) {
1033                         logger.error("Caught Throwable from listener onStreamActive.", cause);
1034                     }
1035                 }
1036             }
1037         }
1038 
1039         void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
1040             if (streams.remove(stream)) {
1041                 // Update the number of active streams initiated by the endpoint.
1042                 stream.createdBy().numActiveStreams--;
1043                 notifyClosed(stream);
1044             }
1045             removeStream(stream, itr);
1046         }
1047 
1048         boolean allowModifications() {
1049             return pendingIterations == 0;
1050         }
1051 
1052         void incrementPendingIterations() {
1053             ++pendingIterations;
1054         }
1055 
1056         void decrementPendingIterations() {
1057             --pendingIterations;
1058             if (allowModifications()) {
1059                 for (;;) {
1060                     Event event = pendingEvents.poll();
1061                     if (event == null) {
1062                         break;
1063                     }
1064                     try {
1065                         event.process();
1066                     } catch (Throwable cause) {
1067                         logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1068                     }
1069                 }
1070             }
1071         }
1072     }
1073 
1074     /**
1075      * Implementation of {@link PropertyKey} that specifies the index position of the property.
1076      */
1077     final class DefaultPropertyKey implements PropertyKey {
1078         final int index;
1079 
1080         DefaultPropertyKey(int index) {
1081             this.index = index;
1082         }
1083 
1084         DefaultPropertyKey verifyConnection(Http2Connection connection) {
1085             if (connection != DefaultHttp2Connection.this) {
1086                 throw new IllegalArgumentException("Using a key that was not created by this connection");
1087             }
1088             return this;
1089         }
1090     }
1091 
1092     /**
1093      * A registry of all stream property keys known by this connection.
1094      */
1095     private final class PropertyKeyRegistry {
1096         /**
1097          * Initial size of 4 because the default configuration currently has 3 listeners
1098          * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
1099          * We could be more aggressive but the ArrayList resize will double the size if we are too small.
1100          */
1101         final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
1102 
1103         /**
1104          * Registers a new property key.
1105          */
1106         DefaultPropertyKey newKey() {
1107             DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1108             keys.add(key);
1109             return key;
1110         }
1111 
1112         int size() {
1113             return keys.size();
1114         }
1115     }
1116 }