View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.channel.ChannelHandlerContext;
18  import io.netty.util.internal.UnstableApi;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.ArrayDeque;
23  import java.util.Deque;
24  
25  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
26  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
27  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
28  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
29  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
30  import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
31  import static io.netty.handler.codec.http2.Http2Exception.streamError;
32  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
33  import static io.netty.util.internal.ObjectUtil.checkNotNull;
34  import static java.lang.Math.max;
35  import static java.lang.Math.min;
36  
37  /**
38   * Basic implementation of {@link Http2RemoteFlowController}.
39   * <p>
40   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
41   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
42   */
43  @UnstableApi
44  public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
45      private static final InternalLogger logger =
46              InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
47      private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
48      private final Http2Connection connection;
49      private final Http2Connection.PropertyKey stateKey;
50      private final StreamByteDistributor streamByteDistributor;
51      private final FlowState connectionState;
52      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
53      private WritabilityMonitor monitor;
54      private ChannelHandlerContext ctx;
55  
56      public DefaultHttp2RemoteFlowController(Http2Connection connection) {
57          this(connection, (Listener) null);
58      }
59  
60      public DefaultHttp2RemoteFlowController(Http2Connection connection,
61                                              StreamByteDistributor streamByteDistributor) {
62          this(connection, streamByteDistributor, null);
63      }
64  
65      public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
66          this(connection, new WeightedFairQueueByteDistributor(connection), listener);
67      }
68  
69      public DefaultHttp2RemoteFlowController(Http2Connection connection,
70                                              StreamByteDistributor streamByteDistributor,
71                                              final Listener listener) {
72          this.connection = checkNotNull(connection, "connection");
73          this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");
74  
75          // Add a flow state for the connection.
76          stateKey = connection.newKey();
77          connectionState = new FlowState(connection.connectionStream());
78          connection.connectionStream().setProperty(stateKey, connectionState);
79  
80          // Monitor may depend upon connectionState, and so initialize after connectionState
81          listener(listener);
82          monitor.windowSize(connectionState, initialWindowSize);
83  
84          // Register for notification of new streams.
85          connection.addListener(new Http2ConnectionAdapter() {
86              @Override
87              public void onStreamAdded(Http2Stream stream) {
88                  // If the stream state is not open then the stream is not yet eligible for flow controlled frames and
89                  // only requires the ReducedFlowState. Otherwise the full amount of memory is required.
90                  stream.setProperty(stateKey, new FlowState(stream));
91              }
92  
93              @Override
94              public void onStreamActive(Http2Stream stream) {
95                  // If the object was previously created, but later activated then we have to ensure the proper
96                  // initialWindowSize is used.
97                  monitor.windowSize(state(stream), initialWindowSize);
98              }
99  
100             @Override
101             public void onStreamClosed(Http2Stream stream) {
102                 // Any pending frames can never be written, cancel and
103                 // write errors for any pending frames.
104                 state(stream).cancel(STREAM_CLOSED, null);
105             }
106 
107             @Override
108             public void onStreamHalfClosed(Http2Stream stream) {
109                 if (HALF_CLOSED_LOCAL == stream.state()) {
110                     /**
111                      * When this method is called there should not be any
112                      * pending frames left if the API is used correctly. However,
113                      * it is possible that a erroneous application can sneak
114                      * in a frame even after having already written a frame with the
115                      * END_STREAM flag set, as the stream state might not transition
116                      * immediately to HALF_CLOSED_LOCAL / CLOSED due to flow control
117                      * delaying the write.
118                      *
119                      * This is to cancel any such illegal writes.
120                      */
121                     state(stream).cancel(STREAM_CLOSED, null);
122                 }
123             }
124         });
125     }
126 
127     /**
128      * {@inheritDoc}
129      * <p>
130      * Any queued {@link FlowControlled} objects will be sent.
131      */
132     @Override
133     public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
134         this.ctx = checkNotNull(ctx, "ctx");
135 
136         // Writing the pending bytes will not check writability change and instead a writability change notification
137         // to be provided by an explicit call.
138         channelWritabilityChanged();
139 
140         // Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
141         // closed and the queue cleanup will occur when the stream state transitions occur.
142 
143         // If any frames have been queued up, we should send them now that we have a channel context.
144         if (isChannelWritable()) {
145             writePendingBytes();
146         }
147     }
148 
149     @Override
150     public ChannelHandlerContext channelHandlerContext() {
151         return ctx;
152     }
153 
154     @Override
155     public void initialWindowSize(int newWindowSize) throws Http2Exception {
156         assert ctx == null || ctx.executor().inEventLoop();
157         monitor.initialWindowSize(newWindowSize);
158     }
159 
160     @Override
161     public int initialWindowSize() {
162         return initialWindowSize;
163     }
164 
165     @Override
166     public int windowSize(Http2Stream stream) {
167         return state(stream).windowSize();
168     }
169 
170     @Override
171     public boolean isWritable(Http2Stream stream) {
172         return monitor.isWritable(state(stream));
173     }
174 
175     @Override
176     public void channelWritabilityChanged() throws Http2Exception {
177         monitor.channelWritabilityChange();
178     }
179 
180     @Override
181     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
182         // It is assumed there are all validated at a higher level. For example in the Http2FrameReader.
183         assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
184         assert childStreamId != parentStreamId : "A stream cannot depend on itself";
185         assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
186 
187         streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
188     }
189 
190     private boolean isChannelWritable() {
191         return ctx != null && isChannelWritable0();
192     }
193 
194     private boolean isChannelWritable0() {
195         return ctx.channel().isWritable();
196     }
197 
198     @Override
199     public void listener(Listener listener) {
200         monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
201     }
202 
203     @Override
204     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
205         assert ctx == null || ctx.executor().inEventLoop();
206         monitor.incrementWindowSize(state(stream), delta);
207     }
208 
209     @Override
210     public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
211         // The context can be null assuming the frame will be queued and send later when the context is set.
212         assert ctx == null || ctx.executor().inEventLoop();
213         checkNotNull(frame, "frame");
214         try {
215             monitor.enqueueFrame(state(stream), frame);
216         } catch (Throwable t) {
217             frame.error(ctx, t);
218         }
219     }
220 
221     @Override
222     public boolean hasFlowControlled(Http2Stream stream) {
223         return state(stream).hasFrame();
224     }
225 
226     private FlowState state(Http2Stream stream) {
227         return (FlowState) stream.getProperty(stateKey);
228     }
229 
230     /**
231      * Returns the flow control window for the entire connection.
232      */
233     private int connectionWindowSize() {
234         return connectionState.windowSize();
235     }
236 
237     private int minUsableChannelBytes() {
238         // The current allocation algorithm values "fairness" and doesn't give any consideration to "goodput". It
239         // is possible that 1 byte will be allocated to many streams. In an effort to try to make "goodput"
240         // reasonable with the current allocation algorithm we have this "cheap" check up front to ensure there is
241         // an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
242         // number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
243         // circumstances in which this can happen without rewriting the allocation algorithm.
244         return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
245     }
246 
247     private int maxUsableChannelBytes() {
248         // If the channel isWritable, allow at least minUsableChannelBytes.
249         int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
250         int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
251 
252         // Clip the usable bytes by the connection window.
253         return min(connectionState.windowSize(), usableBytes);
254     }
255 
256     /**
257      * The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel} without
258      * queuing "too-much".
259      */
260     private int writableBytes() {
261         return min(connectionWindowSize(), maxUsableChannelBytes());
262     }
263 
264     @Override
265     public void writePendingBytes() throws Http2Exception {
266         monitor.writePendingBytes();
267     }
268 
269     /**
270      * The remote flow control state for a single stream.
271      */
272     private final class FlowState implements StreamByteDistributor.StreamState {
273         private final Http2Stream stream;
274         private final Deque<FlowControlled> pendingWriteQueue;
275         private int window;
276         private long pendingBytes;
277         private boolean markedWritable;
278 
279         /**
280          * Set to true while a frame is being written, false otherwise.
281          */
282         private boolean writing;
283         /**
284          * Set to true if cancel() was called.
285          */
286         private boolean cancelled;
287 
288         FlowState(Http2Stream stream) {
289             this.stream = stream;
290             pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
291         }
292 
293         /**
294          * Determine if the stream associated with this object is writable.
295          * @return {@code true} if the stream associated with this object is writable.
296          */
297         boolean isWritable() {
298             return windowSize() > pendingBytes() && !cancelled;
299         }
300 
301         /**
302          * The stream this state is associated with.
303          */
304         @Override
305         public Http2Stream stream() {
306             return stream;
307         }
308 
309         /**
310          * Returns the parameter from the last call to {@link #markedWritability(boolean)}.
311          */
312         boolean markedWritability() {
313             return markedWritable;
314         }
315 
316         /**
317          * Save the state of writability.
318          */
319         void markedWritability(boolean isWritable) {
320             this.markedWritable = isWritable;
321         }
322 
323         @Override
324         public int windowSize() {
325             return window;
326         }
327 
328         /**
329          * Reset the window size for this stream.
330          */
331         void windowSize(int initialWindowSize) {
332             window = initialWindowSize;
333         }
334 
335         /**
336          * Write the allocated bytes for this stream.
337          * @return the number of bytes written for a stream or {@code -1} if no write occurred.
338          */
339         int writeAllocatedBytes(int allocated) {
340             final int initialAllocated = allocated;
341             int writtenBytes;
342             // In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
343             Throwable cause = null;
344             FlowControlled frame;
345             try {
346                 assert !writing;
347                 writing = true;
348 
349                 // Write the remainder of frames that we are allowed to
350                 boolean writeOccurred = false;
351                 while (!cancelled && (frame = peek()) != null) {
352                     int maxBytes = min(allocated, writableWindow());
353                     if (maxBytes <= 0 && frame.size() > 0) {
354                         // The frame still has data, but the amount of allocated bytes has been exhausted.
355                         // Don't write needless empty frames.
356                         break;
357                     }
358                     writeOccurred = true;
359                     int initialFrameSize = frame.size();
360                     try {
361                         frame.write(ctx, max(0, maxBytes));
362                         if (frame.size() == 0) {
363                             // This frame has been fully written, remove this frame and notify it.
364                             // Since we remove this frame first, we're guaranteed that its error
365                             // method will not be called when we call cancel.
366                             pendingWriteQueue.remove();
367                             frame.writeComplete();
368                         }
369                     } finally {
370                         // Decrement allocated by how much was actually written.
371                         allocated -= initialFrameSize - frame.size();
372                     }
373                 }
374 
375                 if (!writeOccurred) {
376                     // Either there was no frame, or the amount of allocated bytes has been exhausted.
377                     return -1;
378                 }
379 
380             } catch (Throwable t) {
381                 // Mark the state as cancelled, we'll clear the pending queue via cancel() below.
382                 cancelled = true;
383                 cause = t;
384             } finally {
385                 writing = false;
386                 // Make sure we always decrement the flow control windows
387                 // by the bytes written.
388                 writtenBytes = initialAllocated - allocated;
389 
390                 decrementPendingBytes(writtenBytes, false);
391                 decrementFlowControlWindow(writtenBytes);
392 
393                 // If a cancellation occurred while writing, call cancel again to
394                 // clear and error all of the pending writes.
395                 if (cancelled) {
396                     cancel(INTERNAL_ERROR, cause);
397                 }
398             }
399             return writtenBytes;
400         }
401 
402         /**
403          * Increments the flow control window for this stream by the given delta and returns the new value.
404          */
405         int incrementStreamWindow(int delta) throws Http2Exception {
406             if (delta > 0 && Integer.MAX_VALUE - delta < window) {
407                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
408                         "Window size overflow for stream: %d", stream.id());
409             }
410             window += delta;
411 
412             streamByteDistributor.updateStreamableBytes(this);
413             return window;
414         }
415 
416         /**
417          * Returns the maximum writable window (minimum of the stream and connection windows).
418          */
419         private int writableWindow() {
420             return min(window, connectionWindowSize());
421         }
422 
423         @Override
424         public long pendingBytes() {
425             return pendingBytes;
426         }
427 
428         /**
429          * Adds the {@code frame} to the pending queue and increments the pending byte count.
430          */
431         void enqueueFrame(FlowControlled frame) {
432             FlowControlled last = pendingWriteQueue.peekLast();
433             if (last == null) {
434                 enqueueFrameWithoutMerge(frame);
435                 return;
436             }
437 
438             int lastSize = last.size();
439             if (last.merge(ctx, frame)) {
440                 incrementPendingBytes(last.size() - lastSize, true);
441                 return;
442             }
443             enqueueFrameWithoutMerge(frame);
444         }
445 
446         private void enqueueFrameWithoutMerge(FlowControlled frame) {
447             pendingWriteQueue.offer(frame);
448             // This must be called after adding to the queue in order so that hasFrame() is
449             // updated before updating the stream state.
450             incrementPendingBytes(frame.size(), true);
451         }
452 
453         @Override
454         public boolean hasFrame() {
455             return !pendingWriteQueue.isEmpty();
456         }
457 
458         /**
459          * Returns the head of the pending queue, or {@code null} if empty.
460          */
461         private FlowControlled peek() {
462             return pendingWriteQueue.peek();
463         }
464 
465         /**
466          * Clears the pending queue and writes errors for each remaining frame.
467          * @param error the {@link Http2Error} to use.
468          * @param cause the {@link Throwable} that caused this method to be invoked.
469          */
470         void cancel(Http2Error error, Throwable cause) {
471             cancelled = true;
472             // Ensure that the queue can't be modified while we are writing.
473             if (writing) {
474                 return;
475             }
476 
477             FlowControlled frame = pendingWriteQueue.poll();
478             if (frame != null) {
479                 // Only create exception once and reuse to reduce overhead of filling in the stacktrace.
480                 final Http2Exception exception = streamError(stream.id(), error, cause,
481                         "Stream closed before write could take place");
482                 do {
483                     writeError(frame, exception);
484                     frame = pendingWriteQueue.poll();
485                 } while (frame != null);
486             }
487 
488             streamByteDistributor.updateStreamableBytes(this);
489 
490             monitor.stateCancelled(this);
491         }
492 
493         /**
494          * Increments the number of pending bytes for this node and optionally updates the
495          * {@link StreamByteDistributor}.
496          */
497         private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
498             pendingBytes += numBytes;
499             monitor.incrementPendingBytes(numBytes);
500             if (updateStreamableBytes) {
501                 streamByteDistributor.updateStreamableBytes(this);
502             }
503         }
504 
505         /**
506          * If this frame is in the pending queue, decrements the number of pending bytes for the stream.
507          */
508         private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
509             incrementPendingBytes(-bytes, updateStreamableBytes);
510         }
511 
512         /**
513          * Decrement the per stream and connection flow control window by {@code bytes}.
514          */
515         private void decrementFlowControlWindow(int bytes) {
516             try {
517                 int negativeBytes = -bytes;
518                 connectionState.incrementStreamWindow(negativeBytes);
519                 incrementStreamWindow(negativeBytes);
520             } catch (Http2Exception e) {
521                 // Should never get here since we're decrementing.
522                 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
523             }
524         }
525 
526         /**
527          * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
528          * the unwritten bytes are removed from this branch of the priority tree.
529          */
530         private void writeError(FlowControlled frame, Http2Exception cause) {
531             assert ctx != null;
532             decrementPendingBytes(frame.size(), true);
533             frame.error(ctx, cause);
534         }
535     }
536 
537     /**
538      * Abstract class which provides common functionality for writability monitor implementations.
539      */
540     private class WritabilityMonitor implements StreamByteDistributor.Writer {
541         private boolean inWritePendingBytes;
542         private long totalPendingBytes;
543 
544         @Override
545         public final void write(Http2Stream stream, int numBytes) {
546             state(stream).writeAllocatedBytes(numBytes);
547         }
548 
549         /**
550          * Called when the writability of the underlying channel changes.
551          * @throws Http2Exception If a write occurs and an exception happens in the write operation.
552          */
553         void channelWritabilityChange() throws Http2Exception { }
554 
555         /**
556          * Called when the state is cancelled.
557          * @param state the state that was cancelled.
558          */
559         void stateCancelled(FlowState state) { }
560 
561         /**
562          * Set the initial window size for {@code state}.
563          * @param state the state to change the initial window size for.
564          * @param initialWindowSize the size of the window in bytes.
565          */
566         void windowSize(FlowState state, int initialWindowSize) {
567             state.windowSize(initialWindowSize);
568         }
569 
570         /**
571          * Increment the window size for a particular stream.
572          * @param state the state associated with the stream whose window is being incremented.
573          * @param delta The amount to increment by.
574          * @throws Http2Exception If this operation overflows the window for {@code state}.
575          */
576         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
577             state.incrementStreamWindow(delta);
578         }
579 
580         /**
581          * Add a frame to be sent via flow control.
582          * @param state The state associated with the stream which the {@code frame} is associated with.
583          * @param frame the frame to enqueue.
584          * @throws Http2Exception If a writability error occurs.
585          */
586         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
587             state.enqueueFrame(frame);
588         }
589 
590         /**
591          * Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes
592          * method should be called.
593          * @param delta The amount to increment by.
594          */
595         final void incrementPendingBytes(int delta) {
596             totalPendingBytes += delta;
597 
598             // Notification of writibilty change should be delayed until the end of the top level event.
599             // This is to ensure the flow controller is more consistent state before calling external listener methods.
600         }
601 
602         /**
603          * Determine if the stream associated with {@code state} is writable.
604          * @param state The state which is associated with the stream to test writability for.
605          * @return {@code true} if {@link FlowState#stream()} is writable. {@code false} otherwise.
606          */
607         final boolean isWritable(FlowState state) {
608             return isWritableConnection() && state.isWritable();
609         }
610 
611         final void writePendingBytes() throws Http2Exception {
612             // Reentry is not permitted during the byte distribution process. It may lead to undesirable distribution of
613             // bytes and even infinite loops. We protect against reentry and make sure each call has an opportunity to
614             // cause a distribution to occur. This may be useful for example if the channel's writability changes from
615             // Writable -> Not Writable (because we are writing) -> Writable (because the user flushed to make more room
616             // in the channel outbound buffer).
617             if (inWritePendingBytes) {
618                 return;
619             }
620             inWritePendingBytes = true;
621             try {
622                 int bytesToWrite = writableBytes();
623                 // Make sure we always write at least once, regardless if we have bytesToWrite or not.
624                 // This ensures that zero-length frames will always be written.
625                 for (;;) {
626                     if (!streamByteDistributor.distribute(bytesToWrite, this) ||
627                         (bytesToWrite = writableBytes()) <= 0 ||
628                         !isChannelWritable0()) {
629                         break;
630                     }
631                 }
632             } finally {
633                 inWritePendingBytes = false;
634             }
635         }
636 
637         void initialWindowSize(int newWindowSize) throws Http2Exception {
638             if (newWindowSize < 0) {
639                 throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
640             }
641 
642             final int delta = newWindowSize - initialWindowSize;
643             initialWindowSize = newWindowSize;
644             connection.forEachActiveStream(new Http2StreamVisitor() {
645                 @Override
646                 public boolean visit(Http2Stream stream) throws Http2Exception {
647                     state(stream).incrementStreamWindow(delta);
648                     return true;
649                 }
650             });
651 
652             if (delta > 0 && isChannelWritable()) {
653                 // The window size increased, send any pending frames for all streams.
654                 writePendingBytes();
655             }
656         }
657 
658         final boolean isWritableConnection() {
659             return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
660         }
661     }
662 
663     /**
664      * Writability of a {@code stream} is calculated using the following:
665      * <pre>
666      * Connection Window - Total Queued Bytes > 0 &&
667      * Stream Window - Bytes Queued for Stream > 0 &&
668      * isChannelWritable()
669      * </pre>
670      */
671     private final class ListenerWritabilityMonitor extends WritabilityMonitor implements Http2StreamVisitor {
672         private final Listener listener;
673 
674         ListenerWritabilityMonitor(Listener listener) {
675             this.listener = listener;
676         }
677 
678         @Override
679         public boolean visit(Http2Stream stream) throws Http2Exception {
680             FlowState state = state(stream);
681             if (isWritable(state) != state.markedWritability()) {
682                 notifyWritabilityChanged(state);
683             }
684             return true;
685         }
686 
687         @Override
688         void windowSize(FlowState state, int initialWindowSize) {
689             super.windowSize(state, initialWindowSize);
690             try {
691                 checkStateWritability(state);
692             } catch (Http2Exception e) {
693                 throw new RuntimeException("Caught unexpected exception from window", e);
694             }
695         }
696 
697         @Override
698         void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
699             super.incrementWindowSize(state, delta);
700             checkStateWritability(state);
701         }
702 
703         @Override
704         void initialWindowSize(int newWindowSize) throws Http2Exception {
705             super.initialWindowSize(newWindowSize);
706             if (isWritableConnection()) {
707                 // If the write operation does not occur we still need to check all streams because they
708                 // may have transitioned from writable to not writable.
709                 checkAllWritabilityChanged();
710             }
711         }
712 
713         @Override
714         void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
715             super.enqueueFrame(state, frame);
716             checkConnectionThenStreamWritabilityChanged(state);
717         }
718 
719         @Override
720         void stateCancelled(FlowState state) {
721             try {
722                 checkConnectionThenStreamWritabilityChanged(state);
723             } catch (Http2Exception e) {
724                 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
725             }
726         }
727 
728         @Override
729         void channelWritabilityChange() throws Http2Exception {
730             if (connectionState.markedWritability() != isChannelWritable()) {
731                 checkAllWritabilityChanged();
732             }
733         }
734 
735         private void checkStateWritability(FlowState state) throws Http2Exception {
736             if (isWritable(state) != state.markedWritability()) {
737                 if (state == connectionState) {
738                     checkAllWritabilityChanged();
739                 } else {
740                     notifyWritabilityChanged(state);
741                 }
742             }
743         }
744 
745         private void notifyWritabilityChanged(FlowState state) {
746             state.markedWritability(!state.markedWritability());
747             try {
748                 listener.writabilityChanged(state.stream);
749             } catch (Throwable cause) {
750                 logger.error("Caught Throwable from listener.writabilityChanged", cause);
751             }
752         }
753 
754         private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
755             // It is possible that the connection window and/or the individual stream writability could change.
756             if (isWritableConnection() != connectionState.markedWritability()) {
757                 checkAllWritabilityChanged();
758             } else if (isWritable(state) != state.markedWritability()) {
759                 notifyWritabilityChanged(state);
760             }
761         }
762 
763         private void checkAllWritabilityChanged() throws Http2Exception {
764             // Make sure we mark that we have notified as a result of this change.
765             connectionState.markedWritability(isWritableConnection());
766             connection.forEachActiveStream(this);
767         }
768     }
769 }