View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.channel.ChannelHandlerContext;
18  import io.netty.util.internal.logging.InternalLogger;
19  import io.netty.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.ArrayDeque;
22  import java.util.Deque;
23  
24  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
25  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
26  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
27  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
28  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
29  import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
30  import static io.netty.handler.codec.http2.Http2Exception.streamError;
31  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
34  import static java.lang.Math.max;
35  import static java.lang.Math.min;
36  
37  /**
38   * Basic implementation of {@link Http2RemoteFlowController}.
39   * <p>
40   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
41   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
42   */
43  public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
44      private static final InternalLogger logger =
45              InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
46      private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
47      private final Http2Connection connection;
48      private final Http2Connection.PropertyKey stateKey;
49      private final StreamByteDistributor streamByteDistributor;
50      private final FlowState connectionState;
51      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
52      private WritabilityMonitor monitor;
53      private ChannelHandlerContext ctx;
54  
55      public DefaultHttp2RemoteFlowController(Http2Connection connection) {
56          this(connection, (Listener) null);
57      }
58  
59      public DefaultHttp2RemoteFlowController(Http2Connection connection,
60                                              StreamByteDistributor streamByteDistributor) {
61          this(connection, streamByteDistributor, null);
62      }
63  
64      public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
65          this(connection, new WeightedFairQueueByteDistributor(connection), listener);
66      }
67  
68      public DefaultHttp2RemoteFlowController(Http2Connection connection,
69                                              StreamByteDistributor streamByteDistributor,
70                                              final Listener listener) {
71          this.connection = checkNotNull(connection, "connection");
72          this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");
73  
74          // Add a flow state for the connection.
75          stateKey = connection.newKey();
76          connectionState = new FlowState(connection.connectionStream());
77          connection.connectionStream().setProperty(stateKey, connectionState);
78  
79          // Monitor may depend upon connectionState, and so initialize after connectionState
80          listener(listener);
81          monitor.windowSize(connectionState, initialWindowSize);
82  
83          // Register for notification of new streams.
84          connection.addListener(new Http2ConnectionAdapter() {
85              @Override
86              public void onStreamAdded(Http2Stream stream) {
87                  // If the stream state is not open then the stream is not yet eligible for flow controlled frames and
88                  // only requires the ReducedFlowState. Otherwise the full amount of memory is required.
89                  stream.setProperty(stateKey, new FlowState(stream));
90              }
91  
92              @Override
93              public void onStreamActive(Http2Stream stream) {
94                  // If the object was previously created, but later activated then we have to ensure the proper
95                  // initialWindowSize is used.
96                  monitor.windowSize(state(stream), initialWindowSize);
97              }
98  
99              @Override
100             public void onStreamClosed(Http2Stream stream) {
101                 // Any pending frames can never be written, cancel and
102                 // write errors for any pending frames.
103                 state(stream).cancel(STREAM_CLOSED, null);
104             }
105 
106             @Override
107             public void onStreamHalfClosed(Http2Stream stream) {
108                 if (HALF_CLOSED_LOCAL == stream.state()) {
109                     /*
110                      * When this method is called there should not be any
111                      * pending frames left if the API is used correctly. However,
112                      * it is possible that a erroneous application can sneak
113                      * in a frame even after having already written a frame with the
114                      * END_STREAM flag set, as the stream state might not transition
115                      * immediately to HALF_CLOSED_LOCAL / CLOSED due to flow control
116                      * delaying the write.
117                      *
118                      * This is to cancel any such illegal writes.
119                      */
120                     state(stream).cancel(STREAM_CLOSED, null);
121                 }
122             }
123         });
124     }
125 
126     /**
127      * {@inheritDoc}
128      * <p>
129      * Any queued {@link FlowControlled} objects will be sent.
130      */
131     @Override
132     public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
133         this.ctx = checkNotNull(ctx, "ctx");
134 
135         // Writing the pending bytes will not check writability change and instead a writability change notification
136         // to be provided by an explicit call.
137         channelWritabilityChanged();
138 
139         // Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
140         // closed and the queue cleanup will occur when the stream state transitions occur.
141 
142         // If any frames have been queued up, we should send them now that we have a channel context.
143         if (isChannelWritable()) {
144             writePendingBytes();
145         }
146     }
147 
148     @Override
149     public ChannelHandlerContext channelHandlerContext() {
150         return ctx;
151     }
152 
153     @Override
154     public void initialWindowSize(int newWindowSize) throws Http2Exception {
155         assert ctx == null || ctx.executor().inEventLoop();
156         monitor.initialWindowSize(newWindowSize);
157     }
158 
159     @Override
160     public int initialWindowSize() {
161         return initialWindowSize;
162     }
163 
164     @Override
165     public int windowSize(Http2Stream stream) {
166         return state(stream).windowSize();
167     }
168 
169     @Override
170     public boolean isWritable(Http2Stream stream) {
171         return monitor.isWritable(state(stream));
172     }
173 
174     @Override
175     public void channelWritabilityChanged() throws Http2Exception {
176         monitor.channelWritabilityChange();
177     }
178 
179     @Override
180     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
181         // It is assumed there are all validated at a higher level. For example in the Http2FrameReader.
182         assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
183         assert childStreamId != parentStreamId : "A stream cannot depend on itself";
184         assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
185 
186         streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
187     }
188 
189     private boolean isChannelWritable() {
190         return ctx != null && isChannelWritable0();
191     }
192 
193     private boolean isChannelWritable0() {
194         return ctx.channel().isWritable();
195     }
196 
197     @Override
198     public void listener(Listener listener) {
199         monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
200     }
201 
202     @Override
203     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
204         assert ctx == null || ctx.executor().inEventLoop();
205         monitor.incrementWindowSize(state(stream), delta);
206     }
207 
208     @Override
209     public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
210         // The context can be null assuming the frame will be queued and send later when the context is set.
211         assert ctx == null || ctx.executor().inEventLoop();
212         checkNotNull(frame, "frame");
213         try {
214             monitor.enqueueFrame(state(stream), frame);
215         } catch (Throwable t) {
216             frame.error(ctx, t);
217         }
218     }
219 
220     @Override
221     public boolean hasFlowControlled(Http2Stream stream) {
222         return state(stream).hasFrame();
223     }
224 
225     private FlowState state(Http2Stream stream) {
226         return (FlowState) stream.getProperty(stateKey);
227     }
228 
229     /**
230      * Returns the flow control window for the entire connection.
231      */
232     private int connectionWindowSize() {
233         return connectionState.windowSize();
234     }
235 
236     private int minUsableChannelBytes() {
237         // The current allocation algorithm values "fairness" and doesn't give any consideration to "goodput". It
238         // is possible that 1 byte will be allocated to many streams. In an effort to try to make "goodput"
239         // reasonable with the current allocation algorithm we have this "cheap" check up front to ensure there is
240         // an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
241         // number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
242         // circumstances in which this can happen without rewriting the allocation algorithm.
243         return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
244     }
245 
246     private int maxUsableChannelBytes() {
247         // If the channel isWritable, allow at least minUsableChannelBytes.
248         int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
249         int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
250 
251         // Clip the usable bytes by the connection window.
252         return min(connectionState.windowSize(), usableBytes);
253     }
254 
255     /**
256      * The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel} without
257      * queuing "too-much".
258      */
259     private int writableBytes() {
260         return min(connectionWindowSize(), maxUsableChannelBytes());
261     }
262 
263     @Override
264     public void writePendingBytes() throws Http2Exception {
265         monitor.writePendingBytes();
266     }
267 
268     /**
269      * The remote flow control state for a single stream.
270      */
271     private final class FlowState implements StreamByteDistributor.StreamState {
272         private final Http2Stream stream;
273         private final Deque<FlowControlled> pendingWriteQueue;
274         private int window;
275         private long pendingBytes;
276         private boolean markedWritable;
277 
278         /**
279          * Set to true while a frame is being written, false otherwise.
280          */
281         private boolean writing;
282         /**
283          * Set to true if cancel() was called.
284          */
285         private boolean cancelled;
286 
287         FlowState(Http2Stream stream) {
288             this.stream = stream;
289             pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
290         }
291 
292         /**
293          * Determine if the stream associated with this object is writable.
294          * @return {@code true} if the stream associated with this object is writable.
295          */
296         boolean isWritable() {
297             return windowSize() > pendingBytes() && !cancelled;
298         }
299 
300         /**
301          * The stream this state is associated with.
302          */
303         @Override
304         public Http2Stream stream() {
305             return stream;
306         }
307 
308         /**
309          * Returns the parameter from the last call to {@link #markedWritability(boolean)}.
310          */
311         boolean markedWritability() {
312             return markedWritable;
313         }
314 
315         /**
316          * Save the state of writability.
317          */
318         void markedWritability(boolean isWritable) {
319             this.markedWritable = isWritable;
320         }
321 
322         @Override
323         public int windowSize() {
324             return window;
325         }
326 
327         /**
328          * Reset the window size for this stream.
329          */
330         void windowSize(int initialWindowSize) {
331             window = initialWindowSize;
332         }
333 
334         /**
335          * Write the allocated bytes for this stream.
336          * @return the number of bytes written for a stream or {@code -1} if no write occurred.
337          */
338         int writeAllocatedBytes(int allocated) {
339             final int initialAllocated = allocated;
340             int writtenBytes;
341             // In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
342             Throwable cause = null;
343             FlowControlled frame;
344             try {
345                 assert !writing;
346                 writing = true;
347 
348                 // Write the remainder of frames that we are allowed to
349                 boolean writeOccurred = false;
350                 while (!cancelled && (frame = peek()) != null) {
351                     int maxBytes = min(allocated, writableWindow());
352                     if (maxBytes <= 0 && frame.size() > 0) {
353                         // The frame still has data, but the amount of allocated bytes has been exhausted.
354                         // Don't write needless empty frames.
355                         break;
356                     }
357                     writeOccurred = true;
358                     int initialFrameSize = frame.size();
359                     try {
360                         frame.write(ctx, max(0, maxBytes));
361                         if (frame.size() == 0) {
362                             // This frame has been fully written, remove this frame and notify it.
363                             // Since we remove this frame first, we're guaranteed that its error
364                             // method will not be called when we call cancel.
365                             pendingWriteQueue.remove();
366                             frame.writeComplete();
367                         }
368                     } finally {
369                         // Decrement allocated by how much was actually written.
370                         allocated -= initialFrameSize - frame.size();
371                     }
372                 }
373 
374                 if (!writeOccurred) {
375                     // Either there was no frame, or the amount of allocated bytes has been exhausted.
376                     return -1;
377                 }
378 
379             } catch (Throwable t) {
380                 // Mark the state as cancelled, we'll clear the pending queue via cancel() below.
381                 cancelled = true;
382                 cause = t;
383             } finally {
384                 writing = false;
385                 // Make sure we always decrement the flow control windows
386                 // by the bytes written.
387                 writtenBytes = initialAllocated - allocated;
388 
389                 decrementPendingBytes(writtenBytes, false);
390                 decrementFlowControlWindow(writtenBytes);
391 
392                 // If a cancellation occurred while writing, call cancel again to
393                 // clear and error all of the pending writes.
394                 if (cancelled) {
395                     cancel(INTERNAL_ERROR, cause);
396                 }
397             }
398             return writtenBytes;
399         }
400 
401         /**
402          * Increments the flow control window for this stream by the given delta and returns the new value.
403          */
404         int incrementStreamWindow(int delta) throws Http2Exception {
405             if (delta > 0 && Integer.MAX_VALUE - delta < window) {
406                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
407                         "Window size overflow for stream: %d", stream.id());
408             }
409             window += delta;
410 
411             streamByteDistributor.updateStreamableBytes(this);
412             return window;
413         }
414 
415         /**
416          * Returns the maximum writable window (minimum of the stream and connection windows).
417          */
418         private int writableWindow() {
419             return min(window, connectionWindowSize());
420         }
421 
422         @Override
423         public long pendingBytes() {
424             return pendingBytes;
425         }
426 
427         /**
428          * Adds the {@code frame} to the pending queue and increments the pending byte count.
429          */
430         void enqueueFrame(FlowControlled frame) {
431             FlowControlled last = pendingWriteQueue.peekLast();
432             if (last == null) {
433                 enqueueFrameWithoutMerge(frame);
434                 return;
435             }
436 
437             int lastSize = last.size();
438             if (last.merge(ctx, frame)) {
439                 incrementPendingBytes(last.size() - lastSize, true);
440                 return;
441             }
442             enqueueFrameWithoutMerge(frame);
443         }
444 
445         private void enqueueFrameWithoutMerge(FlowControlled frame) {
446             pendingWriteQueue.offer(frame);
447             // This must be called after adding to the queue in order so that hasFrame() is
448             // updated before updating the stream state.
449             incrementPendingBytes(frame.size(), true);
450         }
451 
452         @Override
453         public boolean hasFrame() {
454             return !pendingWriteQueue.isEmpty();
455         }
456 
457         /**
458          * Returns the head of the pending queue, or {@code null} if empty.
459          */
460         private FlowControlled peek() {
461             return pendingWriteQueue.peek();
462         }
463 
464         /**
465          * Clears the pending queue and writes errors for each remaining frame.
466          * @param error the {@link Http2Error} to use.
467          * @param cause the {@link Throwable} that caused this method to be invoked.
468          */
469         void cancel(Http2Error error, Throwable cause) {
470             cancelled = true;
471             // Ensure that the queue can't be modified while we are writing.
472             if (writing) {
473                 return;
474             }
475 
476             FlowControlled frame = pendingWriteQueue.poll();
477             if (frame != null) {
478                 // Only create exception once and reuse to reduce overhead of filling in the stacktrace.
479                 final Http2Exception exception = streamError(stream.id(), error, cause,
480                         "Stream closed before write could take place");
481                 do {
482                     writeError(frame, exception);
483                     frame = pendingWriteQueue.poll();
484                 } while (frame != null);
485             }
486 
487             streamByteDistributor.updateStreamableBytes(this);
488 
489             monitor.stateCancelled(this);
490         }
491 
492         /**
493          * Increments the number of pending bytes for this node and optionally updates the
494          * {@link StreamByteDistributor}.
495          */
496         private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
497             pendingBytes += numBytes;
498             monitor.incrementPendingBytes(numBytes);
499             if (updateStreamableBytes) {
500                 streamByteDistributor.updateStreamableBytes(this);
501             }
502         }
503 
504         /**
505          * If this frame is in the pending queue, decrements the number of pending bytes for the stream.
506          */
507         private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
508             incrementPendingBytes(-bytes, updateStreamableBytes);
509         }
510 
511         /**
512          * Decrement the per stream and connection flow control window by {@code bytes}.
513          */
514         private void decrementFlowControlWindow(int bytes) {
515             try {
516                 int negativeBytes = -bytes;
517                 connectionState.incrementStreamWindow(negativeBytes);
518                 incrementStreamWindow(negativeBytes);
519             } catch (Http2Exception e) {
520                 // Should never get here since we're decrementing.
521                 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
522             }
523         }
524 
525         /**
526          * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
527          * the unwritten bytes are removed from this branch of the priority tree.
528          */
529         private void writeError(FlowControlled frame, Http2Exception cause) {
530             assert ctx != null;
531             decrementPendingBytes(frame.size(), true);
532             frame.error(ctx, cause);
533         }
534     }
535 
536     /**
537      * Abstract class which provides common functionality for writability monitor implementations.
538      */
539     private class WritabilityMonitor implements StreamByteDistributor.Writer {
540         private boolean inWritePendingBytes;
541         private long totalPendingBytes;
542 
543         @Override
544         public final void write(Http2Stream stream, int numBytes) {
545             state(stream).writeAllocatedBytes(numBytes);
546         }
547 
548         /**
549          * Called when the writability of the underlying channel changes.
550          * @throws Http2Exception If a write occurs and an exception happens in the write operation.
551          */
552         void channelWritabilityChange() throws Http2Exception { }
553 
554         /**
555          * Called when the state is cancelled.
556          * @param state the state that was cancelled.
557          */
558         void stateCancelled(FlowState state) { }
559 
560         /**
561          * Set the initial window size for {@code state}.
562          * @param state the state to change the initial window size for.
563          * @param initialWindowSize the size of the window in bytes.
564          */
565         void windowSize(FlowState state, int initialWindowSize) {
566             state.windowSize(initialWindowSize);
567         }
568 
569         /**
570          * Increment the window size for a particular stream.
571          * @param state the state associated with the stream whose window is being incremented.
572          * @param delta The amount to increment by.
573          * @throws Http2Exception If this operation overflows the window for {@code state}.
574          */
575         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
576             state.incrementStreamWindow(delta);
577         }
578 
579         /**
580          * Add a frame to be sent via flow control.
581          * @param state The state associated with the stream which the {@code frame} is associated with.
582          * @param frame the frame to enqueue.
583          * @throws Http2Exception If a writability error occurs.
584          */
585         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
586             state.enqueueFrame(frame);
587         }
588 
589         /**
590          * Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes
591          * method should be called.
592          * @param delta The amount to increment by.
593          */
594         final void incrementPendingBytes(int delta) {
595             totalPendingBytes += delta;
596 
597             // Notification of writibilty change should be delayed until the end of the top level event.
598             // This is to ensure the flow controller is more consistent state before calling external listener methods.
599         }
600 
601         /**
602          * Determine if the stream associated with {@code state} is writable.
603          * @param state The state which is associated with the stream to test writability for.
604          * @return {@code true} if {@link FlowState#stream()} is writable. {@code false} otherwise.
605          */
606         final boolean isWritable(FlowState state) {
607             return isWritableConnection() && state.isWritable();
608         }
609 
610         final void writePendingBytes() throws Http2Exception {
611             // Reentry is not permitted during the byte distribution process. It may lead to undesirable distribution of
612             // bytes and even infinite loops. We protect against reentry and make sure each call has an opportunity to
613             // cause a distribution to occur. This may be useful for example if the channel's writability changes from
614             // Writable -> Not Writable (because we are writing) -> Writable (because the user flushed to make more room
615             // in the channel outbound buffer).
616             if (inWritePendingBytes) {
617                 return;
618             }
619             inWritePendingBytes = true;
620             try {
621                 int bytesToWrite = writableBytes();
622                 // Make sure we always write at least once, regardless if we have bytesToWrite or not.
623                 // This ensures that zero-length frames will always be written.
624                 for (;;) {
625                     if (!streamByteDistributor.distribute(bytesToWrite, this) ||
626                         (bytesToWrite = writableBytes()) <= 0 ||
627                         !isChannelWritable0()) {
628                         break;
629                     }
630                 }
631             } finally {
632                 inWritePendingBytes = false;
633             }
634         }
635 
636         void initialWindowSize(int newWindowSize) throws Http2Exception {
637             checkPositiveOrZero(newWindowSize, "newWindowSize");
638 
639             final int delta = newWindowSize - initialWindowSize;
640             initialWindowSize = newWindowSize;
641             connection.forEachActiveStream(new Http2StreamVisitor() {
642                 @Override
643                 public boolean visit(Http2Stream stream) throws Http2Exception {
644                     state(stream).incrementStreamWindow(delta);
645                     return true;
646                 }
647             });
648 
649             if (delta > 0 && isChannelWritable()) {
650                 // The window size increased, send any pending frames for all streams.
651                 writePendingBytes();
652             }
653         }
654 
655         final boolean isWritableConnection() {
656             return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
657         }
658     }
659 
660     /**
661      * Writability of a {@code stream} is calculated using the following:
662      * <pre>
663      * Connection Window - Total Queued Bytes > 0 &&
664      * Stream Window - Bytes Queued for Stream > 0 &&
665      * isChannelWritable()
666      * </pre>
667      */
668     private final class ListenerWritabilityMonitor extends WritabilityMonitor implements Http2StreamVisitor {
669         private final Listener listener;
670 
671         ListenerWritabilityMonitor(Listener listener) {
672             this.listener = listener;
673         }
674 
675         @Override
676         public boolean visit(Http2Stream stream) throws Http2Exception {
677             FlowState state = state(stream);
678             if (isWritable(state) != state.markedWritability()) {
679                 notifyWritabilityChanged(state);
680             }
681             return true;
682         }
683 
684         @Override
685         void windowSize(FlowState state, int initialWindowSize) {
686             super.windowSize(state, initialWindowSize);
687             try {
688                 checkStateWritability(state);
689             } catch (Http2Exception e) {
690                 throw new RuntimeException("Caught unexpected exception from window", e);
691             }
692         }
693 
694         @Override
695         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
696             super.incrementWindowSize(state, delta);
697             checkStateWritability(state);
698         }
699 
700         @Override
701         void initialWindowSize(int newWindowSize) throws Http2Exception {
702             super.initialWindowSize(newWindowSize);
703             if (isWritableConnection()) {
704                 // If the write operation does not occur we still need to check all streams because they
705                 // may have transitioned from writable to not writable.
706                 checkAllWritabilityChanged();
707             }
708         }
709 
710         @Override
711         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
712             super.enqueueFrame(state, frame);
713             checkConnectionThenStreamWritabilityChanged(state);
714         }
715 
716         @Override
717         void stateCancelled(FlowState state) {
718             try {
719                 checkConnectionThenStreamWritabilityChanged(state);
720             } catch (Http2Exception e) {
721                 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
722             }
723         }
724 
725         @Override
726         void channelWritabilityChange() throws Http2Exception {
727             if (connectionState.markedWritability() != isChannelWritable()) {
728                 checkAllWritabilityChanged();
729             }
730         }
731 
732         private void checkStateWritability(FlowState state) throws Http2Exception {
733             if (isWritable(state) != state.markedWritability()) {
734                 if (state == connectionState) {
735                     checkAllWritabilityChanged();
736                 } else {
737                     notifyWritabilityChanged(state);
738                 }
739             }
740         }
741 
742         private void notifyWritabilityChanged(FlowState state) {
743             state.markedWritability(!state.markedWritability());
744             try {
745                 listener.writabilityChanged(state.stream);
746             } catch (Throwable cause) {
747                 logger.error("Caught Throwable from listener.writabilityChanged", cause);
748             }
749         }
750 
751         private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
752             // It is possible that the connection window and/or the individual stream writability could change.
753             if (isWritableConnection() != connectionState.markedWritability()) {
754                 checkAllWritabilityChanged();
755             } else if (isWritable(state) != state.markedWritability()) {
756                 notifyWritabilityChanged(state);
757             }
758         }
759 
760         private void checkAllWritabilityChanged() throws Http2Exception {
761             // Make sure we mark that we have notified as a result of this change.
762             connectionState.markedWritability(isWritableConnection());
763             connection.forEachActiveStream(this);
764         }
765     }
766 }