View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.http2;
17  
18  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
19  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
20  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
21  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
22  import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy;
23  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
24  import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
25  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
26  import static io.netty.handler.codec.http2.Http2Exception.streamError;
27  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
28  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
29  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
30  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
31  import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
32  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
33  import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
34  import static io.netty.util.internal.ObjectUtil.checkNotNull;
35  import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
36  import io.netty.util.collection.IntObjectHashMap;
37  import io.netty.util.collection.IntObjectMap;
38  
39  import java.util.ArrayList;
40  import java.util.Collection;
41  import java.util.Collections;
42  import java.util.HashMap;
43  import java.util.HashSet;
44  import java.util.LinkedHashSet;
45  import java.util.List;
46  import java.util.Map;
47  import java.util.Set;
48  
49  /**
50   * Simple implementation of {@link Http2Connection}.
51   */
52  public class DefaultHttp2Connection implements Http2Connection {
53  
54      private final Set<Listener> listeners = new HashSet<Listener>(4);
55      private final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
56      private final ConnectionStream connectionStream = new ConnectionStream();
57      private final Set<Http2Stream> activeStreams = new LinkedHashSet<Http2Stream>();
58      private final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
59      private final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
60      private final Http2StreamRemovalPolicy removalPolicy;
61  
62      /**
63       * Creates a connection with an immediate stream removal policy.
64       *
65       * @param server
66       *            whether or not this end-point is the server-side of the HTTP/2 connection.
67       */
68      public DefaultHttp2Connection(boolean server) {
69          this(server, immediateRemovalPolicy());
70      }
71  
72      /**
73       * Creates a new connection with the given settings.
74       *
75       * @param server
76       *            whether or not this end-point is the server-side of the HTTP/2 connection.
77       * @param removalPolicy
78       *            the policy to be used for removal of closed stream.
79       */
80      public DefaultHttp2Connection(boolean server, Http2StreamRemovalPolicy removalPolicy) {
81  
82          this.removalPolicy = checkNotNull(removalPolicy, "removalPolicy");
83          localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server);
84          remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server);
85  
86          // Tell the removal policy how to remove a stream from this connection.
87          removalPolicy.setAction(new Action() {
88              @Override
89              public void removeStream(Http2Stream stream) {
90                  DefaultHttp2Connection.this.removeStream((DefaultStream) stream);
91              }
92          });
93  
94          // Add the connection stream to the map.
95          streamMap.put(connectionStream.id(), connectionStream);
96      }
97  
98      @Override
99      public void addListener(Listener listener) {
100         listeners.add(listener);
101     }
102 
103     @Override
104     public void removeListener(Listener listener) {
105         listeners.remove(listener);
106     }
107 
108     @Override
109     public boolean isServer() {
110         return localEndpoint.isServer();
111     }
112 
113     @Override
114     public Http2Stream connectionStream() {
115         return connectionStream;
116     }
117 
118     @Override
119     public Http2Stream requireStream(int streamId) throws Http2Exception {
120         Http2Stream stream = stream(streamId);
121         if (stream == null) {
122             throw connectionError(PROTOCOL_ERROR, "Stream does not exist %d", streamId);
123         }
124         return stream;
125     }
126 
127     @Override
128     public Http2Stream stream(int streamId) {
129         return streamMap.get(streamId);
130     }
131 
132     @Override
133     public int numActiveStreams() {
134         return activeStreams.size();
135     }
136 
137     @Override
138     public Set<Http2Stream> activeStreams() {
139         return Collections.unmodifiableSet(activeStreams);
140     }
141 
142     @Override
143     public void deactivate(Http2Stream stream) {
144       deactivateInternal((DefaultStream) stream);
145     }
146 
147     @Override
148     public Endpoint<Http2LocalFlowController> local() {
149         return localEndpoint;
150     }
151 
152     @Override
153     public Endpoint<Http2RemoteFlowController> remote() {
154         return remoteEndpoint;
155     }
156 
157     @Override
158     public boolean isGoAway() {
159         return goAwaySent() || goAwayReceived();
160     }
161 
162     @Override
163     public Http2Stream createLocalStream(int streamId) throws Http2Exception {
164         return local().createStream(streamId);
165     }
166 
167     @Override
168     public Http2Stream createRemoteStream(int streamId) throws Http2Exception {
169         return remote().createStream(streamId);
170     }
171 
172     @Override
173     public boolean goAwayReceived() {
174         return localEndpoint.lastKnownStream >= 0;
175     }
176 
177     @Override
178     public void goAwayReceived(int lastKnownStream) {
179         localEndpoint.lastKnownStream(lastKnownStream);
180     }
181 
182     @Override
183     public boolean goAwaySent() {
184         return remoteEndpoint.lastKnownStream >= 0;
185     }
186 
187     @Override
188     public void goAwaySent(int lastKnownStream) {
189         remoteEndpoint.lastKnownStream(lastKnownStream);
190     }
191 
192     private void removeStream(DefaultStream stream) {
193         // Notify the listeners of the event first.
194         for (Listener listener : listeners) {
195             listener.streamRemoved(stream);
196         }
197 
198         // Remove it from the map and priority tree.
199         streamMap.remove(stream.id());
200         stream.parent().removeChild(stream);
201     }
202 
203     private void activateInternal(DefaultStream stream) {
204         if (activeStreams.add(stream)) {
205             // Update the number of active streams initiated by the endpoint.
206             stream.createdBy().numActiveStreams++;
207 
208             // Notify the listeners.
209             for (Listener listener : listeners) {
210                 listener.streamActive(stream);
211             }
212         }
213     }
214 
215     private void deactivateInternal(DefaultStream stream) {
216         if (activeStreams.remove(stream)) {
217             // Update the number of active streams initiated by the endpoint.
218             stream.createdBy().numActiveStreams--;
219 
220             // Notify the listeners.
221             for (Listener listener : listeners) {
222                 listener.streamInactive(stream);
223             }
224 
225             // Mark this stream for removal.
226             removalPolicy.markForRemoval(stream);
227         }
228     }
229 
230     /**
231      * Simple stream implementation. Streams can be compared to each other by priority.
232      */
233     private class DefaultStream implements Http2Stream {
234         private final int id;
235         private State state = IDLE;
236         private short weight = DEFAULT_PRIORITY_WEIGHT;
237         private DefaultStream parent;
238         private IntObjectMap<DefaultStream> children = newChildMap();
239         private int totalChildWeights;
240         private boolean resetSent;
241         private PropertyMap data;
242 
243         DefaultStream(int id) {
244             this.id = id;
245             data = new LazyPropertyMap(this);
246         }
247 
248         @Override
249         public final int id() {
250             return id;
251         }
252 
253         @Override
254         public final State state() {
255             return state;
256         }
257 
258         @Override
259         public boolean isResetSent() {
260             return resetSent;
261         }
262 
263         @Override
264         public Http2Stream resetSent() {
265             resetSent = true;
266             return this;
267         }
268 
269         @Override
270         public Object setProperty(Object key, Object value) {
271             return data.put(key, value);
272         }
273 
274         @Override
275         public <V> V getProperty(Object key) {
276             return data.get(key);
277         }
278 
279         @Override
280         public <V> V removeProperty(Object key) {
281             return data.remove(key);
282         }
283 
284         @Override
285         public final boolean isRoot() {
286             return parent == null;
287         }
288 
289         @Override
290         public final short weight() {
291             return weight;
292         }
293 
294         @Override
295         public final int totalChildWeights() {
296             return totalChildWeights;
297         }
298 
299         @Override
300         public final DefaultStream parent() {
301             return parent;
302         }
303 
304         @Override
305         public final boolean isDescendantOf(Http2Stream stream) {
306             Http2Stream next = parent();
307             while (next != null) {
308                 if (next == stream) {
309                     return true;
310                 }
311                 next = next.parent();
312             }
313             return false;
314         }
315 
316         @Override
317         public final boolean isLeaf() {
318             return numChildren() == 0;
319         }
320 
321         @Override
322         public final int numChildren() {
323             return children.size();
324         }
325 
326         @Override
327         public final Collection<? extends Http2Stream> children() {
328             return children.values();
329         }
330 
331         @Override
332         public final boolean hasChild(int streamId) {
333             return child(streamId) != null;
334         }
335 
336         @Override
337         public final Http2Stream child(int streamId) {
338             return children.get(streamId);
339         }
340 
341         @Override
342         public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive) throws Http2Exception {
343             if (weight < MIN_WEIGHT || weight > MAX_WEIGHT) {
344                 throw new IllegalArgumentException(String.format(
345                         "Invalid weight: %d.  Must be between %d and %d (inclusive).", weight, MIN_WEIGHT, MAX_WEIGHT));
346             }
347 
348             DefaultStream newParent = (DefaultStream) stream(parentStreamId);
349             if (newParent == null) {
350                 // Streams can depend on other streams in the IDLE state. We must ensure
351                 // the stream has been "created" in order to use it in the priority tree.
352                 newParent = createdBy().createStream(parentStreamId);
353             } else if (this == newParent) {
354                 throw new IllegalArgumentException("A stream cannot depend on itself");
355             }
356 
357             // Already have a priority. Re-prioritize the stream.
358             weight(weight);
359 
360             if (newParent != parent() || exclusive) {
361                 List<ParentChangedEvent> events;
362                 if (newParent.isDescendantOf(this)) {
363                     events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.numChildren(): 0));
364                     parent.takeChild(newParent, false, events);
365                 } else {
366                     events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.numChildren() : 0));
367                 }
368                 newParent.takeChild(this, exclusive, events);
369                 notifyParentChanged(events);
370             }
371 
372             return this;
373         }
374 
375         @Override
376         public Http2Stream open(boolean halfClosed) throws Http2Exception {
377             switch (state) {
378             case IDLE:
379                 state = halfClosed ? isLocal() ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
380                 break;
381             case RESERVED_LOCAL:
382                 state = HALF_CLOSED_REMOTE;
383                 break;
384             case RESERVED_REMOTE:
385                 state = HALF_CLOSED_LOCAL;
386                 break;
387             default:
388                 throw streamError(id, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: " + state);
389             }
390 
391             activateInternal(this);
392             return this;
393         }
394 
395         @Override
396         public Http2Stream close() {
397             if (state == CLOSED) {
398                 return this;
399             }
400 
401             state = CLOSED;
402             deactivateInternal(this);
403             return this;
404         }
405 
406         @Override
407         public Http2Stream closeLocalSide() {
408             switch (state) {
409             case OPEN:
410                 state = HALF_CLOSED_LOCAL;
411                 notifyHalfClosed(this);
412                 break;
413             case HALF_CLOSED_LOCAL:
414                 break;
415             default:
416                 close();
417                 break;
418             }
419             return this;
420         }
421 
422         @Override
423         public Http2Stream closeRemoteSide() {
424             switch (state) {
425             case OPEN:
426                 state = HALF_CLOSED_REMOTE;
427                 notifyHalfClosed(this);
428                 break;
429             case HALF_CLOSED_REMOTE:
430                 break;
431             default:
432                 close();
433                 break;
434             }
435             return this;
436         }
437 
438         private void notifyHalfClosed(Http2Stream stream) {
439             for (Listener listener : listeners) {
440                 listener.streamHalfClosed(stream);
441             }
442         }
443 
444         @Override
445         public final boolean remoteSideOpen() {
446             return state == HALF_CLOSED_LOCAL || state == OPEN || state == RESERVED_REMOTE;
447         }
448 
449         @Override
450         public final boolean localSideOpen() {
451             return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL;
452         }
453 
454         final DefaultEndpoint<? extends Http2FlowController> createdBy() {
455             return localEndpoint.createdStreamId(id) ? localEndpoint : remoteEndpoint;
456         }
457 
458         final boolean isLocal() {
459             return localEndpoint.createdStreamId(id);
460         }
461 
462         final void weight(short weight) {
463             if (weight != this.weight) {
464                 if (parent != null) {
465                     int delta = weight - this.weight;
466                     parent.totalChildWeights += delta;
467                 }
468                 final short oldWeight = this.weight;
469                 this.weight = weight;
470                 for (Listener l : listeners) {
471                     l.onWeightChanged(this, oldWeight);
472                 }
473             }
474         }
475 
476         final IntObjectMap<DefaultStream> removeAllChildren() {
477             totalChildWeights = 0;
478             IntObjectMap<DefaultStream> prevChildren = children;
479             children = newChildMap();
480             return prevChildren;
481         }
482 
483         /**
484          * Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
485          * the child.
486          */
487         final void takeChild(DefaultStream child, boolean exclusive, List<ParentChangedEvent> events) {
488             DefaultStream oldParent = child.parent();
489             events.add(new ParentChangedEvent(child, oldParent));
490             notifyParentChanging(child, this);
491             child.parent = this;
492 
493             if (exclusive && !children.isEmpty()) {
494                 // If it was requested that this child be the exclusive dependency of this node,
495                 // move any previous children to the child node, becoming grand children
496                 // of this node.
497                 for (DefaultStream grandchild : removeAllChildren().values()) {
498                     child.takeChild(grandchild, false, events);
499                 }
500             }
501 
502             if (children.put(child.id(), child) == null) {
503                 totalChildWeights += child.weight();
504             }
505 
506             if (oldParent != null && oldParent.children.remove(child.id()) != null) {
507                 oldParent.totalChildWeights -= child.weight();
508             }
509         }
510 
511         /**
512          * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
513          */
514         final void removeChild(DefaultStream child) {
515             if (children.remove(child.id()) != null) {
516                 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
517                 events.add(new ParentChangedEvent(child, child.parent()));
518                 notifyParentChanging(child, null);
519                 child.parent = null;
520                 totalChildWeights -= child.weight();
521 
522                 // Move up any grand children to be directly dependent on this node.
523                 for (DefaultStream grandchild : child.children.values()) {
524                     takeChild(grandchild, false, events);
525                 }
526 
527                 notifyParentChanged(events);
528             }
529         }
530     }
531 
532     /**
533      * Allows the data map to be lazily initialized for {@link DefaultStream}.
534      */
535     private interface PropertyMap {
536         Object put(Object key, Object value);
537 
538         <V> V get(Object key);
539 
540         <V> V remove(Object key);
541     }
542 
543     /**
544      * Provides actual {@link HashMap} functionality for {@link DefaultStream}'s application data.
545      */
546     private static final class DefaultProperyMap implements PropertyMap {
547         private final Map<Object, Object> data;
548 
549         DefaultProperyMap(int initialSize) {
550             data = new HashMap<Object, Object>(initialSize);
551         }
552 
553         @Override
554         public Object put(Object key, Object value) {
555             return data.put(key, value);
556         }
557 
558         @SuppressWarnings("unchecked")
559         @Override
560         public <V> V get(Object key) {
561             return (V) data.get(key);
562         }
563 
564         @SuppressWarnings("unchecked")
565         @Override
566         public <V> V remove(Object key) {
567             return (V) data.remove(key);
568         }
569     }
570 
571     /**
572      * Provides the lazy initialization for the {@link DefaultStream} data map.
573      */
574     private static final class LazyPropertyMap implements PropertyMap {
575         private static final int DEFAULT_INITIAL_SIZE = 4;
576         private final DefaultStream stream;
577 
578         LazyPropertyMap(DefaultStream stream) {
579             this.stream = stream;
580         }
581 
582         @Override
583         public Object put(Object key, Object value) {
584             stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
585             return stream.data.put(key, value);
586         }
587 
588         @Override
589         public <V> V get(Object key) {
590             stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
591             return stream.data.get(key);
592         }
593 
594         @Override
595         public <V> V remove(Object key) {
596             stream.data = new DefaultProperyMap(DEFAULT_INITIAL_SIZE);
597             return stream.data.remove(key);
598         }
599     }
600 
601     private static IntObjectMap<DefaultStream> newChildMap() {
602         return new IntObjectHashMap<DefaultStream>(4);
603     }
604 
605     /**
606      * Allows a correlation to be made between a stream and its old parent before a parent change occurs
607      */
608     private static final class ParentChangedEvent {
609         private final Http2Stream stream;
610         private final Http2Stream oldParent;
611 
612         /**
613          * Create a new instance
614          * @param stream The stream who has had a parent change
615          * @param oldParent The previous parent
616          */
617         ParentChangedEvent(Http2Stream stream, Http2Stream oldParent) {
618             this.stream = stream;
619             this.oldParent = oldParent;
620         }
621 
622         /**
623          * Notify all listeners of the tree change event
624          * @param l The listener to notify
625          */
626         public void notifyListener(Listener l) {
627             l.priorityTreeParentChanged(stream, oldParent);
628         }
629     }
630 
631     /**
632      * Notify all listeners of the priority tree change events (in ascending order)
633      * @param events The events (top down order) which have changed
634      */
635     private void notifyParentChanged(List<ParentChangedEvent> events) {
636         for (int i = 0; i < events.size(); ++i) {
637             ParentChangedEvent event = events.get(i);
638             for (Listener l : listeners) {
639                 event.notifyListener(l);
640             }
641         }
642     }
643 
644     private void notifyParentChanging(Http2Stream stream, Http2Stream newParent) {
645         for (Listener l : listeners) {
646             l.priorityTreeParentChanging(stream, newParent);
647         }
648     }
649 
650     /**
651      * Stream class representing the connection, itself.
652      */
653     private final class ConnectionStream extends DefaultStream {
654         ConnectionStream() {
655             super(CONNECTION_STREAM_ID);
656         }
657 
658         @Override
659         public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive) {
660             throw new UnsupportedOperationException();
661         }
662 
663         @Override
664         public Http2Stream open(boolean halfClosed) {
665             throw new UnsupportedOperationException();
666         }
667 
668         @Override
669         public Http2Stream close() {
670             throw new UnsupportedOperationException();
671         }
672 
673         @Override
674         public Http2Stream closeLocalSide() {
675             throw new UnsupportedOperationException();
676         }
677 
678         @Override
679         public Http2Stream closeRemoteSide() {
680             throw new UnsupportedOperationException();
681         }
682     }
683 
684     /**
685      * Simple endpoint implementation.
686      */
687     private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
688         private final boolean server;
689         private int nextStreamId;
690         private int lastStreamCreated;
691         private int lastKnownStream = -1;
692         private boolean pushToAllowed = true;
693         private F flowController;
694 
695         /**
696          * The maximum number of active streams allowed to be created by this endpoint.
697          */
698         private int maxStreams;
699 
700         /**
701          * The current number of active streams created by this endpoint.
702          */
703         private int numActiveStreams;
704 
705         DefaultEndpoint(boolean server) {
706             this.server = server;
707 
708             // Determine the starting stream ID for this endpoint. Client-initiated streams
709             // are odd and server-initiated streams are even. Zero is reserved for the
710             // connection. Stream 1 is reserved client-initiated stream for responding to an
711             // upgrade from HTTP 1.1.
712             nextStreamId = server ? 2 : 1;
713 
714             // Push is disallowed by default for servers and allowed for clients.
715             pushToAllowed = !server;
716             maxStreams = Integer.MAX_VALUE;
717         }
718 
719         @Override
720         public int nextStreamId() {
721             // For manually created client-side streams, 1 is reserved for HTTP upgrade, so
722             // start at 3.
723             return nextStreamId > 1 ? nextStreamId : nextStreamId + 2;
724         }
725 
726         @Override
727         public boolean createdStreamId(int streamId) {
728             boolean even = (streamId & 1) == 0;
729             return server == even;
730         }
731 
732         @Override
733         public boolean acceptingNewStreams() {
734             return nextStreamId() > 0 && numActiveStreams + 1 <= maxStreams;
735         }
736 
737         @Override
738         public DefaultStream createStream(int streamId) throws Http2Exception {
739             checkNewStreamAllowed(streamId);
740 
741             // Create and initialize the stream.
742             DefaultStream stream = new DefaultStream(streamId);
743 
744             // Update the next and last stream IDs.
745             nextStreamId = streamId + 2;
746             lastStreamCreated = streamId;
747 
748             addStream(stream);
749             return stream;
750         }
751 
752         @Override
753         public boolean isServer() {
754             return server;
755         }
756 
757         @Override
758         public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
759             if (parent == null) {
760                 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
761             }
762             if (isLocal() ? !parent.localSideOpen() : !parent.remoteSideOpen()) {
763                 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
764             }
765             if (!opposite().allowPushTo()) {
766                 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint.");
767             }
768             checkNewStreamAllowed(streamId);
769 
770             // Create and initialize the stream.
771             DefaultStream stream = new DefaultStream(streamId);
772             stream.state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
773 
774             // Update the next and last stream IDs.
775             nextStreamId = streamId + 2;
776             lastStreamCreated = streamId;
777 
778             // Register the stream.
779             addStream(stream);
780             return stream;
781         }
782 
783         private void addStream(DefaultStream stream) {
784             // Add the stream to the map and priority tree.
785             streamMap.put(stream.id(), stream);
786             List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
787             connectionStream.takeChild(stream, false, events);
788 
789             // Notify the listeners of the event.
790             for (Listener listener : listeners) {
791                 listener.streamAdded(stream);
792             }
793 
794             notifyParentChanged(events);
795         }
796 
797         @Override
798         public void allowPushTo(boolean allow) {
799             if (allow && server) {
800                 throw new IllegalArgumentException("Servers do not allow push");
801             }
802             pushToAllowed = allow;
803         }
804 
805         @Override
806         public boolean allowPushTo() {
807             return pushToAllowed;
808         }
809 
810         @Override
811         public int numActiveStreams() {
812             return numActiveStreams;
813         }
814 
815         @Override
816         public int maxStreams() {
817             return maxStreams;
818         }
819 
820         @Override
821         public void maxStreams(int maxStreams) {
822             this.maxStreams = maxStreams;
823         }
824 
825         @Override
826         public int lastStreamCreated() {
827             return lastStreamCreated;
828         }
829 
830         @Override
831         public int lastKnownStream() {
832             return lastKnownStream >= 0 ? lastKnownStream : lastStreamCreated;
833         }
834 
835         private void lastKnownStream(int lastKnownStream) {
836             boolean alreadyNotified = isGoAway();
837             this.lastKnownStream = lastKnownStream;
838             if (!alreadyNotified) {
839                 notifyGoingAway();
840             }
841         }
842 
843         private void notifyGoingAway() {
844             for (Listener listener : listeners) {
845                 listener.goingAway();
846             }
847         }
848 
849         @Override
850         public F flowController() {
851             return flowController;
852         }
853 
854         @Override
855         public void flowController(F flowController) {
856             this.flowController = checkNotNull(flowController, "flowController");
857         }
858 
859         @Override
860         public Endpoint<? extends Http2FlowController> opposite() {
861             return isLocal() ? remoteEndpoint : localEndpoint;
862         }
863 
864         private void checkNewStreamAllowed(int streamId) throws Http2Exception {
865             if (isGoAway()) {
866                 throw connectionError(PROTOCOL_ERROR, "Cannot create a stream since the connection is going away");
867             }
868             verifyStreamId(streamId);
869             if (!acceptingNewStreams()) {
870                 throw connectionError(REFUSED_STREAM, "Maximum streams exceeded for this endpoint.");
871             }
872         }
873 
874         private void verifyStreamId(int streamId) throws Http2Exception {
875             if (streamId < 0) {
876                 throw new Http2NoMoreStreamIdsException();
877             }
878             if (streamId < nextStreamId) {
879                 throw connectionError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
880                         streamId, nextStreamId);
881             }
882             if (!createdStreamId(streamId)) {
883                 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection",
884                         streamId, server ? "server" : "client");
885             }
886         }
887 
888         private boolean isLocal() {
889             return this == localEndpoint;
890         }
891     }
892 }