View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.CoalescingBufferQueue;
23  import io.netty.handler.codec.http.HttpStatusClass;
24  import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
25  
26  import java.util.ArrayDeque;
27  import java.util.Queue;
28  
29  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
30  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
31  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
32  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
33  import static io.netty.util.internal.ObjectUtil.checkNotNull;
34  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
35  import static java.lang.Integer.MAX_VALUE;
36  import static java.lang.Math.min;
37  
38  /**
39   * Default implementation of {@link Http2ConnectionEncoder}.
40   */
41  public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
42      private final Http2FrameWriter frameWriter;
43      private final Http2Connection connection;
44      private Http2LifecycleManager lifecycleManager;
    // We prefer ArrayDeque to LinkedList because the latter will produce more GC.
46      // This initial capacity is plenty for SETTINGS traffic.
47      private final Queue<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
48      private Queue<Http2Settings> outstandingRemoteSettingsQueue;
49  
50      public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
51          this.connection = checkNotNull(connection, "connection");
52          this.frameWriter = checkNotNull(frameWriter, "frameWriter");
53          if (connection.remote().flowController() == null) {
54              connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
55          }
56      }
57  
58      @Override
59      public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
60          this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
61      }
62  
63      @Override
64      public Http2FrameWriter frameWriter() {
65          return frameWriter;
66      }
67  
68      @Override
69      public Http2Connection connection() {
70          return connection;
71      }
72  
73      @Override
74      public final Http2RemoteFlowController flowController() {
75          return connection().remote().flowController();
76      }
77  
78      @Override
79      public void remoteSettings(Http2Settings settings) throws Http2Exception {
80          Boolean pushEnabled = settings.pushEnabled();
81          Http2FrameWriter.Configuration config = configuration();
82          Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
83          Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
84          if (pushEnabled != null) {
85              if (!connection.isServer() && pushEnabled) {
86                  throw connectionError(PROTOCOL_ERROR,
87                      "Client received a value of ENABLE_PUSH specified to other than 0");
88              }
89              connection.remote().allowPushTo(pushEnabled);
90          }
91  
92          Long maxConcurrentStreams = settings.maxConcurrentStreams();
93          if (maxConcurrentStreams != null) {
94              connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
95          }
96  
97          Long headerTableSize = settings.headerTableSize();
98          if (headerTableSize != null) {
99              outboundHeaderConfig.maxHeaderTableSize(headerTableSize);
100         }
101 
102         Long maxHeaderListSize = settings.maxHeaderListSize();
103         if (maxHeaderListSize != null) {
104             outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
105         }
106 
107         Integer maxFrameSize = settings.maxFrameSize();
108         if (maxFrameSize != null) {
109             outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
110         }
111 
112         Integer initialWindowSize = settings.initialWindowSize();
113         if (initialWindowSize != null) {
114             flowController().initialWindowSize(initialWindowSize);
115         }
116     }
117 
118     @Override
119     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
120             final boolean endOfStream, ChannelPromise promise) {
121         promise = promise.unvoid();
122         final Http2Stream stream;
123         try {
124             stream = requireStream(streamId);
125 
126             // Verify that the stream is in the appropriate state for sending DATA frames.
127             switch (stream.state()) {
128                 case OPEN:
129                 case HALF_CLOSED_REMOTE:
130                     // Allowed sending DATA frames in these states.
131                     break;
132                 default:
133                     throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
134             }
135         } catch (Throwable e) {
136             data.release();
137             return promise.setFailure(e);
138         }
139 
140         // Hand control of the frame to the flow controller.
141         flowController().addFlowControlled(stream,
142                 new FlowControlledData(stream, data, padding, endOfStream, promise));
143         return promise;
144     }
145 
146     @Override
147     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
148             boolean endStream, ChannelPromise promise) {
149         return writeHeaders0(ctx, streamId, headers, false, 0, (short) 0, false, padding, endStream, promise);
150     }
151 
152     private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
153                                                     boolean endOfStream) {
154         boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
155         if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
156             throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
157         }
158         return isInformational;
159     }
160 
161     @Override
162     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
163             final Http2Headers headers, final int streamDependency, final short weight,
164             final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
165         return writeHeaders0(ctx, streamId, headers, true, streamDependency,
166                 weight, exclusive, padding, endOfStream, promise);
167     }
168 
169     /**
170      * Write headers via {@link Http2FrameWriter}. If {@code hasPriority} is {@code false} it will ignore the
171      * {@code streamDependency}, {@code weight} and {@code exclusive} parameters.
172      */
173     private static ChannelFuture sendHeaders(Http2FrameWriter frameWriter, ChannelHandlerContext ctx, int streamId,
174                                        Http2Headers headers, final boolean hasPriority,
175                                        int streamDependency, final short weight,
176                                        boolean exclusive, final int padding,
177                                        boolean endOfStream, ChannelPromise promise) {
178         if (hasPriority) {
179             return frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
180                     weight, exclusive, padding, endOfStream, promise);
181         }
182         return frameWriter.writeHeaders(ctx, streamId, headers, padding, endOfStream, promise);
183     }
184 
185     private ChannelFuture writeHeaders0(final ChannelHandlerContext ctx, final int streamId,
186                                         final Http2Headers headers, final boolean hasPriority,
187                                         final int streamDependency, final short weight,
188                                         final boolean exclusive, final int padding,
189                                         final boolean endOfStream, ChannelPromise promise) {
190         try {
191             Http2Stream stream = connection.stream(streamId);
192             if (stream == null) {
193                 try {
194                     // We don't create the stream in a `halfClosed` state because if this is an initial
195                     // HEADERS frame we don't want the connection state to signify that the HEADERS have
196                     // been sent until after they have been encoded and placed in the outbound buffer.
197                     // Therefore, we let the `LifeCycleManager` will take care of transitioning the state
198                     // as appropriate.
199                     stream = connection.local().createStream(streamId, /*endOfStream*/ false);
200                 } catch (Http2Exception cause) {
201                     if (connection.remote().mayHaveCreatedStream(streamId)) {
202                         promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
203                         return promise;
204                     }
205                     throw cause;
206                 }
207             } else {
208                 switch (stream.state()) {
209                     case RESERVED_LOCAL:
210                         stream.open(endOfStream);
211                         break;
212                     case OPEN:
213                     case HALF_CLOSED_REMOTE:
214                         // Allowed sending headers in these states.
215                         break;
216                     default:
217                         throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
218                                                         stream.state());
219                 }
220             }
221 
222             // Trailing headers must go through flow control if there are other frames queued in flow control
223             // for this stream.
224             Http2RemoteFlowController flowController = flowController();
225             if (!endOfStream || !flowController.hasFlowControlled(stream)) {
226                 // The behavior here should mirror that in FlowControlledHeaders
227 
228                 promise = promise.unvoid();
229                 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
230 
231                 ChannelFuture future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
232                         weight, exclusive, padding, endOfStream, promise);
233 
234                 // Writing headers may fail during the encode state if they violate HPACK limits.
235                 Throwable failureCause = future.cause();
236                 if (failureCause == null) {
237                     // Synchronously set the headersSent flag to ensure that we do not subsequently write
238                     // other headers containing pseudo-header fields.
239                     //
240                     // This just sets internal stream state which is used elsewhere in the codec and doesn't
241                     // necessarily mean the write will complete successfully.
242                     stream.headersSent(isInformational);
243 
244                     if (!future.isSuccess()) {
245                         // Either the future is not done or failed in the meantime.
246                         notifyLifecycleManagerOnError(future, ctx);
247                     }
248                 } else {
249                     lifecycleManager.onError(ctx, true, failureCause);
250                 }
251 
252                 if (endOfStream) {
253                     // Must handle calling onError before calling closeStreamLocal, otherwise the error handler will
254                     // incorrectly think the stream no longer exists and so may not send RST_STREAM or perform similar
255                     // appropriate action.
256                     lifecycleManager.closeStreamLocal(stream, future);
257                 }
258 
259                 return future;
260             } else {
261                 // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
262                 flowController.addFlowControlled(stream,
263                         new FlowControlledHeaders(stream, headers, hasPriority, streamDependency,
264                                 weight, exclusive, padding, true, promise));
265                 return promise;
266             }
267         } catch (Throwable t) {
268             lifecycleManager.onError(ctx, true, t);
269             promise.tryFailure(t);
270             return promise;
271         }
272     }
273 
274     @Override
275     public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
276             boolean exclusive, ChannelPromise promise) {
277         return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
278     }
279 
280     @Override
281     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
282             ChannelPromise promise) {
283         // Delegate to the lifecycle manager for proper updating of connection state.
284         return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
285     }
286 
287     @Override
288     public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
289             ChannelPromise promise) {
290         outstandingLocalSettingsQueue.add(settings);
291         try {
292             Boolean pushEnabled = settings.pushEnabled();
293             if (pushEnabled != null && connection.isServer()) {
294                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
295             }
296         } catch (Throwable e) {
297             return promise.setFailure(e);
298         }
299 
300         return frameWriter.writeSettings(ctx, settings, promise);
301     }
302 
303     @Override
304     public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
305         if (outstandingRemoteSettingsQueue == null) {
306             return frameWriter.writeSettingsAck(ctx, promise);
307         }
308         Http2Settings settings = outstandingRemoteSettingsQueue.poll();
309         if (settings == null) {
310             return promise.setFailure(new Http2Exception(INTERNAL_ERROR, "attempted to write a SETTINGS ACK with no " +
311                     " pending SETTINGS"));
312         }
313         SimpleChannelPromiseAggregator aggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(),
314                 ctx.executor());
315         // Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
316         // remote peer applies these settings before any subsequent frames that we may send which depend upon
317         // these new settings. See https://github.com/netty/netty/issues/6520.
318         frameWriter.writeSettingsAck(ctx, aggregator.newPromise());
319 
320         // We create a "new promise" to make sure that status from both the write and the application are taken into
321         // account independently.
322         ChannelPromise applySettingsPromise = aggregator.newPromise();
323         try {
324             remoteSettings(settings);
325             applySettingsPromise.setSuccess();
326         } catch (Throwable e) {
327             applySettingsPromise.setFailure(e);
328             lifecycleManager.onError(ctx, true, e);
329         }
330         return aggregator.doneAllocatingPromises();
331     }
332 
333     @Override
334     public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
335         return frameWriter.writePing(ctx, ack, data, promise);
336     }
337 
338     @Override
339     public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
340             Http2Headers headers, int padding, ChannelPromise promise) {
341         try {
342             if (connection.goAwayReceived()) {
343                 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
344             }
345 
346             Http2Stream stream = requireStream(streamId);
347             // Reserve the promised stream.
348             connection.local().reservePushStream(promisedStreamId, stream);
349 
350             promise = promise.unvoid();
351             ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
352                                                                 promise);
353             // Writing headers may fail during the encode state if they violate HPACK limits.
354             Throwable failureCause = future.cause();
355             if (failureCause == null) {
356                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
357                 // necessarily mean the write will complete successfully.
358                 stream.pushPromiseSent();
359 
360                 if (!future.isSuccess()) {
361                     // Either the future is not done or failed in the meantime.
362                     notifyLifecycleManagerOnError(future, ctx);
363                 }
364             } else {
365                 lifecycleManager.onError(ctx, true, failureCause);
366             }
367             return future;
368         } catch (Throwable t) {
369             lifecycleManager.onError(ctx, true, t);
370             promise.tryFailure(t);
371             return promise;
372         }
373     }
374 
375     @Override
376     public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
377             ChannelPromise promise) {
378         return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
379     }
380 
381     @Override
382     public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
383             ChannelPromise promise) {
384         return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
385                 " objects to control window sizes"));
386     }
387 
388     @Override
389     public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
390             ByteBuf payload, ChannelPromise promise) {
391         return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
392     }
393 
394     @Override
395     public void close() {
396         frameWriter.close();
397     }
398 
399     @Override
400     public Http2Settings pollSentSettings() {
401         return outstandingLocalSettingsQueue.poll();
402     }
403 
404     @Override
405     public Configuration configuration() {
406         return frameWriter.configuration();
407     }
408 
409     private Http2Stream requireStream(int streamId) {
410         Http2Stream stream = connection.stream(streamId);
411         if (stream == null) {
412             final String message;
413             if (connection.streamMayHaveExisted(streamId)) {
414                 message = "Stream no longer exists: " + streamId;
415             } else {
416                 message = "Stream does not exist: " + streamId;
417             }
418             throw new IllegalArgumentException(message);
419         }
420         return stream;
421     }
422 
423     @Override
424     public void consumeReceivedSettings(Http2Settings settings) {
425         if (outstandingRemoteSettingsQueue == null) {
426             outstandingRemoteSettingsQueue = new ArrayDeque<Http2Settings>(2);
427         }
428         outstandingRemoteSettingsQueue.add(settings);
429     }
430 
431     /**
432      * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
433      * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
434      * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
435      * <p>
436      * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
437      * the passed promise is not completed until last frame write.
438      * </p>
439      */
440     private final class FlowControlledData extends FlowControlledBase {
441         private final CoalescingBufferQueue queue;
442         private int dataSize;
443 
444         FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
445                                    ChannelPromise promise) {
446             super(stream, padding, endOfStream, promise);
447             queue = new CoalescingBufferQueue(promise.channel());
448             queue.add(buf, promise);
449             dataSize = queue.readableBytes();
450         }
451 
452         @Override
453         public int size() {
454             return dataSize + padding;
455         }
456 
457         @Override
458         public void error(ChannelHandlerContext ctx, Throwable cause) {
459             queue.releaseAndFailAll(cause);
460             // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
461             // error so we don't invalidate flow control when returning bytes to flow control.
462             //
463             // That said we will set dataSize and padding to 0 in the write(...) method if we cleared the queue
464             // because of an error.
465             lifecycleManager.onError(ctx, true, cause);
466         }
467 
468         @Override
469         public void write(ChannelHandlerContext ctx, int allowedBytes) {
470             int queuedData = queue.readableBytes();
471             if (!endOfStream) {
472                 if (queuedData == 0) {
473                     if (queue.isEmpty()) {
474                         // When the queue is empty it means we did clear it because of an error(...) call
475                         // (as otherwise we will have at least 1 entry in there), which will happen either when called
476                         // explicit or when the write itself fails. In this case just set dataSize and padding to 0
477                         // which will signal back that the whole frame was consumed.
478                         //
479                         // See https://github.com/netty/netty/issues/8707.
480                         padding = dataSize = 0;
481                     } else {
482                         // There's no need to write any data frames because there are only empty data frames in the
483                         // queue and it is not end of stream yet. Just complete their promises by getting the buffer
484                         // corresponding to 0 bytes and writing it to the channel (to preserve notification order).
485                         ChannelPromise writePromise = ctx.newPromise().addListener(this);
486                         ctx.write(queue.remove(0, writePromise), writePromise);
487                     }
488                     return;
489                 }
490 
491                 if (allowedBytes == 0) {
492                     return;
493                 }
494             }
495 
496             // Determine how much data to write.
497             int writableData = min(queuedData, allowedBytes);
498             ChannelPromise writePromise = ctx.newPromise().addListener(this);
499             ByteBuf toWrite = queue.remove(writableData, writePromise);
500             dataSize = queue.readableBytes();
501 
502             // Determine how much padding to write.
503             int writablePadding = min(allowedBytes - writableData, padding);
504             padding -= writablePadding;
505 
506             // Write the frame(s).
507             frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
508                     endOfStream && size() == 0, writePromise);
509         }
510 
511         @Override
512         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
513             FlowControlledData nextData;
514             if (FlowControlledData.class != next.getClass() ||
515                 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
516                 return false;
517             }
518             nextData.queue.copyTo(queue);
519             dataSize = queue.readableBytes();
520             // Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
521             padding = Math.max(padding, nextData.padding);
522             endOfStream = nextData.endOfStream;
523             return true;
524         }
525     }
526 
527     private void notifyLifecycleManagerOnError(ChannelFuture future, final ChannelHandlerContext ctx) {
528         future.addListener(new ChannelFutureListener() {
529             @Override
530             public void operationComplete(ChannelFuture future) throws Exception {
531                 Throwable cause = future.cause();
532                 if (cause != null) {
533                     lifecycleManager.onError(ctx, true, cause);
534                 }
535             }
536         });
537     }
538 
539     /**
540      * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
541      * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
542      * blocked on flow-control a HEADER frame must wait until this frame has been written.
543      */
544     private final class FlowControlledHeaders extends FlowControlledBase {
545         private final Http2Headers headers;
546         private final boolean hasPriority;
547         private final int streamDependency;
548         private final short weight;
549         private final boolean exclusive;
550 
551         FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
552                               int streamDependency, short weight, boolean exclusive,
553                               int padding, boolean endOfStream, ChannelPromise promise) {
554             super(stream, padding, endOfStream, promise.unvoid());
555             this.headers = headers;
556             this.hasPriority = hasPriority;
557             this.streamDependency = streamDependency;
558             this.weight = weight;
559             this.exclusive = exclusive;
560         }
561 
562         @Override
563         public int size() {
564             return 0;
565         }
566 
567         @Override
568         public void error(ChannelHandlerContext ctx, Throwable cause) {
569             if (ctx != null) {
570                 lifecycleManager.onError(ctx, true, cause);
571             }
572             promise.tryFailure(cause);
573         }
574 
575         @Override
576         public void write(ChannelHandlerContext ctx, int allowedBytes) {
577             boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
578             // The code is currently requiring adding this listener before writing, in order to call onError() before
579             // closeStreamLocal().
580             promise.addListener(this);
581 
582             ChannelFuture f = sendHeaders(frameWriter, ctx, stream.id(), headers, hasPriority, streamDependency,
583                     weight, exclusive, padding, endOfStream, promise);
584             // Writing headers may fail during the encode state if they violate HPACK limits.
585             Throwable failureCause = f.cause();
586             if (failureCause == null) {
587                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
588                 // necessarily mean the write will complete successfully.
589                 stream.headersSent(isInformational);
590             }
591         }
592 
593         @Override
594         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
595             return false;
596         }
597     }
598 
599     /**
600      * Common base type for payloads to deliver via flow-control.
601      */
602     public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
603             ChannelFutureListener {
604         protected final Http2Stream stream;
605         protected ChannelPromise promise;
606         protected boolean endOfStream;
607         protected int padding;
608 
609         FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
610                 final ChannelPromise promise) {
611             checkPositiveOrZero(padding, "padding");
612             this.padding = padding;
613             this.endOfStream = endOfStream;
614             this.stream = stream;
615             this.promise = promise;
616         }
617 
618         @Override
619         public void writeComplete() {
620             if (endOfStream) {
621                 lifecycleManager.closeStreamLocal(stream, promise);
622             }
623         }
624 
625         @Override
626         public void operationComplete(ChannelFuture future) throws Exception {
627             if (!future.isSuccess()) {
628                 error(flowController().channelHandlerContext(), future.cause());
629             }
630         }
631     }
632 }