View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelPromise;
20  import io.netty.handler.codec.http2.Http2Stream.State;
21  import io.netty.util.collection.IntObjectHashMap;
22  import io.netty.util.collection.IntObjectMap;
23  import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
24  import io.netty.util.concurrent.Future;
25  import io.netty.util.concurrent.Promise;
26  import io.netty.util.concurrent.UnaryPromiseNotifier;
27  import io.netty.util.internal.EmptyArrays;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.UnstableApi;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  import java.util.ArrayDeque;
34  import java.util.ArrayList;
35  import java.util.Arrays;
36  import java.util.Iterator;
37  import java.util.LinkedHashSet;
38  import java.util.List;
39  import java.util.Queue;
40  import java.util.Set;
41  
42  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
43  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
44  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46  import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
47  import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
48  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
49  import static io.netty.handler.codec.http2.Http2Exception.streamError;
50  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
51  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
52  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
53  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
54  import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
55  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
56  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
57  import static io.netty.util.internal.ObjectUtil.checkNotNull;
58  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
59  import static java.lang.Integer.MAX_VALUE;
60  
61  /**
62   * Simple implementation of {@link Http2Connection}.
63   */
64  @UnstableApi
65  public class DefaultHttp2Connection implements Http2Connection {
66      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
67      // Fields accessed by inner classes
68      final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
69      final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
70      final ConnectionStream connectionStream = new ConnectionStream();
71      final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
72      final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
73  
74      /**
75       * We chose a {@link List} over a {@link Set} to avoid allocating {@link Iterator} objects when iterating over
76       * the listeners.
77       * <p>
78       * Initial size of 4 because the default configuration currently has 3 listeners
79       * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
80       * We could be more aggressive but the ArrayList resize will double the size if we are too small.
81       */
82      final List<Listener> listeners = new ArrayList<Listener>(4);
83      final ActiveStreams activeStreams;
84      Promise<Void> closePromise;
85  
/**
 * Creates a new connection, delegating to {@link #DefaultHttp2Connection(boolean, int)} with
 * {@code DEFAULT_MAX_RESERVED_STREAMS} as the reserved stream limit.
 * @param server {@code true} if this end-point is the server-side of the HTTP/2 connection.
 */
public DefaultHttp2Connection(boolean server) {
    this(server, DEFAULT_MAX_RESERVED_STREAMS);
}
93  
94      /**
95       * Creates a new connection with the given settings.
96       * @param server whether or not this end-point is the server-side of the HTTP/2 connection.
97       * @param maxReservedStreams The maximum amount of streams which can exist in the reserved state for each endpoint.
98       */
99      public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
100         activeStreams = new ActiveStreams(listeners);
101         // Reserved streams are excluded from the SETTINGS_MAX_CONCURRENT_STREAMS limit according to [1] and the RFC
102         // doesn't define a way to communicate the limit on reserved streams. We rely upon the peer to send RST_STREAM
103         // in response to any locally enforced limits being exceeded [2].
104         // [1] https://tools.ietf.org/html/rfc7540#section-5.1.2
105         // [2] https://tools.ietf.org/html/rfc7540#section-8.2.2
106         localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server, server ? MAX_VALUE : maxReservedStreams);
107         remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server, maxReservedStreams);
108 
109         // Add the connection stream to the map.
110         streamMap.put(connectionStream.id(), connectionStream);
111     }
112 
113     /**
114      * Determine if {@link #close(Promise)} has been called and no more streams are allowed to be created.
115      */
116     final boolean isClosed() {
117         return closePromise != null;
118     }
119 
120     @Override
121     public Future<Void> close(final Promise<Void> promise) {
122         checkNotNull(promise, "promise");
123         // Since we allow this method to be called multiple times, we must make sure that all the promises are notified
124         // when all streams are removed and the close operation completes.
125         if (closePromise != null) {
126             if (closePromise == promise) {
127                 // Do nothing
128             } else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) {
129                 closePromise = promise;
130             } else {
131                 closePromise.addListener(new UnaryPromiseNotifier<Void>(promise));
132             }
133         } else {
134             closePromise = promise;
135         }
136         if (isStreamMapEmpty()) {
137             promise.trySuccess(null);
138             return promise;
139         }
140 
141         Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
142         // We must take care while iterating the streamMap as to not modify while iterating in case there are other code
143         // paths iterating over the active streams.
144         if (activeStreams.allowModifications()) {
145             activeStreams.incrementPendingIterations();
146             try {
147                 while (itr.hasNext()) {
148                     DefaultStream stream = (DefaultStream) itr.next().value();
149                     if (stream.id() != CONNECTION_STREAM_ID) {
150                         // If modifications of the activeStream map is allowed, then a stream close operation will also
151                         // modify the streamMap. Pass the iterator in so that remove will be called to prevent
152                         // concurrent modification exceptions.
153                         stream.close(itr);
154                     }
155                 }
156             } finally {
157                 activeStreams.decrementPendingIterations();
158             }
159         } else {
160             while (itr.hasNext()) {
161                 Http2Stream stream = itr.next().value();
162                 if (stream.id() != CONNECTION_STREAM_ID) {
163                     // We are not allowed to make modifications, so the close calls will be executed after this
164                     // iteration completes.
165                     stream.close();
166                 }
167             }
168         }
169         return closePromise;
170     }
171 
172     @Override
173     public void addListener(Listener listener) {
174         listeners.add(listener);
175     }
176 
177     @Override
178     public void removeListener(Listener listener) {
179         listeners.remove(listener);
180     }
181 
182     @Override
183     public boolean isServer() {
184         return localEndpoint.isServer();
185     }
186 
187     @Override
188     public Http2Stream connectionStream() {
189         return connectionStream;
190     }
191 
192     @Override
193     public Http2Stream stream(int streamId) {
194         return streamMap.get(streamId);
195     }
196 
197     @Override
198     public boolean streamMayHaveExisted(int streamId) {
199         return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
200     }
201 
202     @Override
203     public int numActiveStreams() {
204         return activeStreams.size();
205     }
206 
207     @Override
208     public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
209         return activeStreams.forEachActiveStream(visitor);
210     }
211 
212     @Override
213     public Endpoint<Http2LocalFlowController> local() {
214         return localEndpoint;
215     }
216 
217     @Override
218     public Endpoint<Http2RemoteFlowController> remote() {
219         return remoteEndpoint;
220     }
221 
222     @Override
223     public boolean goAwayReceived() {
224         return localEndpoint.lastStreamKnownByPeer >= 0;
225     }
226 
227     @Override
228     public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) {
229         localEndpoint.lastStreamKnownByPeer(lastKnownStream);
230         for (int i = 0; i < listeners.size(); ++i) {
231             try {
232                 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
233             } catch (Throwable cause) {
234                 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
235             }
236         }
237 
238         try {
239             forEachActiveStream(new Http2StreamVisitor() {
240                 @Override
241                 public boolean visit(Http2Stream stream) {
242                     if (stream.id() > lastKnownStream && localEndpoint.isValidStreamId(stream.id())) {
243                         stream.close();
244                     }
245                     return true;
246                 }
247             });
248         } catch (Http2Exception e) {
249             PlatformDependent.throwException(e);
250         }
251     }
252 
253     @Override
254     public boolean goAwaySent() {
255         return remoteEndpoint.lastStreamKnownByPeer >= 0;
256     }
257 
258     @Override
259     public void goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) {
260         remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
261         for (int i = 0; i < listeners.size(); ++i) {
262             try {
263                 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
264             } catch (Throwable cause) {
265                 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
266             }
267         }
268 
269         try {
270             forEachActiveStream(new Http2StreamVisitor() {
271                 @Override
272                 public boolean visit(Http2Stream stream) {
273                     if (stream.id() > lastKnownStream && remoteEndpoint.isValidStreamId(stream.id())) {
274                         stream.close();
275                     }
276                     return true;
277                 }
278             });
279         } catch (Http2Exception e) {
280             PlatformDependent.throwException(e);
281         }
282     }
283 
284     /**
285      * Determine if {@link #streamMap} only contains the connection stream.
286      */
287     private boolean isStreamMapEmpty() {
288         return streamMap.size() == 1;
289     }
290 
291     /**
292      * Remove a stream from the {@link #streamMap}.
293      * @param stream the stream to remove.
294      * @param itr an iterator that may be pointing to the stream during iteration and {@link Iterator#remove()} will be
295      * used if non-{@code null}.
296      */
297     void removeStream(DefaultStream stream, Iterator<?> itr) {
298         final boolean removed;
299         if (itr == null) {
300             removed = streamMap.remove(stream.id()) != null;
301         } else {
302             itr.remove();
303             removed = true;
304         }
305 
306         if (removed) {
307             for (int i = 0; i < listeners.size(); i++) {
308                 try {
309                     listeners.get(i).onStreamRemoved(stream);
310                 } catch (Throwable cause) {
311                     logger.error("Caught Throwable from listener onStreamRemoved.", cause);
312                 }
313             }
314 
315             if (closePromise != null && isStreamMapEmpty()) {
316                 closePromise.trySuccess(null);
317             }
318         }
319     }
320 
321     static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
322             throws Http2Exception {
323         switch (initialState) {
324         case IDLE:
325             return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
326         case RESERVED_LOCAL:
327             return HALF_CLOSED_REMOTE;
328         case RESERVED_REMOTE:
329             return HALF_CLOSED_LOCAL;
330         default:
331             throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
332                     + initialState);
333         }
334     }
335 
336     void notifyHalfClosed(Http2Stream stream) {
337         for (int i = 0; i < listeners.size(); i++) {
338             try {
339                 listeners.get(i).onStreamHalfClosed(stream);
340             } catch (Throwable cause) {
341                 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
342             }
343         }
344     }
345 
346     void notifyClosed(Http2Stream stream) {
347         for (int i = 0; i < listeners.size(); i++) {
348             try {
349                 listeners.get(i).onStreamClosed(stream);
350             } catch (Throwable cause) {
351                 logger.error("Caught Throwable from listener onStreamClosed.", cause);
352             }
353         }
354     }
355 
356     @Override
357     public PropertyKey newKey() {
358         return propertyKeyRegistry.newKey();
359     }
360 
361     /**
362      * Verifies that the key is valid and returns it as the internal {@link DefaultPropertyKey} type.
363      *
364      * @throws NullPointerException if the key is {@code null}.
365      * @throws ClassCastException if the key is not of type {@link DefaultPropertyKey}.
366      * @throws IllegalArgumentException if the key was not created by this connection.
367      */
368     final DefaultPropertyKey verifyKey(PropertyKey key) {
369         return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
370     }
371 
372     /**
373      * Simple stream implementation. Streams can be compared to each other by priority.
374      */
375     private class DefaultStream implements Http2Stream {
376         private static final byte META_STATE_SENT_RST = 1;
377         private static final byte META_STATE_SENT_HEADERS = 1 << 1;
378         private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
379         private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
380         private static final byte META_STATE_RECV_HEADERS = 1 << 4;
381         private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
382         private final int id;
383         private final PropertyMap properties = new PropertyMap();
384         private State state;
385         private byte metaState;
386 
387         DefaultStream(int id, State state) {
388             this.id = id;
389             this.state = state;
390         }
391 
392         @Override
393         public final int id() {
394             return id;
395         }
396 
397         @Override
398         public final State state() {
399             return state;
400         }
401 
402         @Override
403         public boolean isResetSent() {
404             return (metaState & META_STATE_SENT_RST) != 0;
405         }
406 
407         @Override
408         public Http2Stream resetSent() {
409             metaState |= META_STATE_SENT_RST;
410             return this;
411         }
412 
413         @Override
414         public Http2Stream headersSent(boolean isInformational) {
415             if (!isInformational) {
416                 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
417             }
418             return this;
419         }
420 
421         @Override
422         public boolean isHeadersSent() {
423             return (metaState & META_STATE_SENT_HEADERS) != 0;
424         }
425 
426         @Override
427         public boolean isTrailersSent() {
428             return (metaState & META_STATE_SENT_TRAILERS) != 0;
429         }
430 
431         @Override
432         public Http2Stream headersReceived(boolean isInformational) {
433             if (!isInformational) {
434                 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
435             }
436             return this;
437         }
438 
439         @Override
440         public boolean isHeadersReceived() {
441             return (metaState & META_STATE_RECV_HEADERS) != 0;
442         }
443 
444         @Override
445         public boolean isTrailersReceived() {
446             return (metaState & META_STATE_RECV_TRAILERS) != 0;
447         }
448 
449         @Override
450         public Http2Stream pushPromiseSent() {
451             metaState |= META_STATE_SENT_PUSHPROMISE;
452             return this;
453         }
454 
455         @Override
456         public boolean isPushPromiseSent() {
457             return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
458         }
459 
460         @Override
461         public final <V> V setProperty(PropertyKey key, V value) {
462             return properties.add(verifyKey(key), value);
463         }
464 
465         @Override
466         public final <V> V getProperty(PropertyKey key) {
467             return properties.get(verifyKey(key));
468         }
469 
470         @Override
471         public final <V> V removeProperty(PropertyKey key) {
472             return properties.remove(verifyKey(key));
473         }
474 
475         @Override
476         public Http2Stream open(boolean halfClosed) throws Http2Exception {
477             state = activeState(id, state, isLocal(), halfClosed);
478             if (!createdBy().canOpenStream()) {
479                 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint.");
480             }
481             activate();
482             return this;
483         }
484 
485         void activate() {
486             activeStreams.activate(this);
487         }
488 
489         Http2Stream close(Iterator<?> itr) {
490             if (state == CLOSED) {
491                 return this;
492             }
493 
494             state = CLOSED;
495 
496             --createdBy().numStreams;
497             activeStreams.deactivate(this, itr);
498             return this;
499         }
500 
501         @Override
502         public Http2Stream close() {
503             return close(null);
504         }
505 
506         @Override
507         public Http2Stream closeLocalSide() {
508             switch (state) {
509             case OPEN:
510                 state = HALF_CLOSED_LOCAL;
511                 notifyHalfClosed(this);
512                 break;
513             case HALF_CLOSED_LOCAL:
514                 break;
515             default:
516                 close();
517                 break;
518             }
519             return this;
520         }
521 
522         @Override
523         public Http2Stream closeRemoteSide() {
524             switch (state) {
525             case OPEN:
526                 state = HALF_CLOSED_REMOTE;
527                 notifyHalfClosed(this);
528                 break;
529             case HALF_CLOSED_REMOTE:
530                 break;
531             default:
532                 close();
533                 break;
534             }
535             return this;
536         }
537 
538         DefaultEndpoint<? extends Http2FlowController> createdBy() {
539             return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
540         }
541 
542         final boolean isLocal() {
543             return localEndpoint.isValidStreamId(id);
544         }
545 
546         /**
547          * Provides the lazy initialization for the {@link DefaultStream} data map.
548          */
549         private class PropertyMap {
550             Object[] values = EmptyArrays.EMPTY_OBJECTS;
551 
552             <V> V add(DefaultPropertyKey key, V value) {
553                 resizeIfNecessary(key.index);
554                 @SuppressWarnings("unchecked")
555                 V prevValue = (V) values[key.index];
556                 values[key.index] = value;
557                 return prevValue;
558             }
559 
560             @SuppressWarnings("unchecked")
561             <V> V get(DefaultPropertyKey key) {
562                 if (key.index >= values.length) {
563                     return null;
564                 }
565                 return (V) values[key.index];
566             }
567 
568             @SuppressWarnings("unchecked")
569             <V> V remove(DefaultPropertyKey key) {
570                 V prevValue = null;
571                 if (key.index < values.length) {
572                     prevValue = (V) values[key.index];
573                     values[key.index] = null;
574                 }
575                 return prevValue;
576             }
577 
578             void resizeIfNecessary(int index) {
579                 if (index >= values.length) {
580                     values = Arrays.copyOf(values, propertyKeyRegistry.size());
581                 }
582             }
583         }
584     }
585 
586     /**
587      * Stream class representing the connection, itself.
588      */
589     private final class ConnectionStream extends DefaultStream {
590         ConnectionStream() {
591             super(CONNECTION_STREAM_ID, IDLE);
592         }
593 
594         @Override
595         public boolean isResetSent() {
596             return false;
597         }
598 
599         @Override
600         DefaultEndpoint<? extends Http2FlowController> createdBy() {
601             return null;
602         }
603 
604         @Override
605         public Http2Stream resetSent() {
606             throw new UnsupportedOperationException();
607         }
608 
609         @Override
610         public Http2Stream open(boolean halfClosed) {
611             throw new UnsupportedOperationException();
612         }
613 
614         @Override
615         public Http2Stream close() {
616             throw new UnsupportedOperationException();
617         }
618 
619         @Override
620         public Http2Stream closeLocalSide() {
621             throw new UnsupportedOperationException();
622         }
623 
624         @Override
625         public Http2Stream closeRemoteSide() {
626             throw new UnsupportedOperationException();
627         }
628 
629         @Override
630         public Http2Stream headersSent(boolean isInformational) {
631             throw new UnsupportedOperationException();
632         }
633 
634         @Override
635         public boolean isHeadersSent() {
636             throw new UnsupportedOperationException();
637         }
638 
639         @Override
640         public Http2Stream pushPromiseSent() {
641             throw new UnsupportedOperationException();
642         }
643 
644         @Override
645         public boolean isPushPromiseSent() {
646             throw new UnsupportedOperationException();
647         }
648     }
649 
650     /**
651      * Simple endpoint implementation.
652      */
653     private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
654         private final boolean server;
655         /**
656          * The minimum stream ID allowed when creating the next stream. This only applies at the time the stream is
657          * created. If the ID of the stream being created is less than this value, stream creation will fail. Upon
658          * successful creation of a stream, this value is incremented to the next valid stream ID.
659          */
660         private int nextStreamIdToCreate;
661         /**
662          * Used for reservation of stream IDs. Stream IDs can be reserved in advance by applications before the streams
663          * are actually created.  For example, applications may choose to buffer stream creation attempts as a way of
664          * working around {@code SETTINGS_MAX_CONCURRENT_STREAMS}, in which case they will reserve stream IDs for each
665          * buffered stream.
666          */
667         private int nextReservationStreamId;
668         private int lastStreamKnownByPeer = -1;
669         private boolean pushToAllowed = true;
670         private F flowController;
671         private int maxStreams;
672         private int maxActiveStreams;
673         private final int maxReservedStreams;
674         // Fields accessed by inner classes
675         int numActiveStreams;
676         int numStreams;
677 
/**
 * Creates an endpoint.
 *
 * @param server whether this endpoint is the server side.
 * @param maxReservedStreams limit on streams in the reserved state; must not be negative.
 */
DefaultEndpoint(boolean server, int maxReservedStreams) {
    this.server = server;

    // Client-initiated streams are odd and server-initiated streams are even. Zero is reserved for the
    // connection. Stream 1 is the reserved client-initiated stream used when responding to an upgrade from
    // HTTP 1.1, so manually created client-side streams start at 3 (the reservation id is advanced by 2
    // before it is handed out).
    nextStreamIdToCreate = server ? 2 : 1;
    nextReservationStreamId = server ? 0 : 1;

    // Push is disallowed by default for servers and allowed for clients.
    pushToAllowed = !server;
    maxActiveStreams = MAX_VALUE;
    this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
    updateMaxStreams();
}
700 
701         @Override
702         public int incrementAndGetNextStreamId() {
703             return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
704         }
705 
706         private void incrementExpectedStreamId(int streamId) {
707             if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
708                 nextReservationStreamId = streamId;
709             }
710             nextStreamIdToCreate = streamId + 2;
711             ++numStreams;
712         }
713 
714         @Override
715         public boolean isValidStreamId(int streamId) {
716             return streamId > 0 && server == ((streamId & 1) == 0);
717         }
718 
719         @Override
720         public boolean mayHaveCreatedStream(int streamId) {
721             return isValidStreamId(streamId) && streamId <= lastStreamCreated();
722         }
723 
724         @Override
725         public boolean canOpenStream() {
726             return numActiveStreams < maxActiveStreams;
727         }
728 
729         @Override
730         public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
731             State state = activeState(streamId, IDLE, isLocal(), halfClosed);
732 
733             checkNewStreamAllowed(streamId, state);
734 
735             // Create and initialize the stream.
736             DefaultStream stream = new DefaultStream(streamId, state);
737 
738             incrementExpectedStreamId(streamId);
739 
740             addStream(stream);
741 
742             stream.activate();
743             return stream;
744         }
745 
746         @Override
747         public boolean created(Http2Stream stream) {
748             return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
749         }
750 
751         @Override
752         public boolean isServer() {
753             return server;
754         }
755 
756         @Override
757         public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
758             if (parent == null) {
759                 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
760             }
761             if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
762                 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
763             }
764             if (!opposite().allowPushTo()) {
765                 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
766             }
767             State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
768             checkNewStreamAllowed(streamId, state);
769 
770             // Create and initialize the stream.
771             DefaultStream stream = new DefaultStream(streamId, state);
772 
773             incrementExpectedStreamId(streamId);
774 
775             // Register the stream.
776             addStream(stream);
777             return stream;
778         }
779 
780         private void addStream(DefaultStream stream) {
781             // Add the stream to the map and priority tree.
782             streamMap.put(stream.id(), stream);
783 
784             // Notify the listeners of the event.
785             for (int i = 0; i < listeners.size(); i++) {
786                 try {
787                     listeners.get(i).onStreamAdded(stream);
788                 } catch (Throwable cause) {
789                     logger.error("Caught Throwable from listener onStreamAdded.", cause);
790                 }
791             }
792         }
793 
794         @Override
795         public void allowPushTo(boolean allow) {
796             if (allow && server) {
797                 throw new IllegalArgumentException("Servers do not allow push");
798             }
799             pushToAllowed = allow;
800         }
801 
802         @Override
803         public boolean allowPushTo() {
804             return pushToAllowed;
805         }
806 
807         @Override
808         public int numActiveStreams() {
809             return numActiveStreams;
810         }
811 
812         @Override
813         public int maxActiveStreams() {
814             return maxActiveStreams;
815         }
816 
        /**
         * Stores the new active-stream limit and then recomputes {@code maxStreams} via
         * {@code updateMaxStreams()}.
         *
         * @param maxActiveStreams the new limit.
         */
        @Override
        public void maxActiveStreams(int maxActiveStreams) {
            this.maxActiveStreams = maxActiveStreams;
            // maxStreams is derived from this value, so it has to be refreshed after every change.
            updateMaxStreams();
        }
822 
823         @Override
824         public int lastStreamCreated() {
825             return nextStreamIdToCreate > 1 ? nextStreamIdToCreate - 2 : 0;
826         }
827 
828         @Override
829         public int lastStreamKnownByPeer() {
830             return lastStreamKnownByPeer;
831         }
832 
833         private void lastStreamKnownByPeer(int lastKnownStream) {
834             this.lastStreamKnownByPeer = lastKnownStream;
835         }
836 
837         @Override
838         public F flowController() {
839             return flowController;
840         }
841 
842         @Override
843         public void flowController(F flowController) {
844             this.flowController = checkNotNull(flowController, "flowController");
845         }
846 
847         @Override
848         public Endpoint<? extends Http2FlowController> opposite() {
849             return isLocal() ? remoteEndpoint : localEndpoint;
850         }
851 
852         private void updateMaxStreams() {
853             maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
854         }
855 
856         private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
857             assert state != IDLE;
858             if (goAwayReceived() && streamId > localEndpoint.lastStreamKnownByPeer()) {
859                 throw connectionError(PROTOCOL_ERROR, "Cannot create stream %d since this endpoint has received a " +
860                                                       "GOAWAY frame with last stream id %d.", streamId,
861                                                       localEndpoint.lastStreamKnownByPeer());
862             }
863             if (!isValidStreamId(streamId)) {
864                 if (streamId < 0) {
865                     throw new Http2NoMoreStreamIdsException();
866                 }
867                 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
868                         server ? "server" : "client");
869             }
870             // This check must be after all id validated checks, but before the max streams check because it may be
871             // recoverable to some degree for handling frames which can be sent on closed streams.
872             if (streamId < nextStreamIdToCreate) {
873                 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
874                         streamId, nextStreamIdToCreate);
875             }
876             if (nextStreamIdToCreate <= 0) {
877                 throw connectionError(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.");
878             }
879             boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
880             if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
881                 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint.");
882             }
883             if (isClosed()) {
884                 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
885                                       streamId);
886             }
887         }
888 
889         private boolean isLocal() {
890             return this == localEndpoint;
891         }
892     }
893 
    /**
     * Allows events which would modify the collection of active streams to be queued while iterating via {@link
     * #forEachActiveStream(Http2StreamVisitor)}.
     */
    interface Event {
        /**
         * Trigger the original intention of this event. Expect to modify the active streams list.
         * <p>
         * Anything thrown from this method (the queue drain catches {@link Throwable}) will be logged and
         * <strong>not propagated</strong>. Throwing from this method is not supported and is considered a
         * programming error.
         */
        void process();
    }
907 
908     /**
909      * Manages the list of currently active streams.  Queues any {@link Event}s that would modify the list of
910      * active streams in order to prevent modification while iterating.
911      */
912     private final class ActiveStreams {
913         private final List<Listener> listeners;
914         private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
915         private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
916         private int pendingIterations;
917 
918         public ActiveStreams(List<Listener> listeners) {
919             this.listeners = listeners;
920         }
921 
922         public int size() {
923             return streams.size();
924         }
925 
926         public void activate(final DefaultStream stream) {
927             if (allowModifications()) {
928                 addToActiveStreams(stream);
929             } else {
930                 pendingEvents.add(new Event() {
931                     @Override
932                     public void process() {
933                         addToActiveStreams(stream);
934                     }
935                 });
936             }
937         }
938 
939         public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
940             if (allowModifications() || itr != null) {
941                 removeFromActiveStreams(stream, itr);
942             } else {
943                 pendingEvents.add(new Event() {
944                     @Override
945                     public void process() {
946                         removeFromActiveStreams(stream, itr);
947                     }
948                 });
949             }
950         }
951 
952         public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
953             incrementPendingIterations();
954             try {
955                 for (Http2Stream stream : streams) {
956                     if (!visitor.visit(stream)) {
957                         return stream;
958                     }
959                 }
960                 return null;
961             } finally {
962                 decrementPendingIterations();
963             }
964         }
965 
966         void addToActiveStreams(DefaultStream stream) {
967             if (streams.add(stream)) {
968                 // Update the number of active streams initiated by the endpoint.
969                 stream.createdBy().numActiveStreams++;
970 
971                 for (int i = 0; i < listeners.size(); i++) {
972                     try {
973                         listeners.get(i).onStreamActive(stream);
974                     } catch (Throwable cause) {
975                         logger.error("Caught Throwable from listener onStreamActive.", cause);
976                     }
977                 }
978             }
979         }
980 
981         void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
982             if (streams.remove(stream)) {
983                 // Update the number of active streams initiated by the endpoint.
984                 stream.createdBy().numActiveStreams--;
985                 notifyClosed(stream);
986             }
987             removeStream(stream, itr);
988         }
989 
990         boolean allowModifications() {
991             return pendingIterations == 0;
992         }
993 
994         void incrementPendingIterations() {
995             ++pendingIterations;
996         }
997 
998         void decrementPendingIterations() {
999             --pendingIterations;
1000             if (allowModifications()) {
1001                 for (;;) {
1002                     Event event = pendingEvents.poll();
1003                     if (event == null) {
1004                         break;
1005                     }
1006                     try {
1007                         event.process();
1008                     } catch (Throwable cause) {
1009                         logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1010                     }
1011                 }
1012             }
1013         }
1014     }
1015 
1016     /**
1017      * Implementation of {@link PropertyKey} that specifies the index position of the property.
1018      */
1019     final class DefaultPropertyKey implements PropertyKey {
1020         final int index;
1021 
1022         DefaultPropertyKey(int index) {
1023             this.index = index;
1024         }
1025 
1026         DefaultPropertyKey verifyConnection(Http2Connection connection) {
1027             if (connection != DefaultHttp2Connection.this) {
1028                 throw new IllegalArgumentException("Using a key that was not created by this connection");
1029             }
1030             return this;
1031         }
1032     }
1033 
1034     /**
1035      * A registry of all stream property keys known by this connection.
1036      */
1037     private final class PropertyKeyRegistry {
1038         /**
1039          * Initial size of 4 because the default configuration currently has 3 listeners
1040          * (local/remote flow controller and {@link StreamByteDistributor}) and we leave room for 1 extra.
1041          * We could be more aggressive but the ArrayList resize will double the size if we are too small.
1042          */
1043         final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
1044 
1045         /**
1046          * Registers a new property key.
1047          */
1048         DefaultPropertyKey newKey() {
1049             DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1050             keys.add(key);
1051             return key;
1052         }
1053 
1054         int size() {
1055             return keys.size();
1056         }
1057     }
1058 }