View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.channel.ChannelHandlerContext;
18  import io.netty.handler.codec.http2.StreamByteDistributor.Writer;
19  import io.netty.util.BooleanSupplier;
20  import io.netty.util.internal.UnstableApi;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.util.ArrayDeque;
25  import java.util.Deque;
26  
27  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
28  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
29  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
30  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
31  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
32  import static io.netty.handler.codec.http2.Http2Exception.streamError;
33  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
34  import static io.netty.util.internal.ObjectUtil.checkNotNull;
35  import static java.lang.Math.max;
36  import static java.lang.Math.min;
37  
38  /**
39   * Basic implementation of {@link Http2RemoteFlowController}.
40   * <p>
41   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
42   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
43   */
44  @UnstableApi
45  public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
46      private static final InternalLogger logger =
47              InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
48      private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
49      private final Http2Connection connection;
50      private final Http2Connection.PropertyKey stateKey;
51      private final StreamByteDistributor streamByteDistributor;
52      private final FlowState connectionState;
53      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
54      private WritabilityMonitor monitor;
55      private ChannelHandlerContext ctx;
56  
57      public DefaultHttp2RemoteFlowController(Http2Connection connection) {
58          this(connection, (Listener) null);
59      }
60  
61      public DefaultHttp2RemoteFlowController(Http2Connection connection,
62                                              StreamByteDistributor streamByteDistributor) {
63          this(connection, streamByteDistributor, null);
64      }
65  
66      public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
67          this(connection, new WeightedFairQueueByteDistributor(connection), listener);
68      }
69  
70      public DefaultHttp2RemoteFlowController(Http2Connection connection,
71                                              StreamByteDistributor streamByteDistributor,
72                                              final Listener listener) {
73          this.connection = checkNotNull(connection, "connection");
74          this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");
75  
76          // Add a flow state for the connection.
77          stateKey = connection.newKey();
78          connectionState = new FlowState(connection.connectionStream());
79          connection.connectionStream().setProperty(stateKey, connectionState);
80  
81          // Monitor may depend upon connectionState, and so initialize after connectionState
82          listener(listener);
83          monitor.windowSize(connectionState, initialWindowSize);
84  
85          // Register for notification of new streams.
86          connection.addListener(new Http2ConnectionAdapter() {
87              @Override
88              public void onStreamAdded(Http2Stream stream) {
89                  // If the stream state is not open then the stream is not yet eligible for flow controlled frames and
90                  // only requires the ReducedFlowState. Otherwise the full amount of memory is required.
91                  stream.setProperty(stateKey, new FlowState(stream));
92              }
93  
94              @Override
95              public void onStreamActive(Http2Stream stream) {
96                  // If the object was previously created, but later activated then we have to ensure the proper
97                  // initialWindowSize is used.
98                  monitor.windowSize(state(stream), initialWindowSize);
99              }
100 
101             @Override
102             public void onStreamClosed(Http2Stream stream) {
103                 // Any pending frames can never be written, cancel and
104                 // write errors for any pending frames.
105                 state(stream).cancel();
106             }
107 
108             @Override
109             public void onStreamHalfClosed(Http2Stream stream) {
110                 if (HALF_CLOSED_LOCAL.equals(stream.state())) {
111                     /**
112                      * When this method is called there should not be any
113                      * pending frames left if the API is used correctly. However,
114                      * it is possible that a erroneous application can sneak
115                      * in a frame even after having already written a frame with the
116                      * END_STREAM flag set, as the stream state might not transition
117                      * immediately to HALF_CLOSED_LOCAL / CLOSED due to flow control
118                      * delaying the write.
119                      *
120                      * This is to cancel any such illegal writes.
121                      */
122                     state(stream).cancel();
123                 }
124             }
125         });
126     }
127 
128     /**
129      * {@inheritDoc}
130      * <p>
131      * Any queued {@link FlowControlled} objects will be sent.
132      */
133     @Override
134     public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
135         this.ctx = checkNotNull(ctx, "ctx");
136 
137         // Writing the pending bytes will not check writability change and instead a writability change notification
138         // to be provided by an explicit call.
139         channelWritabilityChanged();
140 
141         // Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
142         // closed and the queue cleanup will occur when the stream state transitions occur.
143 
144         // If any frames have been queued up, we should send them now that we have a channel context.
145         if (isChannelWritable()) {
146             writePendingBytes();
147         }
148     }
149 
150     @Override
151     public ChannelHandlerContext channelHandlerContext() {
152         return ctx;
153     }
154 
155     @Override
156     public void initialWindowSize(int newWindowSize) throws Http2Exception {
157         assert ctx == null || ctx.executor().inEventLoop();
158         monitor.initialWindowSize(newWindowSize);
159     }
160 
161     @Override
162     public int initialWindowSize() {
163         return initialWindowSize;
164     }
165 
166     @Override
167     public int windowSize(Http2Stream stream) {
168         return state(stream).windowSize();
169     }
170 
171     @Override
172     public boolean isWritable(Http2Stream stream) {
173         return monitor.isWritable(state(stream));
174     }
175 
176     @Override
177     public void channelWritabilityChanged() throws Http2Exception {
178         monitor.channelWritabilityChange();
179     }
180 
181     @Override
182     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
183         // It is assumed there are all validated at a higher level. For example in the Http2FrameReader.
184         assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
185         assert childStreamId != parentStreamId : "A stream cannot depend on itself";
186         assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
187 
188         streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
189     }
190 
191     private boolean isChannelWritable() {
192         return ctx != null && isChannelWritable0();
193     }
194 
195     private boolean isChannelWritable0() {
196         return ctx.channel().isWritable();
197     }
198 
199     @Override
200     public void listener(Listener listener) {
201         monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
202     }
203 
204     @Override
205     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
206         assert ctx == null || ctx.executor().inEventLoop();
207         monitor.incrementWindowSize(state(stream), delta);
208     }
209 
210     @Override
211     public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
212         // The context can be null assuming the frame will be queued and send later when the context is set.
213         assert ctx == null || ctx.executor().inEventLoop();
214         checkNotNull(frame, "frame");
215         try {
216             monitor.enqueueFrame(state(stream), frame);
217         } catch (Throwable t) {
218             frame.error(ctx, t);
219         }
220     }
221 
222     @Override
223     public boolean hasFlowControlled(Http2Stream stream) {
224         return state(stream).hasFrame();
225     }
226 
227     private FlowState state(Http2Stream stream) {
228         return (FlowState) stream.getProperty(stateKey);
229     }
230 
231     /**
232      * Returns the flow control window for the entire connection.
233      */
234     private int connectionWindowSize() {
235         return connectionState.windowSize();
236     }
237 
238     private int minUsableChannelBytes() {
239         // The current allocation algorithm values "fairness" and doesn't give any consideration to "goodput". It
240         // is possible that 1 byte will be allocated to many streams. In an effort to try to make "goodput"
241         // reasonable with the current allocation algorithm we have this "cheap" check up front to ensure there is
242         // an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
243         // number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
244         // circumstances in which this can happen without rewriting the allocation algorithm.
245         return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
246     }
247 
248     private int maxUsableChannelBytes() {
249         // If the channel isWritable, allow at least minUsableChannelBytes.
250         int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
251         int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
252 
253         // Clip the usable bytes by the connection window.
254         return min(connectionState.windowSize(), usableBytes);
255     }
256 
257     /**
258      * The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel} without
259      * queuing "too-much".
260      */
261     private int writableBytes() {
262         return min(connectionWindowSize(), maxUsableChannelBytes());
263     }
264 
265     @Override
266     public void writePendingBytes() throws Http2Exception {
267         monitor.writePendingBytes();
268     }
269 
270     /**
271      * The remote flow control state for a single stream.
272      */
273     private final class FlowState implements StreamByteDistributor.StreamState {
274         private final Http2Stream stream;
275         private final Deque<FlowControlled> pendingWriteQueue;
276         private int window;
277         private int pendingBytes;
278         private boolean markedWritable;
279 
280         /**
281          * Set to true while a frame is being written, false otherwise.
282          */
283         private boolean writing;
284         /**
285          * Set to true if cancel() was called.
286          */
287         private boolean cancelled;
288         private BooleanSupplier isWritableSupplier = new BooleanSupplier() {
289             @Override
290             public boolean get() throws Exception {
291                 return windowSize() > pendingBytes();
292             }
293         };
294 
295         FlowState(Http2Stream stream) {
296             this.stream = stream;
297             pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
298         }
299 
300         /**
301          * Determine if the stream associated with this object is writable.
302          * @return {@code true} if the stream associated with this object is writable.
303          */
304         boolean isWritable() {
305             try {
306                 return isWritableSupplier.get();
307             } catch (Throwable cause) {
308                 throw new Error("isWritableSupplier should never throw!", cause);
309             }
310         }
311 
312         /**
313          * The stream this state is associated with.
314          */
315         @Override
316         public Http2Stream stream() {
317             return stream;
318         }
319 
320         /**
321          * Returns the parameter from the last call to {@link #markedWritability(boolean)}.
322          */
323         boolean markedWritability() {
324             return markedWritable;
325         }
326 
327         /**
328          * Save the state of writability.
329          */
330         void markedWritability(boolean isWritable) {
331             this.markedWritable = isWritable;
332         }
333 
334         @Override
335         public int windowSize() {
336             return window;
337         }
338 
339         /**
340          * Reset the window size for this stream.
341          */
342         void windowSize(int initialWindowSize) {
343             window = initialWindowSize;
344         }
345 
346         /**
347          * Write the allocated bytes for this stream.
348          * @return the number of bytes written for a stream or {@code -1} if no write occurred.
349          */
350         int writeAllocatedBytes(int allocated) {
351             final int initialAllocated = allocated;
352             int writtenBytes;
353             // In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
354             Throwable cause = null;
355             FlowControlled frame;
356             try {
357                 assert !writing;
358                 writing = true;
359 
360                 // Write the remainder of frames that we are allowed to
361                 boolean writeOccurred = false;
362                 while (!cancelled && (frame = peek()) != null) {
363                     int maxBytes = min(allocated, writableWindow());
364                     if (maxBytes <= 0 && frame.size() > 0) {
365                         // The frame still has data, but the amount of allocated bytes has been exhausted.
366                         // Don't write needless empty frames.
367                         break;
368                     }
369                     writeOccurred = true;
370                     int initialFrameSize = frame.size();
371                     try {
372                         frame.write(ctx, max(0, maxBytes));
373                         if (frame.size() == 0) {
374                             // This frame has been fully written, remove this frame and notify it.
375                             // Since we remove this frame first, we're guaranteed that its error
376                             // method will not be called when we call cancel.
377                             pendingWriteQueue.remove();
378                             frame.writeComplete();
379                         }
380                     } finally {
381                         // Decrement allocated by how much was actually written.
382                         allocated -= initialFrameSize - frame.size();
383                     }
384                 }
385 
386                 if (!writeOccurred) {
387                     // Either there was no frame, or the amount of allocated bytes has been exhausted.
388                     return -1;
389                 }
390 
391             } catch (Throwable t) {
392                 // Mark the state as cancelled, we'll clear the pending queue via cancel() below.
393                 cancelled = true;
394                 cause = t;
395             } finally {
396                 writing = false;
397                 // Make sure we always decrement the flow control windows
398                 // by the bytes written.
399                 writtenBytes = initialAllocated - allocated;
400 
401                 decrementPendingBytes(writtenBytes, false);
402                 decrementFlowControlWindow(writtenBytes);
403 
404                 // If a cancellation occurred while writing, call cancel again to
405                 // clear and error all of the pending writes.
406                 if (cancelled) {
407                     cancel(cause);
408                 }
409             }
410             return writtenBytes;
411         }
412 
413         /**
414          * Increments the flow control window for this stream by the given delta and returns the new value.
415          */
416         int incrementStreamWindow(int delta) throws Http2Exception {
417             if (delta > 0 && Integer.MAX_VALUE - delta < window) {
418                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
419                         "Window size overflow for stream: %d", stream.id());
420             }
421             window += delta;
422 
423             streamByteDistributor.updateStreamableBytes(this);
424             return window;
425         }
426 
427         /**
428          * Returns the maximum writable window (minimum of the stream and connection windows).
429          */
430         private int writableWindow() {
431             return min(window, connectionWindowSize());
432         }
433 
434         @Override
435         public int pendingBytes() {
436             return pendingBytes;
437         }
438 
439         /**
440          * Adds the {@code frame} to the pending queue and increments the pending byte count.
441          */
442         void enqueueFrame(FlowControlled frame) {
443             FlowControlled last = pendingWriteQueue.peekLast();
444             if (last == null) {
445                 enqueueFrameWithoutMerge(frame);
446                 return;
447             }
448 
449             int lastSize = last.size();
450             if (last.merge(ctx, frame)) {
451                 incrementPendingBytes(last.size() - lastSize, true);
452                 return;
453             }
454             enqueueFrameWithoutMerge(frame);
455         }
456 
457         private void enqueueFrameWithoutMerge(FlowControlled frame) {
458             pendingWriteQueue.offer(frame);
459             // This must be called after adding to the queue in order so that hasFrame() is
460             // updated before updating the stream state.
461             incrementPendingBytes(frame.size(), true);
462         }
463 
464         @Override
465         public boolean hasFrame() {
466             return !pendingWriteQueue.isEmpty();
467         }
468 
469         /**
470          * Returns the the head of the pending queue, or {@code null} if empty.
471          */
472         private FlowControlled peek() {
473             return pendingWriteQueue.peek();
474         }
475 
476         /**
477          * Any operations that may be pending are cleared and the status of these operations is failed.
478          */
479         void cancel() {
480             cancel(null);
481         }
482 
483         /**
484          * Clears the pending queue and writes errors for each remaining frame.
485          * @param cause the {@link Throwable} that caused this method to be invoked.
486          */
487         private void cancel(Throwable cause) {
488             cancelled = true;
489             // Ensure that the queue can't be modified while we are writing.
490             if (writing) {
491                 return;
492             }
493 
494             for (;;) {
495                 FlowControlled frame = pendingWriteQueue.poll();
496                 if (frame == null) {
497                     break;
498                 }
499                 writeError(frame, streamError(stream.id(), INTERNAL_ERROR, cause,
500                                               "Stream closed before write could take place"));
501             }
502 
503             streamByteDistributor.updateStreamableBytes(this);
504 
505             isWritableSupplier = BooleanSupplier.FALSE_SUPPLIER;
506             monitor.stateCancelled(this);
507         }
508 
509         /**
510          * Increments the number of pending bytes for this node and optionally updates the
511          * {@link StreamByteDistributor}.
512          */
513         private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
514             pendingBytes += numBytes;
515             monitor.incrementPendingBytes(numBytes);
516             if (updateStreamableBytes) {
517                 streamByteDistributor.updateStreamableBytes(this);
518             }
519         }
520 
521         /**
522          * If this frame is in the pending queue, decrements the number of pending bytes for the stream.
523          */
524         private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
525             incrementPendingBytes(-bytes, updateStreamableBytes);
526         }
527 
528         /**
529          * Decrement the per stream and connection flow control window by {@code bytes}.
530          */
531         private void decrementFlowControlWindow(int bytes) {
532             try {
533                 int negativeBytes = -bytes;
534                 connectionState.incrementStreamWindow(negativeBytes);
535                 incrementStreamWindow(negativeBytes);
536             } catch (Http2Exception e) {
537                 // Should never get here since we're decrementing.
538                 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
539             }
540         }
541 
542         /**
543          * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
544          * the unwritten bytes are removed from this branch of the priority tree.
545          */
546         private void writeError(FlowControlled frame, Http2Exception cause) {
547             assert ctx != null;
548             decrementPendingBytes(frame.size(), true);
549             frame.error(ctx, cause);
550         }
551     }
552 
553     /**
554      * Abstract class which provides common functionality for writability monitor implementations.
555      */
556     private class WritabilityMonitor {
557         private boolean inWritePendingBytes;
558         private long totalPendingBytes;
559         private final Writer writer = new StreamByteDistributor.Writer() {
560             @Override
561             public void write(Http2Stream stream, int numBytes) {
562                 state(stream).writeAllocatedBytes(numBytes);
563             }
564         };
565 
566         /**
567          * Called when the writability of the underlying channel changes.
568          * @throws Http2Exception If a write occurs and an exception happens in the write operation.
569          */
570         void channelWritabilityChange() throws Http2Exception { }
571 
572         /**
573          * Called when the state is cancelled.
574          * @param state the state that was cancelled.
575          */
576         void stateCancelled(FlowState state) { }
577 
578         /**
579          * Set the initial window size for {@code state}.
580          * @param state the state to change the initial window size for.
581          * @param initialWindowSize the size of the window in bytes.
582          */
583         void windowSize(FlowState state, int initialWindowSize) {
584             state.windowSize(initialWindowSize);
585         }
586 
587         /**
588          * Increment the window size for a particular stream.
589          * @param state the state associated with the stream whose window is being incremented.
590          * @param delta The amount to increment by.
591          * @throws Http2Exception If this operation overflows the window for {@code state}.
592          */
593         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
594             state.incrementStreamWindow(delta);
595         }
596 
597         /**
598          * Add a frame to be sent via flow control.
599          * @param state The state associated with the stream which the {@code frame} is associated with.
600          * @param frame the frame to enqueue.
601          * @throws Http2Exception If a writability error occurs.
602          */
603         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
604             state.enqueueFrame(frame);
605         }
606 
607         /**
608          * Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes
609          * method should be called.
610          * @param delta The amount to increment by.
611          */
612         final void incrementPendingBytes(int delta) {
613             totalPendingBytes += delta;
614 
615             // Notification of writibilty change should be delayed until the end of the top level event.
616             // This is to ensure the flow controller is more consistent state before calling external listener methods.
617         }
618 
619         /**
620          * Determine if the stream associated with {@code state} is writable.
621          * @param state The state which is associated with the stream to test writability for.
622          * @return {@code true} if {@link FlowState#stream()} is writable. {@code false} otherwise.
623          */
624         final boolean isWritable(FlowState state) {
625             return isWritableConnection() && state.isWritable();
626         }
627 
628         final void writePendingBytes() throws Http2Exception {
629             // Reentry is not permitted during the byte distribution process. It may lead to undesirable distribution of
630             // bytes and even infinite loops. We protect against reentry and make sure each call has an opportunity to
631             // cause a distribution to occur. This may be useful for example if the channel's writability changes from
632             // Writable -> Not Writable (because we are writing) -> Writable (because the user flushed to make more room
633             // in the channel outbound buffer).
634             if (inWritePendingBytes) {
635                 return;
636             }
637             inWritePendingBytes = true;
638             try {
639                 int bytesToWrite = writableBytes();
640                 // Make sure we always write at least once, regardless if we have bytesToWrite or not.
641                 // This ensures that zero-length frames will always be written.
642                 for (;;) {
643                     if (!streamByteDistributor.distribute(bytesToWrite, writer) ||
644                         (bytesToWrite = writableBytes()) <= 0 ||
645                         !isChannelWritable0()) {
646                         break;
647                     }
648                 }
649             } finally {
650                 inWritePendingBytes = false;
651             }
652         }
653 
654         void initialWindowSize(int newWindowSize) throws Http2Exception {
655             if (newWindowSize < 0) {
656                 throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
657             }
658 
659             final int delta = newWindowSize - initialWindowSize;
660             initialWindowSize = newWindowSize;
661             connection.forEachActiveStream(new Http2StreamVisitor() {
662                 @Override
663                 public boolean visit(Http2Stream stream) throws Http2Exception {
664                     state(stream).incrementStreamWindow(delta);
665                     return true;
666                 }
667             });
668 
669             if (delta > 0 && isChannelWritable()) {
670                 // The window size increased, send any pending frames for all streams.
671                 writePendingBytes();
672             }
673         }
674 
675         final boolean isWritableConnection() {
676             return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
677         }
678     }
679 
680     /**
681      * Writability of a {@code stream} is calculated using the following:
682      * <pre>
683      * Connection Window - Total Queued Bytes > 0 &&
684      * Stream Window - Bytes Queued for Stream > 0 &&
685      * isChannelWritable()
686      * </pre>
687      */
688     private final class ListenerWritabilityMonitor extends WritabilityMonitor {
689         private final Listener listener;
690         private final Http2StreamVisitor checkStreamWritabilityVisitor = new Http2StreamVisitor() {
691             @Override
692             public boolean visit(Http2Stream stream) throws Http2Exception {
693                 FlowState state = state(stream);
694                 if (isWritable(state) != state.markedWritability()) {
695                     notifyWritabilityChanged(state);
696                 }
697                 return true;
698             }
699         };
700 
701         ListenerWritabilityMonitor(Listener listener) {
702             this.listener = listener;
703         }
704 
705         @Override
706         void windowSize(FlowState state, int initialWindowSize) {
707             super.windowSize(state, initialWindowSize);
708             try {
709                 checkStateWritability(state);
710             } catch (Http2Exception e) {
711                 throw new RuntimeException("Caught unexpected exception from window", e);
712             }
713         }
714 
715         @Override
716         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
717             super.incrementWindowSize(state, delta);
718             checkStateWritability(state);
719         }
720 
721         @Override
722         void initialWindowSize(int newWindowSize) throws Http2Exception {
723             super.initialWindowSize(newWindowSize);
724             if (isWritableConnection()) {
725                 // If the write operation does not occur we still need to check all streams because they
726                 // may have transitioned from writable to not writable.
727                 checkAllWritabilityChanged();
728             }
729         }
730 
731         @Override
732         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
733             super.enqueueFrame(state, frame);
734             checkConnectionThenStreamWritabilityChanged(state);
735         }
736 
737         @Override
738         void stateCancelled(FlowState state) {
739             try {
740                 checkConnectionThenStreamWritabilityChanged(state);
741             } catch (Http2Exception e) {
742                 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
743             }
744         }
745 
746         @Override
747         void channelWritabilityChange() throws Http2Exception {
748             if (connectionState.markedWritability() != isChannelWritable()) {
749                 checkAllWritabilityChanged();
750             }
751         }
752 
753         private void checkStateWritability(FlowState state) throws Http2Exception {
754             if (isWritable(state) != state.markedWritability()) {
755                 if (state == connectionState) {
756                     checkAllWritabilityChanged();
757                 } else {
758                     notifyWritabilityChanged(state);
759                 }
760             }
761         }
762 
763         private void notifyWritabilityChanged(FlowState state) {
764             state.markedWritability(!state.markedWritability());
765             try {
766                 listener.writabilityChanged(state.stream);
767             } catch (Throwable cause) {
768                 logger.error("Caught Throwable from listener.writabilityChanged", cause);
769             }
770         }
771 
772         private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
773             // It is possible that the connection window and/or the individual stream writability could change.
774             if (isWritableConnection() != connectionState.markedWritability()) {
775                 checkAllWritabilityChanged();
776             } else if (isWritable(state) != state.markedWritability()) {
777                 notifyWritabilityChanged(state);
778             }
779         }
780 
781         private void checkAllWritabilityChanged() throws Http2Exception {
782             // Make sure we mark that we have notified as a result of this change.
783             connectionState.markedWritability(isWritableConnection());
784             connection.forEachActiveStream(checkStreamWritabilityVisitor);
785         }
786     }
787 }