View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.channel.ChannelHandlerContext;
18  import io.netty.util.internal.UnstableApi;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.ArrayDeque;
23  import java.util.Deque;
24  
25  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
26  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
27  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
28  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
29  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
30  import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
31  import static io.netty.handler.codec.http2.Http2Exception.streamError;
32  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
33  import static io.netty.util.internal.ObjectUtil.checkNotNull;
34  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
35  import static java.lang.Math.max;
36  import static java.lang.Math.min;
37  
38  /**
39   * Basic implementation of {@link Http2RemoteFlowController}.
40   * <p>
41   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
42   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
43   */
44  @UnstableApi
45  public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
46      private static final InternalLogger logger =
47              InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
48      private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
49      private final Http2Connection connection;
50      private final Http2Connection.PropertyKey stateKey;
51      private final StreamByteDistributor streamByteDistributor;
52      private final FlowState connectionState;
53      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
54      private WritabilityMonitor monitor;
55      private ChannelHandlerContext ctx;
56  
57      public DefaultHttp2RemoteFlowController(Http2Connection connection) {
58          this(connection, (Listener) null);
59      }
60  
61      public DefaultHttp2RemoteFlowController(Http2Connection connection,
62                                              StreamByteDistributor streamByteDistributor) {
63          this(connection, streamByteDistributor, null);
64      }
65  
66      public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
67          this(connection, new WeightedFairQueueByteDistributor(connection), listener);
68      }
69  
70      public DefaultHttp2RemoteFlowController(Http2Connection connection,
71                                              StreamByteDistributor streamByteDistributor,
72                                              final Listener listener) {
73          this.connection = checkNotNull(connection, "connection");
74          this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");
75  
76          // Add a flow state for the connection.
77          stateKey = connection.newKey();
78          connectionState = new FlowState(connection.connectionStream());
79          connection.connectionStream().setProperty(stateKey, connectionState);
80  
81          // Monitor may depend upon connectionState, and so initialize after connectionState
82          listener(listener);
83          monitor.windowSize(connectionState, initialWindowSize);
84  
85          // Register for notification of new streams.
86          connection.addListener(new Http2ConnectionAdapter() {
87              @Override
88              public void onStreamAdded(Http2Stream stream) {
89                  // If the stream state is not open then the stream is not yet eligible for flow controlled frames and
90                  // only requires the ReducedFlowState. Otherwise the full amount of memory is required.
91                  stream.setProperty(stateKey, new FlowState(stream));
92              }
93  
94              @Override
95              public void onStreamActive(Http2Stream stream) {
96                  // If the object was previously created, but later activated then we have to ensure the proper
97                  // initialWindowSize is used.
98                  monitor.windowSize(state(stream), initialWindowSize);
99              }
100 
101             @Override
102             public void onStreamClosed(Http2Stream stream) {
103                 // Any pending frames can never be written, cancel and
104                 // write errors for any pending frames.
105                 state(stream).cancel(STREAM_CLOSED, null);
106             }
107 
108             @Override
109             public void onStreamHalfClosed(Http2Stream stream) {
110                 if (HALF_CLOSED_LOCAL == stream.state()) {
111                     /*
112                      * When this method is called there should not be any
113                      * pending frames left if the API is used correctly. However,
114                      * it is possible that a erroneous application can sneak
115                      * in a frame even after having already written a frame with the
116                      * END_STREAM flag set, as the stream state might not transition
117                      * immediately to HALF_CLOSED_LOCAL / CLOSED due to flow control
118                      * delaying the write.
119                      *
120                      * This is to cancel any such illegal writes.
121                      */
122                     state(stream).cancel(STREAM_CLOSED, null);
123                 }
124             }
125         });
126     }
127 
128     /**
129      * {@inheritDoc}
130      * <p>
131      * Any queued {@link FlowControlled} objects will be sent.
132      */
133     @Override
134     public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
135         this.ctx = checkNotNull(ctx, "ctx");
136 
137         // Writing the pending bytes will not check writability change and instead a writability change notification
138         // to be provided by an explicit call.
139         channelWritabilityChanged();
140 
141         // Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
142         // closed and the queue cleanup will occur when the stream state transitions occur.
143 
144         // If any frames have been queued up, we should send them now that we have a channel context.
145         if (isChannelWritable()) {
146             writePendingBytes();
147         }
148     }
149 
150     @Override
151     public ChannelHandlerContext channelHandlerContext() {
152         return ctx;
153     }
154 
155     @Override
156     public void initialWindowSize(int newWindowSize) throws Http2Exception {
157         assert ctx == null || ctx.executor().inEventLoop();
158         monitor.initialWindowSize(newWindowSize);
159     }
160 
161     @Override
162     public int initialWindowSize() {
163         return initialWindowSize;
164     }
165 
166     @Override
167     public int windowSize(Http2Stream stream) {
168         return state(stream).windowSize();
169     }
170 
171     @Override
172     public boolean isWritable(Http2Stream stream) {
173         return monitor.isWritable(state(stream));
174     }
175 
176     @Override
177     public void channelWritabilityChanged() throws Http2Exception {
178         monitor.channelWritabilityChange();
179     }
180 
181     @Override
182     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
183         // It is assumed there are all validated at a higher level. For example in the Http2FrameReader.
184         assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
185         assert childStreamId != parentStreamId : "A stream cannot depend on itself";
186         assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
187 
188         streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
189     }
190 
191     private boolean isChannelWritable() {
192         return ctx != null && isChannelWritable0();
193     }
194 
195     private boolean isChannelWritable0() {
196         return ctx.channel().isWritable();
197     }
198 
199     @Override
200     public void listener(Listener listener) {
201         monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
202     }
203 
204     @Override
205     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
206         assert ctx == null || ctx.executor().inEventLoop();
207         monitor.incrementWindowSize(state(stream), delta);
208     }
209 
210     @Override
211     public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
212         // The context can be null assuming the frame will be queued and send later when the context is set.
213         assert ctx == null || ctx.executor().inEventLoop();
214         checkNotNull(frame, "frame");
215         try {
216             monitor.enqueueFrame(state(stream), frame);
217         } catch (Throwable t) {
218             frame.error(ctx, t);
219         }
220     }
221 
222     @Override
223     public boolean hasFlowControlled(Http2Stream stream) {
224         return state(stream).hasFrame();
225     }
226 
227     private FlowState state(Http2Stream stream) {
228         return (FlowState) stream.getProperty(stateKey);
229     }
230 
231     /**
232      * Returns the flow control window for the entire connection.
233      */
234     private int connectionWindowSize() {
235         return connectionState.windowSize();
236     }
237 
238     private int minUsableChannelBytes() {
239         // The current allocation algorithm values "fairness" and doesn't give any consideration to "goodput". It
240         // is possible that 1 byte will be allocated to many streams. In an effort to try to make "goodput"
241         // reasonable with the current allocation algorithm we have this "cheap" check up front to ensure there is
242         // an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
243         // number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
244         // circumstances in which this can happen without rewriting the allocation algorithm.
245         return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
246     }
247 
248     private int maxUsableChannelBytes() {
249         // If the channel isWritable, allow at least minUsableChannelBytes.
250         int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
251         int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
252 
253         // Clip the usable bytes by the connection window.
254         return min(connectionState.windowSize(), usableBytes);
255     }
256 
257     /**
258      * The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel} without
259      * queuing "too-much".
260      */
261     private int writableBytes() {
262         return min(connectionWindowSize(), maxUsableChannelBytes());
263     }
264 
265     @Override
266     public void writePendingBytes() throws Http2Exception {
267         monitor.writePendingBytes();
268     }
269 
270     /**
271      * The remote flow control state for a single stream.
272      */
273     private final class FlowState implements StreamByteDistributor.StreamState {
274         private final Http2Stream stream;
275         private final Deque<FlowControlled> pendingWriteQueue;
276         private int window;
277         private long pendingBytes;
278         private boolean markedWritable;
279 
280         /**
281          * Set to true while a frame is being written, false otherwise.
282          */
283         private boolean writing;
284         /**
285          * Set to true if cancel() was called.
286          */
287         private boolean cancelled;
288 
289         FlowState(Http2Stream stream) {
290             this.stream = stream;
291             pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
292         }
293 
294         /**
295          * Determine if the stream associated with this object is writable.
296          * @return {@code true} if the stream associated with this object is writable.
297          */
298         boolean isWritable() {
299             return windowSize() > pendingBytes() && !cancelled;
300         }
301 
302         /**
303          * The stream this state is associated with.
304          */
305         @Override
306         public Http2Stream stream() {
307             return stream;
308         }
309 
310         /**
311          * Returns the parameter from the last call to {@link #markedWritability(boolean)}.
312          */
313         boolean markedWritability() {
314             return markedWritable;
315         }
316 
317         /**
318          * Save the state of writability.
319          */
320         void markedWritability(boolean isWritable) {
321             this.markedWritable = isWritable;
322         }
323 
324         @Override
325         public int windowSize() {
326             return window;
327         }
328 
329         /**
330          * Reset the window size for this stream.
331          */
332         void windowSize(int initialWindowSize) {
333             window = initialWindowSize;
334         }
335 
336         /**
337          * Write the allocated bytes for this stream.
338          * @return the number of bytes written for a stream or {@code -1} if no write occurred.
339          */
340         int writeAllocatedBytes(int allocated) {
341             final int initialAllocated = allocated;
342             int writtenBytes;
343             // In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
344             Throwable cause = null;
345             FlowControlled frame;
346             try {
347                 assert !writing;
348                 writing = true;
349 
350                 // Write the remainder of frames that we are allowed to
351                 boolean writeOccurred = false;
352                 while (!cancelled && (frame = peek()) != null) {
353                     int maxBytes = min(allocated, writableWindow());
354                     if (maxBytes <= 0 && frame.size() > 0) {
355                         // The frame still has data, but the amount of allocated bytes has been exhausted.
356                         // Don't write needless empty frames.
357                         break;
358                     }
359                     writeOccurred = true;
360                     int initialFrameSize = frame.size();
361                     try {
362                         frame.write(ctx, max(0, maxBytes));
363                         if (frame.size() == 0) {
364                             // This frame has been fully written, remove this frame and notify it.
365                             // Since we remove this frame first, we're guaranteed that its error
366                             // method will not be called when we call cancel.
367                             pendingWriteQueue.remove();
368                             frame.writeComplete();
369                         }
370                     } finally {
371                         // Decrement allocated by how much was actually written.
372                         allocated -= initialFrameSize - frame.size();
373                     }
374                 }
375 
376                 if (!writeOccurred) {
377                     // Either there was no frame, or the amount of allocated bytes has been exhausted.
378                     return -1;
379                 }
380 
381             } catch (Throwable t) {
382                 // Mark the state as cancelled, we'll clear the pending queue via cancel() below.
383                 cancelled = true;
384                 cause = t;
385             } finally {
386                 writing = false;
387                 // Make sure we always decrement the flow control windows
388                 // by the bytes written.
389                 writtenBytes = initialAllocated - allocated;
390 
391                 decrementPendingBytes(writtenBytes, false);
392                 decrementFlowControlWindow(writtenBytes);
393 
394                 // If a cancellation occurred while writing, call cancel again to
395                 // clear and error all of the pending writes.
396                 if (cancelled) {
397                     cancel(INTERNAL_ERROR, cause);
398                 }
399             }
400             return writtenBytes;
401         }
402 
403         /**
404          * Increments the flow control window for this stream by the given delta and returns the new value.
405          */
406         int incrementStreamWindow(int delta) throws Http2Exception {
407             if (delta > 0 && Integer.MAX_VALUE - delta < window) {
408                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
409                         "Window size overflow for stream: %d", stream.id());
410             }
411             window += delta;
412 
413             streamByteDistributor.updateStreamableBytes(this);
414             return window;
415         }
416 
417         /**
418          * Returns the maximum writable window (minimum of the stream and connection windows).
419          */
420         private int writableWindow() {
421             return min(window, connectionWindowSize());
422         }
423 
424         @Override
425         public long pendingBytes() {
426             return pendingBytes;
427         }
428 
429         /**
430          * Adds the {@code frame} to the pending queue and increments the pending byte count.
431          */
432         void enqueueFrame(FlowControlled frame) {
433             FlowControlled last = pendingWriteQueue.peekLast();
434             if (last == null) {
435                 enqueueFrameWithoutMerge(frame);
436                 return;
437             }
438 
439             int lastSize = last.size();
440             if (last.merge(ctx, frame)) {
441                 incrementPendingBytes(last.size() - lastSize, true);
442                 return;
443             }
444             enqueueFrameWithoutMerge(frame);
445         }
446 
447         private void enqueueFrameWithoutMerge(FlowControlled frame) {
448             pendingWriteQueue.offer(frame);
449             // This must be called after adding to the queue in order so that hasFrame() is
450             // updated before updating the stream state.
451             incrementPendingBytes(frame.size(), true);
452         }
453 
454         @Override
455         public boolean hasFrame() {
456             return !pendingWriteQueue.isEmpty();
457         }
458 
459         /**
460          * Returns the head of the pending queue, or {@code null} if empty.
461          */
462         private FlowControlled peek() {
463             return pendingWriteQueue.peek();
464         }
465 
466         /**
467          * Clears the pending queue and writes errors for each remaining frame.
468          * @param error the {@link Http2Error} to use.
469          * @param cause the {@link Throwable} that caused this method to be invoked.
470          */
471         void cancel(Http2Error error, Throwable cause) {
472             cancelled = true;
473             // Ensure that the queue can't be modified while we are writing.
474             if (writing) {
475                 return;
476             }
477 
478             FlowControlled frame = pendingWriteQueue.poll();
479             if (frame != null) {
480                 // Only create exception once and reuse to reduce overhead of filling in the stacktrace.
481                 final Http2Exception exception = streamError(stream.id(), error, cause,
482                         "Stream closed before write could take place");
483                 do {
484                     writeError(frame, exception);
485                     frame = pendingWriteQueue.poll();
486                 } while (frame != null);
487             }
488 
489             streamByteDistributor.updateStreamableBytes(this);
490 
491             monitor.stateCancelled(this);
492         }
493 
494         /**
495          * Increments the number of pending bytes for this node and optionally updates the
496          * {@link StreamByteDistributor}.
497          */
498         private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
499             pendingBytes += numBytes;
500             monitor.incrementPendingBytes(numBytes);
501             if (updateStreamableBytes) {
502                 streamByteDistributor.updateStreamableBytes(this);
503             }
504         }
505 
506         /**
507          * If this frame is in the pending queue, decrements the number of pending bytes for the stream.
508          */
509         private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
510             incrementPendingBytes(-bytes, updateStreamableBytes);
511         }
512 
513         /**
514          * Decrement the per stream and connection flow control window by {@code bytes}.
515          */
516         private void decrementFlowControlWindow(int bytes) {
517             try {
518                 int negativeBytes = -bytes;
519                 connectionState.incrementStreamWindow(negativeBytes);
520                 incrementStreamWindow(negativeBytes);
521             } catch (Http2Exception e) {
522                 // Should never get here since we're decrementing.
523                 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
524             }
525         }
526 
527         /**
528          * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
529          * the unwritten bytes are removed from this branch of the priority tree.
530          */
531         private void writeError(FlowControlled frame, Http2Exception cause) {
532             assert ctx != null;
533             decrementPendingBytes(frame.size(), true);
534             frame.error(ctx, cause);
535         }
536     }
537 
538     /**
539      * Abstract class which provides common functionality for writability monitor implementations.
540      */
541     private class WritabilityMonitor implements StreamByteDistributor.Writer {
542         private boolean inWritePendingBytes;
543         private long totalPendingBytes;
544 
545         @Override
546         public final void write(Http2Stream stream, int numBytes) {
547             state(stream).writeAllocatedBytes(numBytes);
548         }
549 
550         /**
551          * Called when the writability of the underlying channel changes.
552          * @throws Http2Exception If a write occurs and an exception happens in the write operation.
553          */
554         void channelWritabilityChange() throws Http2Exception { }
555 
556         /**
557          * Called when the state is cancelled.
558          * @param state the state that was cancelled.
559          */
560         void stateCancelled(FlowState state) { }
561 
562         /**
563          * Set the initial window size for {@code state}.
564          * @param state the state to change the initial window size for.
565          * @param initialWindowSize the size of the window in bytes.
566          */
567         void windowSize(FlowState state, int initialWindowSize) {
568             state.windowSize(initialWindowSize);
569         }
570 
571         /**
572          * Increment the window size for a particular stream.
573          * @param state the state associated with the stream whose window is being incremented.
574          * @param delta The amount to increment by.
575          * @throws Http2Exception If this operation overflows the window for {@code state}.
576          */
577         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
578             state.incrementStreamWindow(delta);
579         }
580 
581         /**
582          * Add a frame to be sent via flow control.
583          * @param state The state associated with the stream which the {@code frame} is associated with.
584          * @param frame the frame to enqueue.
585          * @throws Http2Exception If a writability error occurs.
586          */
587         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
588             state.enqueueFrame(frame);
589         }
590 
591         /**
592          * Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes
593          * method should be called.
594          * @param delta The amount to increment by.
595          */
596         final void incrementPendingBytes(int delta) {
597             totalPendingBytes += delta;
598 
599             // Notification of writibilty change should be delayed until the end of the top level event.
600             // This is to ensure the flow controller is more consistent state before calling external listener methods.
601         }
602 
603         /**
604          * Determine if the stream associated with {@code state} is writable.
605          * @param state The state which is associated with the stream to test writability for.
606          * @return {@code true} if {@link FlowState#stream()} is writable. {@code false} otherwise.
607          */
608         final boolean isWritable(FlowState state) {
609             return isWritableConnection() && state.isWritable();
610         }
611 
612         final void writePendingBytes() throws Http2Exception {
613             // Reentry is not permitted during the byte distribution process. It may lead to undesirable distribution of
614             // bytes and even infinite loops. We protect against reentry and make sure each call has an opportunity to
615             // cause a distribution to occur. This may be useful for example if the channel's writability changes from
616             // Writable -> Not Writable (because we are writing) -> Writable (because the user flushed to make more room
617             // in the channel outbound buffer).
618             if (inWritePendingBytes) {
619                 return;
620             }
621             inWritePendingBytes = true;
622             try {
623                 int bytesToWrite = writableBytes();
624                 // Make sure we always write at least once, regardless if we have bytesToWrite or not.
625                 // This ensures that zero-length frames will always be written.
626                 for (;;) {
627                     if (!streamByteDistributor.distribute(bytesToWrite, this) ||
628                         (bytesToWrite = writableBytes()) <= 0 ||
629                         !isChannelWritable0()) {
630                         break;
631                     }
632                 }
633             } finally {
634                 inWritePendingBytes = false;
635             }
636         }
637 
638         void initialWindowSize(int newWindowSize) throws Http2Exception {
639             checkPositiveOrZero(newWindowSize, "newWindowSize");
640 
641             final int delta = newWindowSize - initialWindowSize;
642             initialWindowSize = newWindowSize;
643             connection.forEachActiveStream(new Http2StreamVisitor() {
644                 @Override
645                 public boolean visit(Http2Stream stream) throws Http2Exception {
646                     state(stream).incrementStreamWindow(delta);
647                     return true;
648                 }
649             });
650 
651             if (delta > 0 && isChannelWritable()) {
652                 // The window size increased, send any pending frames for all streams.
653                 writePendingBytes();
654             }
655         }
656 
657         final boolean isWritableConnection() {
658             return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
659         }
660     }
661 
662     /**
663      * Writability of a {@code stream} is calculated using the following:
664      * <pre>
665      * Connection Window - Total Queued Bytes > 0 &&
666      * Stream Window - Bytes Queued for Stream > 0 &&
667      * isChannelWritable()
668      * </pre>
669      */
670     private final class ListenerWritabilityMonitor extends WritabilityMonitor implements Http2StreamVisitor {
671         private final Listener listener;
672 
673         ListenerWritabilityMonitor(Listener listener) {
674             this.listener = listener;
675         }
676 
677         @Override
678         public boolean visit(Http2Stream stream) throws Http2Exception {
679             FlowState state = state(stream);
680             if (isWritable(state) != state.markedWritability()) {
681                 notifyWritabilityChanged(state);
682             }
683             return true;
684         }
685 
686         @Override
687         void windowSize(FlowState state, int initialWindowSize) {
688             super.windowSize(state, initialWindowSize);
689             try {
690                 checkStateWritability(state);
691             } catch (Http2Exception e) {
692                 throw new RuntimeException("Caught unexpected exception from window", e);
693             }
694         }
695 
696         @Override
697         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
698             super.incrementWindowSize(state, delta);
699             checkStateWritability(state);
700         }
701 
702         @Override
703         void initialWindowSize(int newWindowSize) throws Http2Exception {
704             super.initialWindowSize(newWindowSize);
705             if (isWritableConnection()) {
706                 // If the write operation does not occur we still need to check all streams because they
707                 // may have transitioned from writable to not writable.
708                 checkAllWritabilityChanged();
709             }
710         }
711 
712         @Override
713         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
714             super.enqueueFrame(state, frame);
715             checkConnectionThenStreamWritabilityChanged(state);
716         }
717 
718         @Override
719         void stateCancelled(FlowState state) {
720             try {
721                 checkConnectionThenStreamWritabilityChanged(state);
722             } catch (Http2Exception e) {
723                 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
724             }
725         }
726 
727         @Override
728         void channelWritabilityChange() throws Http2Exception {
729             if (connectionState.markedWritability() != isChannelWritable()) {
730                 checkAllWritabilityChanged();
731             }
732         }
733 
734         private void checkStateWritability(FlowState state) throws Http2Exception {
735             if (isWritable(state) != state.markedWritability()) {
736                 if (state == connectionState) {
737                     checkAllWritabilityChanged();
738                 } else {
739                     notifyWritabilityChanged(state);
740                 }
741             }
742         }
743 
744         private void notifyWritabilityChanged(FlowState state) {
745             state.markedWritability(!state.markedWritability());
746             try {
747                 listener.writabilityChanged(state.stream);
748             } catch (Throwable cause) {
749                 logger.error("Caught Throwable from listener.writabilityChanged", cause);
750             }
751         }
752 
753         private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
754             // It is possible that the connection window and/or the individual stream writability could change.
755             if (isWritableConnection() != connectionState.markedWritability()) {
756                 checkAllWritabilityChanged();
757             } else if (isWritable(state) != state.markedWritability()) {
758                 notifyWritabilityChanged(state);
759             }
760         }
761 
762         private void checkAllWritabilityChanged() throws Http2Exception {
763             // Make sure we mark that we have notified as a result of this change.
764             connectionState.markedWritability(isWritableConnection());
765             connection.forEachActiveStream(this);
766         }
767     }
768 }