View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.channel.Channel;
19  import io.netty5.channel.ChannelHandlerContext;
20  import io.netty5.channel.CoalescingBufferQueue;
21  import io.netty5.handler.codec.http.HttpStatusClass;
22  import io.netty5.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
23  import io.netty5.util.concurrent.Future;
24  import io.netty5.util.concurrent.FutureListener;
25  import io.netty5.util.concurrent.Promise;
26  import io.netty5.util.internal.UnstableApi;
27  
28  import java.util.ArrayDeque;
29  import java.util.Queue;
30  
31  import static io.netty5.handler.codec.http.HttpStatusClass.INFORMATIONAL;
32  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
33  import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
34  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
35  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
36  import static java.lang.Integer.MAX_VALUE;
37  import static java.lang.Math.min;
38  import static java.util.Objects.requireNonNull;
39  
40  /**
41   * Default implementation of {@link Http2ConnectionEncoder}.
42   */
43  @UnstableApi
44  public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
45      private final Http2FrameWriter frameWriter;
46      private final Http2Connection connection;
47      private Http2LifecycleManager lifecycleManager;
48      // We prefer ArrayDeque to LinkedList because later will produce more GC.
49      // This initial capacity is plenty for SETTINGS traffic.
50      private final Queue<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<>(4);
51      private Queue<Http2Settings> outstandingRemoteSettingsQueue;
52  
53      public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
54          this.connection = requireNonNull(connection, "connection");
55          this.frameWriter = requireNonNull(frameWriter, "frameWriter");
56          if (connection.remote().flowController() == null) {
57              connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
58          }
59      }
60  
61      @Override
62      public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
63          this.lifecycleManager = requireNonNull(lifecycleManager, "lifecycleManager");
64      }
65  
66      @Override
67      public Http2FrameWriter frameWriter() {
68          return frameWriter;
69      }
70  
71      @Override
72      public Http2Connection connection() {
73          return connection;
74      }
75  
76      @Override
77      public final Http2RemoteFlowController flowController() {
78          return connection().remote().flowController();
79      }
80  
81      @Override
82      public void remoteSettings(Http2Settings settings) throws Http2Exception {
83          Boolean pushEnabled = settings.pushEnabled();
84          Http2FrameWriter.Configuration config = configuration();
85          Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
86          Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
87          if (pushEnabled != null) {
88              if (!connection.isServer() && pushEnabled) {
89                  throw connectionError(PROTOCOL_ERROR,
90                      "Client received a value of ENABLE_PUSH specified to other than 0");
91              }
92              connection.remote().allowPushTo(pushEnabled);
93          }
94  
95          Long maxConcurrentStreams = settings.maxConcurrentStreams();
96          if (maxConcurrentStreams != null) {
97              connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
98          }
99  
100         Long headerTableSize = settings.headerTableSize();
101         if (headerTableSize != null) {
102             outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
103         }
104 
105         Long maxHeaderListSize = settings.maxHeaderListSize();
106         if (maxHeaderListSize != null) {
107             outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
108         }
109 
110         Integer maxFrameSize = settings.maxFrameSize();
111         if (maxFrameSize != null) {
112             outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
113         }
114 
115         Integer initialWindowSize = settings.initialWindowSize();
116         if (initialWindowSize != null) {
117             flowController().initialWindowSize(initialWindowSize);
118         }
119     }
120 
121     @Override
122     public Future<Void> writeData(final ChannelHandlerContext ctx, final int streamId, Buffer data, int padding,
123                                   final boolean endOfStream) {
124         final Http2Stream stream;
125         try {
126             stream = requireStream(streamId);
127 
128             // Verify that the stream is in the appropriate state for sending DATA frames.
129             switch (stream.state()) {
130                 case OPEN:
131                 case HALF_CLOSED_REMOTE:
132                     // Allowed sending DATA frames in these states.
133                     break;
134                 default:
135                     throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
136             }
137         } catch (Throwable e) {
138             data.close();
139             return ctx.newFailedFuture(e);
140         }
141 
142         Promise<Void> promise = ctx.newPromise();
143         // Hand control of the frame to the flow controller.
144         flowController().addFlowControlled(stream,
145                 new FlowControlledData(stream, data, padding, endOfStream, promise, ctx.channel()));
146         return promise.asFuture();
147     }
148 
149     @Override
150     public Future<Void> writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
151                                      boolean endStream) {
152         return writeHeaders0(ctx, streamId, headers, false, 0, (short) 0, false, padding, endStream);
153     }
154 
155     private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
156                                                     boolean endOfStream) {
157         boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
158         if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
159             throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
160         }
161         return isInformational;
162     }
163 
164     @Override
165     public Future<Void> writeHeaders(final ChannelHandlerContext ctx, final int streamId,
166                                      final Http2Headers headers, final int streamDependency, final short weight,
167                                      final boolean exclusive, final int padding, final boolean endOfStream) {
168         return writeHeaders0(ctx, streamId, headers, true, streamDependency,
169                 weight, exclusive, padding, endOfStream);
170     }
171 
172     /**
173      * Write headers via {@link Http2FrameWriter}. If {@code hasPriority} is {@code false} it will ignore the
174      * {@code streamDependency}, {@code weight} and {@code exclusive} parameters.
175      */
176     private static Future<Void> sendHeaders(Http2FrameWriter frameWriter, ChannelHandlerContext ctx, int streamId,
177                                        Http2Headers headers, final boolean hasPriority,
178                                        int streamDependency, final short weight,
179                                        boolean exclusive, final int padding,
180                                        boolean endOfStream) {
181         if (hasPriority) {
182             return frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
183                     weight, exclusive, padding, endOfStream);
184         }
185         return frameWriter.writeHeaders(ctx, streamId, headers, padding, endOfStream);
186     }
187 
188     private Future<Void> writeHeaders0(final ChannelHandlerContext ctx, final int streamId,
189                                         final Http2Headers headers, final boolean hasPriority,
190                                         final int streamDependency, final short weight,
191                                         final boolean exclusive, final int padding,
192                                         final boolean endOfStream) {
193         try {
194             Http2Stream stream = connection.stream(streamId);
195             if (stream == null) {
196                 try {
197                     // We don't create the stream in a `halfClosed` state because if this is an initial
198                     // HEADERS frame we don't want the connection state to signify that the HEADERS have
199                     // been sent until after they have been encoded and placed in the outbound buffer.
200                     // Therefore, we let the `LifeCycleManager` will take care of transitioning the state
201                     // as appropriate.
202                     stream = connection.local().createStream(streamId, /*endOfStream*/ false);
203                 } catch (Http2Exception cause) {
204                     if (connection.remote().mayHaveCreatedStream(streamId)) {
205                         return ctx.newFailedFuture(
206                                 new IllegalStateException("Stream no longer exists: " + streamId, cause));
207                     }
208                     throw cause;
209                 }
210             } else {
211                 switch (stream.state()) {
212                     case RESERVED_LOCAL:
213                         stream.open(endOfStream);
214                         break;
215                     case OPEN:
216                     case HALF_CLOSED_REMOTE:
217                         // Allowed sending headers in these states.
218                         break;
219                     default:
220                         throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
221                                                         stream.state());
222                 }
223             }
224 
225             // Trailing headers must go through flow control if there are other frames queued in flow control
226             // for this stream.
227             Http2RemoteFlowController flowController = flowController();
228             if (!endOfStream || !flowController.hasFlowControlled(stream)) {
229                 // The behavior here should mirror that in FlowControlledHeaders
230 
231                 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
232 
233                 Future<Void> future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
234                         weight, exclusive, padding, endOfStream);
235 
236                 // Writing headers may fail during the encode state if they violate HPACK limits.
237 
238                 if (future.isSuccess() || !future.isDone()) {
239                     // Synchronously set the headersSent flag to ensure that we do not subsequently write
240                     // other headers containing pseudo-header fields.
241                     //
242                     // This just sets internal stream state which is used elsewhere in the codec and doesn't
243                     // necessarily mean the write will complete successfully.
244                     stream.headersSent(isInformational);
245 
246                     if (!future.isSuccess()) {
247                         // Either the future is not done or failed in the meantime.
248                         notifyLifecycleManagerOnError(future, ctx);
249                     }
250                 } else {
251                     Throwable failureCause = future.cause();
252                     lifecycleManager.onError(ctx, true, failureCause);
253                 }
254 
255                 if (endOfStream) {
256                     // Must handle calling onError before calling closeStreamLocal, otherwise the error handler will
257                     // incorrectly think the stream no longer exists and so may not send RST_STREAM or perform similar
258                     // appropriate action.
259                     lifecycleManager.closeStreamLocal(stream, future);
260                 }
261 
262                 return future;
263             } else {
264                 Promise<Void> promise = ctx.newPromise();
265                 // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
266                 flowController.addFlowControlled(stream,
267                         new FlowControlledHeaders(stream, headers, hasPriority, streamDependency,
268                                 weight, exclusive, padding, true, promise));
269                 return promise.asFuture();
270             }
271         } catch (Throwable t) {
272             lifecycleManager.onError(ctx, true, t);
273             return ctx.newFailedFuture(t);
274         }
275     }
276 
277     @Override
278     public Future<Void> writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
279                                       boolean exclusive) {
280         return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive);
281     }
282 
283     @Override
284     public Future<Void> writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode) {
285         // Delegate to the lifecycle manager for proper updating of connection state.
286         return lifecycleManager.resetStream(ctx, streamId, errorCode);
287     }
288 
289     @Override
290     public Future<Void> writeSettings(ChannelHandlerContext ctx, Http2Settings settings) {
291         outstandingLocalSettingsQueue.add(settings);
292         try {
293             Boolean pushEnabled = settings.pushEnabled();
294             if (pushEnabled != null && connection.isServer()) {
295                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
296             }
297         } catch (Throwable e) {
298             return ctx.newFailedFuture(e);
299         }
300 
301         return frameWriter.writeSettings(ctx, settings);
302     }
303 
304     @Override
305     public Future<Void> writeSettingsAck(ChannelHandlerContext ctx) {
306         if (outstandingRemoteSettingsQueue == null) {
307             return frameWriter.writeSettingsAck(ctx);
308         }
309         Http2Settings settings = outstandingRemoteSettingsQueue.poll();
310         if (settings == null) {
311             return ctx.newFailedFuture(new Http2Exception(INTERNAL_ERROR, "attempted to write a SETTINGS ACK with no " +
312                                                                          " pending SETTINGS"));
313         }
314         SimpleChannelPromiseAggregator aggregator =
315                 new SimpleChannelPromiseAggregator(ctx.newPromise(), ctx.executor());
316         // Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
317         // remote peer applies these settings before any subsequent frames that we may send which depend upon
318         // these new settings. See https://github.com/netty/netty/issues/6520.
319         frameWriter.writeSettingsAck(ctx).cascadeTo(aggregator.newPromise());
320 
321         // We create a "new promise" to make sure that status from both the write and the application are taken into
322         // account independently.
323         Promise<Void> applySettingsPromise = aggregator.newPromise();
324         try {
325             remoteSettings(settings);
326             applySettingsPromise.setSuccess(null);
327         } catch (Throwable e) {
328             applySettingsPromise.setFailure(e);
329             lifecycleManager.onError(ctx, true, e);
330         }
331         return aggregator.doneAllocatingPromises();
332     }
333 
334     @Override
335     public Future<Void> writePing(ChannelHandlerContext ctx, boolean ack, long data) {
336         return frameWriter.writePing(ctx, ack, data);
337     }
338 
339     @Override
340     public Future<Void> writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
341                                          Http2Headers headers, int padding) {
342         try {
343             if (connection.goAwayReceived()) {
344                 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
345             }
346 
347             Http2Stream stream = requireStream(streamId);
348             // Reserve the promised stream.
349             connection.local().reservePushStream(promisedStreamId, stream);
350 
351             Future<Void> future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding);
352             // Writing headers may fail during the encode state if they violate HPACK limits.
353             if (future.isSuccess() || !future.isDone()) {
354                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
355                 // necessarily mean the write will complete successfully.
356                 stream.pushPromiseSent();
357 
358                 if (!future.isSuccess()) {
359                     // Either the future is not done or failed in the meantime.
360                     notifyLifecycleManagerOnError(future, ctx);
361                 }
362             } else {
363                 Throwable failureCause = future.cause();
364                 lifecycleManager.onError(ctx, true, failureCause);
365             }
366             return future;
367         } catch (Throwable t) {
368             lifecycleManager.onError(ctx, true, t);
369             return ctx.newFailedFuture(t);
370         }
371     }
372 
373     @Override
374     public Future<Void> writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, Buffer debugData) {
375         return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData);
376     }
377 
378     @Override
379     public Future<Void> writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
380         return ctx.newFailedFuture(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
381                                                                     " objects to control window sizes"));
382     }
383 
384     @Override
385     public Future<Void> writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
386                                    Buffer payload) {
387         return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload);
388     }
389 
390     @Override
391     public void close() {
392         frameWriter.close();
393     }
394 
395     @Override
396     public Http2Settings pollSentSettings() {
397         return outstandingLocalSettingsQueue.poll();
398     }
399 
400     @Override
401     public Configuration configuration() {
402         return frameWriter.configuration();
403     }
404 
405     private Http2Stream requireStream(int streamId) {
406         Http2Stream stream = connection.stream(streamId);
407         if (stream == null) {
408             final String message;
409             if (connection.streamMayHaveExisted(streamId)) {
410                 message = "Stream no longer exists: " + streamId;
411             } else {
412                 message = "Stream does not exist: " + streamId;
413             }
414             throw new IllegalArgumentException(message);
415         }
416         return stream;
417     }
418 
419     @Override
420     public void consumeReceivedSettings(Http2Settings settings) {
421         if (outstandingRemoteSettingsQueue == null) {
422             outstandingRemoteSettingsQueue = new ArrayDeque<>(2);
423         }
424         outstandingRemoteSettingsQueue.add(settings);
425     }
426 
427     /**
428      * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
429      * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
430      * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
431      * <p>
432      * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
433      * the passed promise is not completed until last frame write.
434      * </p>
435      */
436     private final class FlowControlledData extends FlowControlledBase {
437         private final CoalescingBufferQueue queue;
438         private int dataSize;
439 
440         FlowControlledData(Http2Stream stream, Buffer buf, int padding, boolean endOfStream,
441                            Promise<Void> promise, Channel channel) {
442             super(stream, padding, endOfStream, promise);
443             queue = new CoalescingBufferQueue(channel);
444             queue.add(buf, promise);
445             dataSize = queue.readableBytes();
446         }
447 
448         @Override
449         public int size() {
450             return dataSize + padding;
451         }
452 
453         @Override
454         public void error(ChannelHandlerContext ctx, Throwable cause) {
455             queue.releaseAndFailAll(cause);
456             // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
457             // error so we don't invalidate flow control when returning bytes to flow control.
458             //
459             // That said we will set dataSize and padding to 0 in the write(...) method if we cleared the queue
460             // because of an error.
461             lifecycleManager.onError(ctx, true, cause);
462         }
463 
464         @Override
465         public void write(ChannelHandlerContext ctx, int allowedBytes) {
466             int queuedData = queue.readableBytes();
467             if (!endOfStream) {
468                 if (queuedData == 0) {
469                     if (queue.isEmpty()) {
470                         // When the queue is empty it means we did clear it because of an error(...) call
471                         // (as otherwise we will have at least 1 entry in there), which will happen either when called
472                         // explicit or when the write itself fails. In this case just set dataSize and padding to 0
473                         // which will signal back that the whole frame was consumed.
474                         //
475                         // See https://github.com/netty/netty/issues/8707.
476                         padding = dataSize = 0;
477                     } else {
478                         // There's no need to write any data frames because there are only empty data frames in the
479                         // queue and it is not end of stream yet. Just complete their promises by getting the buffer
480                         // corresponding to 0 bytes and writing it to the channel (to preserve notification order).
481                         Promise<Void> writePromise = ctx.newPromise();
482                         writePromise.asFuture().addListener(this);
483                         ctx.write(queue.remove(0, writePromise)).cascadeTo(writePromise);
484                     }
485                     return;
486                 }
487 
488                 if (allowedBytes == 0) {
489                     return;
490                 }
491             }
492 
493             // Determine how much data to write.
494             int writableData = min(queuedData, allowedBytes);
495             Promise<Void> writePromise = ctx.newPromise();
496             writePromise.asFuture().addListener(this);
497             Buffer toWrite = queue.remove(writableData, writePromise);
498             dataSize = queue.readableBytes();
499 
500             // Determine how much padding to write.
501             int writablePadding = min(allowedBytes - writableData, padding);
502             padding -= writablePadding;
503 
504             // Write the frame(s).
505             frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
506                     endOfStream && size() == 0).cascadeTo(writePromise);
507         }
508 
509         @Override
510         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
511             FlowControlledData nextData;
512             if (FlowControlledData.class != next.getClass() ||
513                 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
514                 return false;
515             }
516             nextData.queue.copyTo(queue);
517             dataSize = queue.readableBytes();
518             // Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
519             padding = Math.max(padding, nextData.padding);
520             endOfStream = nextData.endOfStream;
521             return true;
522         }
523     }
524 
525     private void notifyLifecycleManagerOnError(Future<Void> future, final ChannelHandlerContext ctx) {
526         future.addListener(future1 -> {
527             Throwable cause = future1.cause();
528             if (cause != null) {
529                 lifecycleManager.onError(ctx, true, cause);
530             }
531         });
532     }
533 
534     /**
535      * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
536      * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
537      * blocked on flow-control a HEADER frame must wait until this frame has been written.
538      */
539     private final class FlowControlledHeaders extends FlowControlledBase {
540         private final Http2Headers headers;
541         private final boolean hasPriority;
542         private final int streamDependency;
543         private final short weight;
544         private final boolean exclusive;
545 
546         FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
547                               int streamDependency, short weight, boolean exclusive,
548                               int padding, boolean endOfStream, Promise<Void> promise) {
549             super(stream, padding, endOfStream, promise);
550             this.headers = headers;
551             this.hasPriority = hasPriority;
552             this.streamDependency = streamDependency;
553             this.weight = weight;
554             this.exclusive = exclusive;
555         }
556 
557         @Override
558         public int size() {
559             return 0;
560         }
561 
562         @Override
563         public void error(ChannelHandlerContext ctx, Throwable cause) {
564             if (ctx != null) {
565                 lifecycleManager.onError(ctx, true, cause);
566             }
567             promise.tryFailure(cause);
568         }
569 
570         @Override
571         public void write(ChannelHandlerContext ctx, int allowedBytes) {
572             boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
573             // The code is currently requiring adding this listener before writing, in order to call onError() before
574             // closeStreamLocal().
575             promise.asFuture().addListener(this);
576 
577             Future<Void> f = sendHeaders(frameWriter, ctx, stream.id(), headers, hasPriority, streamDependency,
578                                          weight, exclusive, padding, endOfStream);
579             f.cascadeTo(promise);
580             // Writing headers may fail during the encode state if they violate HPACK limits.
581             if (!f.isFailed()) { // "not failed" means either not done, or completed successfully.
582                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
583                 // necessarily mean the write will complete successfully.
584                 stream.headersSent(isInformational);
585             }
586         }
587 
588         @Override
589         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
590             return false;
591         }
592     }
593 
594     /**
595      * Common base type for payloads to deliver via flow-control.
596      */
597     public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled, FutureListener<Void> {
598         protected final Http2Stream stream;
599         protected Promise<Void> promise;
600         protected boolean endOfStream;
601         protected int padding;
602 
603         FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
604                 final Promise<Void> promise) {
605             checkPositiveOrZero(padding, "padding");
606             this.padding = padding;
607             this.endOfStream = endOfStream;
608             this.stream = stream;
609             this.promise = promise;
610         }
611 
612         @Override
613         public void writeComplete() {
614             if (endOfStream) {
615                 lifecycleManager.closeStreamLocal(stream, promise.asFuture());
616             }
617         }
618 
619         @Override
620         public void operationComplete(Future<? extends Void> future) {
621             if (future.isFailed()) {
622                 error(flowController().channelHandlerContext(), future.cause());
623             }
624         }
625     }
626 }