View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.channel.ChannelHandlerContext;
18  import io.netty5.channel.ChannelOption;
19  import io.netty5.util.internal.UnstableApi;
20  import io.netty5.util.internal.logging.InternalLogger;
21  import io.netty5.util.internal.logging.InternalLoggerFactory;
22  
23  import java.util.ArrayDeque;
24  import java.util.Deque;
25  
26  import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
27  import static io.netty5.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
28  import static io.netty5.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
29  import static io.netty5.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
30  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
31  import static io.netty5.handler.codec.http2.Http2Error.STREAM_CLOSED;
32  import static io.netty5.handler.codec.http2.Http2Exception.streamError;
33  import static io.netty5.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
34  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
35  import static java.lang.Math.max;
36  import static java.lang.Math.min;
37  import static java.util.Objects.requireNonNull;
38  
39  /**
40   * Basic implementation of {@link Http2RemoteFlowController}.
41   * <p>
42   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
43   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
44   */
45  @UnstableApi
46  public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
47      private static final InternalLogger logger =
48              InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
49      private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
50      private final Http2Connection connection;
51      private final Http2Connection.PropertyKey stateKey;
52      private final StreamByteDistributor streamByteDistributor;
53      private final FlowState connectionState;
54      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
55      private WritabilityMonitor monitor;
56      private ChannelHandlerContext ctx;
57  
58      public DefaultHttp2RemoteFlowController(Http2Connection connection) {
59          this(connection, (Listener) null);
60      }
61  
62      public DefaultHttp2RemoteFlowController(Http2Connection connection,
63                                              StreamByteDistributor streamByteDistributor) {
64          this(connection, streamByteDistributor, null);
65      }
66  
67      public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
68          this(connection, new WeightedFairQueueByteDistributor(connection), listener);
69      }
70  
71      public DefaultHttp2RemoteFlowController(Http2Connection connection,
72                                              StreamByteDistributor streamByteDistributor,
73                                              final Listener listener) {
74          this.connection = requireNonNull(connection, "connection");
75          this.streamByteDistributor = requireNonNull(streamByteDistributor, "streamWriteDistributor");
76  
77          // Add a flow state for the connection.
78          stateKey = connection.newKey();
79          connectionState = new FlowState(connection.connectionStream());
80          connection.connectionStream().setProperty(stateKey, connectionState);
81  
82          // Monitor may depend upon connectionState, and so initialize after connectionState
83          listener(listener);
84          monitor.windowSize(connectionState, initialWindowSize);
85  
86          // Register for notification of new streams.
87          connection.addListener(new Http2ConnectionAdapter() {
88              @Override
89              public void onStreamAdded(Http2Stream stream) {
90                  // If the stream state is not open then the stream is not yet eligible for flow controlled frames and
91                  // only requires the ReducedFlowState. Otherwise the full amount of memory is required.
92                  stream.setProperty(stateKey, new FlowState(stream));
93              }
94  
95              @Override
96              public void onStreamActive(Http2Stream stream) {
97                  // If the object was previously created, but later activated then we have to ensure the proper
98                  // initialWindowSize is used.
99                  monitor.windowSize(state(stream), initialWindowSize);
100             }
101 
102             @Override
103             public void onStreamClosed(Http2Stream stream) {
104                 // Any pending frames can never be written, cancel and
105                 // write errors for any pending frames.
106                 state(stream).cancel(STREAM_CLOSED, null);
107             }
108 
109             @Override
110             public void onStreamHalfClosed(Http2Stream stream) {
111                 if (HALF_CLOSED_LOCAL == stream.state()) {
112                     /*
113                      * When this method is called there should not be any
114                      * pending frames left if the API is used correctly. However,
115                      * it is possible that a erroneous application can sneak
116                      * in a frame even after having already written a frame with the
117                      * END_STREAM flag set, as the stream state might not transition
118                      * immediately to HALF_CLOSED_LOCAL / CLOSED due to flow control
119                      * delaying the write.
120                      *
121                      * This is to cancel any such illegal writes.
122                      */
123                     state(stream).cancel(STREAM_CLOSED, null);
124                 }
125             }
126         });
127     }
128 
129     /**
130      * {@inheritDoc}
131      * <p>
132      * Any queued {@link FlowControlled} objects will be sent.
133      */
134     @Override
135     public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
136         this.ctx = requireNonNull(ctx, "ctx");
137 
138         // Writing the pending bytes will not check writability change and instead a writability change notification
139         // to be provided by an explicit call.
140         channelWritabilityChanged();
141 
142         // Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
143         // closed and the queue cleanup will occur when the stream state transitions occur.
144 
145         // If any frames have been queued up, we should send them now that we have a channel context.
146         if (isChannelWritable()) {
147             writePendingBytes();
148         }
149     }
150 
151     @Override
152     public ChannelHandlerContext channelHandlerContext() {
153         return ctx;
154     }
155 
156     @Override
157     public void initialWindowSize(int newWindowSize) throws Http2Exception {
158         assert ctx == null || ctx.executor().inEventLoop();
159         monitor.initialWindowSize(newWindowSize);
160     }
161 
162     @Override
163     public int initialWindowSize() {
164         return initialWindowSize;
165     }
166 
167     @Override
168     public int windowSize(Http2Stream stream) {
169         return state(stream).windowSize();
170     }
171 
172     @Override
173     public boolean isWritable(Http2Stream stream) {
174         return monitor.isWritable(state(stream));
175     }
176 
177     @Override
178     public void channelWritabilityChanged() throws Http2Exception {
179         monitor.channelWritabilityChange();
180     }
181 
182     @Override
183     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
184         // It is assumed there are all validated at a higher level. For example in the Http2FrameReader.
185         assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
186         assert childStreamId != parentStreamId : "A stream cannot depend on itself";
187         assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
188 
189         streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
190     }
191 
192     private boolean isChannelWritable() {
193         return ctx != null && isChannelWritable0();
194     }
195 
196     private boolean isChannelWritable0() {
197         return ctx.channel().isWritable();
198     }
199 
200     @Override
201     public void listener(Listener listener) {
202         monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
203     }
204 
205     @Override
206     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
207         assert ctx == null || ctx.executor().inEventLoop();
208         monitor.incrementWindowSize(state(stream), delta);
209     }
210 
211     @Override
212     public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
213         // The context can be null assuming the frame will be queued and send later when the context is set.
214         assert ctx == null || ctx.executor().inEventLoop();
215         requireNonNull(frame, "frame");
216         try {
217             monitor.enqueueFrame(state(stream), frame);
218         } catch (Throwable t) {
219             frame.error(ctx, t);
220         }
221     }
222 
223     @Override
224     public boolean hasFlowControlled(Http2Stream stream) {
225         return state(stream).hasFrame();
226     }
227 
228     private FlowState state(Http2Stream stream) {
229         return stream.getProperty(stateKey);
230     }
231 
232     /**
233      * Returns the flow control window for the entire connection.
234      */
235     private int connectionWindowSize() {
236         return connectionState.windowSize();
237     }
238 
239     private int minUsableChannelBytes() {
240         // The current allocation algorithm values "fairness" and doesn't give any consideration to "goodput". It
241         // is possible that 1 byte will be allocated to many streams. In an effort to try to make "goodput"
242         // reasonable with the current allocation algorithm we have this "cheap" check up front to ensure there is
243         // an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
244         // number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
245         // circumstances in which this can happen without rewriting the allocation algorithm.
246         return max(ctx.channel().getOption(ChannelOption.WRITE_BUFFER_WATER_MARK).low(), MIN_WRITABLE_CHUNK);
247     }
248 
249     private int maxUsableChannelBytes() {
250         // If the channel isWritable, allow at least minUsableChannelBytes.
251         int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().writableBytes());
252         int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
253 
254         // Clip the usable bytes by the connection window.
255         return min(connectionState.windowSize(), usableBytes);
256     }
257 
258     /**
259      * The amount of bytes that can be supported by underlying {@link io.netty5.channel.Channel} without
260      * queuing "too-much".
261      */
262     private int writableBytes() {
263         return min(connectionWindowSize(), maxUsableChannelBytes());
264     }
265 
266     @Override
267     public void writePendingBytes() throws Http2Exception {
268         monitor.writePendingBytes();
269     }
270 
271     /**
272      * The remote flow control state for a single stream.
273      */
274     private final class FlowState implements StreamByteDistributor.StreamState {
275         private final Http2Stream stream;
276         private final Deque<FlowControlled> pendingWriteQueue;
277         private int window;
278         private long pendingBytes;
279         private boolean markedWritable;
280 
281         /**
282          * Set to true while a frame is being written, false otherwise.
283          */
284         private boolean writing;
285         /**
286          * Set to true if cancel() was called.
287          */
288         private boolean cancelled;
289 
290         FlowState(Http2Stream stream) {
291             this.stream = stream;
292             pendingWriteQueue = new ArrayDeque<>(2);
293         }
294 
295         /**
296          * Determine if the stream associated with this object is writable.
297          * @return {@code true} if the stream associated with this object is writable.
298          */
299         boolean isWritable() {
300             return windowSize() > pendingBytes() && !cancelled;
301         }
302 
303         /**
304          * The stream this state is associated with.
305          */
306         @Override
307         public Http2Stream stream() {
308             return stream;
309         }
310 
311         /**
312          * Returns the parameter from the last call to {@link #markedWritability(boolean)}.
313          */
314         boolean markedWritability() {
315             return markedWritable;
316         }
317 
318         /**
319          * Save the state of writability.
320          */
321         void markedWritability(boolean isWritable) {
322             markedWritable = isWritable;
323         }
324 
325         @Override
326         public int windowSize() {
327             return window;
328         }
329 
330         /**
331          * Reset the window size for this stream.
332          */
333         void windowSize(int initialWindowSize) {
334             window = initialWindowSize;
335         }
336 
337         /**
338          * Write the allocated bytes for this stream.
339          * @return the number of bytes written for a stream or {@code -1} if no write occurred.
340          */
341         int writeAllocatedBytes(int allocated) {
342             final int initialAllocated = allocated;
343             int writtenBytes;
344             // In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
345             Throwable cause = null;
346             FlowControlled frame;
347             try {
348                 assert !writing;
349                 writing = true;
350 
351                 // Write the remainder of frames that we are allowed to
352                 boolean writeOccurred = false;
353                 while (!cancelled && (frame = peek()) != null) {
354                     int maxBytes = min(allocated, writableWindow());
355                     if (maxBytes <= 0 && frame.size() > 0) {
356                         // The frame still has data, but the amount of allocated bytes has been exhausted.
357                         // Don't write needless empty frames.
358                         break;
359                     }
360                     writeOccurred = true;
361                     int initialFrameSize = frame.size();
362                     try {
363                         frame.write(ctx, max(0, maxBytes));
364                         if (frame.size() == 0) {
365                             // This frame has been fully written, remove this frame and notify it.
366                             // Since we remove this frame first, we're guaranteed that its error
367                             // method will not be called when we call cancel.
368                             pendingWriteQueue.remove();
369                             frame.writeComplete();
370                         }
371                     } finally {
372                         // Decrement allocated by how much was actually written.
373                         allocated -= initialFrameSize - frame.size();
374                     }
375                 }
376 
377                 if (!writeOccurred) {
378                     // Either there was no frame, or the amount of allocated bytes has been exhausted.
379                     return -1;
380                 }
381 
382             } catch (Throwable t) {
383                 // Mark the state as cancelled, we'll clear the pending queue via cancel() below.
384                 cancelled = true;
385                 cause = t;
386             } finally {
387                 writing = false;
388                 // Make sure we always decrement the flow control windows
389                 // by the bytes written.
390                 writtenBytes = initialAllocated - allocated;
391 
392                 decrementPendingBytes(writtenBytes, false);
393                 decrementFlowControlWindow(writtenBytes);
394 
395                 // If a cancellation occurred while writing, call cancel again to
396                 // clear and error all of the pending writes.
397                 if (cancelled) {
398                     cancel(INTERNAL_ERROR, cause);
399                 }
400             }
401             return writtenBytes;
402         }
403 
404         /**
405          * Increments the flow control window for this stream by the given delta and returns the new value.
406          */
407         int incrementStreamWindow(int delta) throws Http2Exception {
408             if (delta > 0 && Integer.MAX_VALUE - delta < window) {
409                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
410                         "Window size overflow for stream: %d", stream.id());
411             }
412             window += delta;
413 
414             streamByteDistributor.updateStreamableBytes(this);
415             return window;
416         }
417 
418         /**
419          * Returns the maximum writable window (minimum of the stream and connection windows).
420          */
421         private int writableWindow() {
422             return min(window, connectionWindowSize());
423         }
424 
425         @Override
426         public long pendingBytes() {
427             return pendingBytes;
428         }
429 
430         /**
431          * Adds the {@code frame} to the pending queue and increments the pending byte count.
432          */
433         void enqueueFrame(FlowControlled frame) {
434             FlowControlled last = pendingWriteQueue.peekLast();
435             if (last == null) {
436                 enqueueFrameWithoutMerge(frame);
437                 return;
438             }
439 
440             int lastSize = last.size();
441             if (last.merge(ctx, frame)) {
442                 incrementPendingBytes(last.size() - lastSize, true);
443                 return;
444             }
445             enqueueFrameWithoutMerge(frame);
446         }
447 
448         private void enqueueFrameWithoutMerge(FlowControlled frame) {
449             pendingWriteQueue.offer(frame);
450             // This must be called after adding to the queue in order so that hasFrame() is
451             // updated before updating the stream state.
452             incrementPendingBytes(frame.size(), true);
453         }
454 
455         @Override
456         public boolean hasFrame() {
457             return !pendingWriteQueue.isEmpty();
458         }
459 
460         /**
461          * Returns the head of the pending queue, or {@code null} if empty.
462          */
463         private FlowControlled peek() {
464             return pendingWriteQueue.peek();
465         }
466 
467         /**
468          * Clears the pending queue and writes errors for each remaining frame.
469          * @param error the {@link Http2Error} to use.
470          * @param cause the {@link Throwable} that caused this method to be invoked.
471          */
472         void cancel(Http2Error error, Throwable cause) {
473             cancelled = true;
474             // Ensure that the queue can't be modified while we are writing.
475             if (writing) {
476                 return;
477             }
478 
479             FlowControlled frame = pendingWriteQueue.poll();
480             if (frame != null) {
481                 // Only create exception once and reuse to reduce overhead of filling in the stacktrace.
482                 final Http2Exception exception = streamError(stream.id(), error, cause,
483                         "Stream closed before write could take place");
484                 do {
485                     writeError(frame, exception);
486                     frame = pendingWriteQueue.poll();
487                 } while (frame != null);
488             }
489 
490             streamByteDistributor.updateStreamableBytes(this);
491 
492             monitor.stateCancelled(this);
493         }
494 
495         /**
496          * Increments the number of pending bytes for this node and optionally updates the
497          * {@link StreamByteDistributor}.
498          */
499         private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
500             pendingBytes += numBytes;
501             monitor.incrementPendingBytes(numBytes);
502             if (updateStreamableBytes) {
503                 streamByteDistributor.updateStreamableBytes(this);
504             }
505         }
506 
507         /**
508          * If this frame is in the pending queue, decrements the number of pending bytes for the stream.
509          */
510         private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
511             incrementPendingBytes(-bytes, updateStreamableBytes);
512         }
513 
514         /**
515          * Decrement the per stream and connection flow control window by {@code bytes}.
516          */
517         private void decrementFlowControlWindow(int bytes) {
518             try {
519                 int negativeBytes = -bytes;
520                 connectionState.incrementStreamWindow(negativeBytes);
521                 incrementStreamWindow(negativeBytes);
522             } catch (Http2Exception e) {
523                 // Should never get here since we're decrementing.
524                 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
525             }
526         }
527 
528         /**
529          * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
530          * the unwritten bytes are removed from this branch of the priority tree.
531          */
532         private void writeError(FlowControlled frame, Http2Exception cause) {
533             assert ctx != null;
534             decrementPendingBytes(frame.size(), true);
535             frame.error(ctx, cause);
536         }
537     }
538 
539     /**
540      * Abstract class which provides common functionality for writability monitor implementations.
541      */
542     private class WritabilityMonitor implements StreamByteDistributor.Writer {
543         private boolean inWritePendingBytes;
544         private long totalPendingBytes;
545 
546         @Override
547         public final void write(Http2Stream stream, int numBytes) {
548             state(stream).writeAllocatedBytes(numBytes);
549         }
550 
551         /**
552          * Called when the writability of the underlying channel changes.
553          * @throws Http2Exception If a write occurs and an exception happens in the write operation.
554          */
555         void channelWritabilityChange() throws Http2Exception { }
556 
557         /**
558          * Called when the state is cancelled.
559          * @param state the state that was cancelled.
560          */
561         void stateCancelled(FlowState state) { }
562 
563         /**
564          * Set the initial window size for {@code state}.
565          * @param state the state to change the initial window size for.
566          * @param initialWindowSize the size of the window in bytes.
567          */
568         void windowSize(FlowState state, int initialWindowSize) {
569             state.windowSize(initialWindowSize);
570         }
571 
572         /**
573          * Increment the window size for a particular stream.
574          * @param state the state associated with the stream whose window is being incremented.
575          * @param delta The amount to increment by.
576          * @throws Http2Exception If this operation overflows the window for {@code state}.
577          */
578         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
579             state.incrementStreamWindow(delta);
580         }
581 
582         /**
583          * Add a frame to be sent via flow control.
584          * @param state The state associated with the stream which the {@code frame} is associated with.
585          * @param frame the frame to enqueue.
586          * @throws Http2Exception If a writability error occurs.
587          */
588         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
589             state.enqueueFrame(frame);
590         }
591 
592         /**
593          * Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes
594          * method should be called.
595          * @param delta The amount to increment by.
596          */
597         final void incrementPendingBytes(int delta) {
598             totalPendingBytes += delta;
599 
600             // Notification of writibilty change should be delayed until the end of the top level event.
601             // This is to ensure the flow controller is more consistent state before calling external listener methods.
602         }
603 
604         /**
605          * Determine if the stream associated with {@code state} is writable.
606          * @param state The state which is associated with the stream to test writability for.
607          * @return {@code true} if {@link FlowState#stream()} is writable. {@code false} otherwise.
608          */
609         final boolean isWritable(FlowState state) {
610             return isWritableConnection() && state.isWritable();
611         }
612 
613         final void writePendingBytes() throws Http2Exception {
614             // Reentry is not permitted during the byte distribution process. It may lead to undesirable distribution of
615             // bytes and even infinite loops. We protect against reentry and make sure each call has an opportunity to
616             // cause a distribution to occur. This may be useful for example if the channel's writability changes from
617             // Writable -> Not Writable (because we are writing) -> Writable (because the user flushed to make more room
618             // in the channel outbound buffer).
619             if (inWritePendingBytes) {
620                 return;
621             }
622             inWritePendingBytes = true;
623             try {
624                 int bytesToWrite = writableBytes();
625                 // Make sure we always write at least once, regardless if we have bytesToWrite or not.
626                 // This ensures that zero-length frames will always be written.
627                 for (;;) {
628                     if (!streamByteDistributor.distribute(bytesToWrite, this) ||
629                         (bytesToWrite = writableBytes()) <= 0 ||
630                         !isChannelWritable0()) {
631                         break;
632                     }
633                 }
634             } finally {
635                 inWritePendingBytes = false;
636             }
637         }
638 
639         void initialWindowSize(int newWindowSize) throws Http2Exception {
640             checkPositiveOrZero(newWindowSize, "newWindowSize");
641 
642             final int delta = newWindowSize - initialWindowSize;
643             initialWindowSize = newWindowSize;
644             connection.forEachActiveStream(stream -> {
645                 state(stream).incrementStreamWindow(delta);
646                 return true;
647             });
648 
649             if (delta > 0 && isChannelWritable()) {
650                 // The window size increased, send any pending frames for all streams.
651                 writePendingBytes();
652             }
653         }
654 
655         final boolean isWritableConnection() {
656             return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
657         }
658     }
659 
660     /**
661      * Writability of a {@code stream} is calculated using the following:
662      * <pre>
663      * Connection Window - Total Queued Bytes > 0 &&
664      * Stream Window - Bytes Queued for Stream > 0 &&
665      * isChannelWritable()
666      * </pre>
667      */
668     private final class ListenerWritabilityMonitor extends WritabilityMonitor implements Http2StreamVisitor {
669         private final Listener listener;
670 
671         ListenerWritabilityMonitor(Listener listener) {
672             this.listener = listener;
673         }
674 
675         @Override
676         public boolean visit(Http2Stream stream) throws Http2Exception {
677             FlowState state = state(stream);
678             if (isWritable(state) != state.markedWritability()) {
679                 notifyWritabilityChanged(state);
680             }
681             return true;
682         }
683 
684         @Override
685         void windowSize(FlowState state, int initialWindowSize) {
686             super.windowSize(state, initialWindowSize);
687             try {
688                 checkStateWritability(state);
689             } catch (Http2Exception e) {
690                 throw new RuntimeException("Caught unexpected exception from window", e);
691             }
692         }
693 
694         @Override
695         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
696             super.incrementWindowSize(state, delta);
697             checkStateWritability(state);
698         }
699 
700         @Override
701         void initialWindowSize(int newWindowSize) throws Http2Exception {
702             super.initialWindowSize(newWindowSize);
703             if (isWritableConnection()) {
704                 // If the write operation does not occur we still need to check all streams because they
705                 // may have transitioned from writable to not writable.
706                 checkAllWritabilityChanged();
707             }
708         }
709 
710         @Override
711         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
712             super.enqueueFrame(state, frame);
713             checkConnectionThenStreamWritabilityChanged(state);
714         }
715 
716         @Override
717         void stateCancelled(FlowState state) {
718             try {
719                 checkConnectionThenStreamWritabilityChanged(state);
720             } catch (Http2Exception e) {
721                 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
722             }
723         }
724 
725         @Override
726         void channelWritabilityChange() throws Http2Exception {
727             if (connectionState.markedWritability() != isChannelWritable()) {
728                 checkAllWritabilityChanged();
729             }
730         }
731 
732         private void checkStateWritability(FlowState state) throws Http2Exception {
733             if (isWritable(state) != state.markedWritability()) {
734                 if (state == connectionState) {
735                     checkAllWritabilityChanged();
736                 } else {
737                     notifyWritabilityChanged(state);
738                 }
739             }
740         }
741 
742         private void notifyWritabilityChanged(FlowState state) {
743             state.markedWritability(!state.markedWritability());
744             try {
745                 listener.writabilityChanged(state.stream);
746             } catch (Throwable cause) {
747                 logger.error("Caught Throwable from listener.writabilityChanged", cause);
748             }
749         }
750 
751         private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
752             // It is possible that the connection window and/or the individual stream writability could change.
753             if (isWritableConnection() != connectionState.markedWritability()) {
754                 checkAllWritabilityChanged();
755             } else if (isWritable(state) != state.markedWritability()) {
756                 notifyWritabilityChanged(state);
757             }
758         }
759 
760         private void checkAllWritabilityChanged() throws Http2Exception {
761             // Make sure we mark that we have notified as a result of this change.
762             connectionState.markedWritability(isWritableConnection());
763             connection.forEachActiveStream(this);
764         }
765     }
766 }