View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.CoalescingBufferQueue;
23  import io.netty.handler.codec.http.HttpStatusClass;
24  import io.netty.util.internal.UnstableApi;
25  
26  import java.util.ArrayDeque;
27  
28  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
29  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
30  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
31  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static java.lang.Integer.MAX_VALUE;
34  import static java.lang.Math.min;
35  
36  /**
37   * Default implementation of {@link Http2ConnectionEncoder}.
38   */
39  @UnstableApi
40  public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
41      private final Http2FrameWriter frameWriter;
42      private final Http2Connection connection;
43      private Http2LifecycleManager lifecycleManager;
    // We prefer ArrayDeque to LinkedList because the latter produces more garbage for the GC to collect.
45      // This initial capacity is plenty for SETTINGS traffic.
46      private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
47  
48      public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
49          this.connection = checkNotNull(connection, "connection");
50          this.frameWriter = checkNotNull(frameWriter, "frameWriter");
51          if (connection.remote().flowController() == null) {
52              connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
53          }
54      }
55  
56      @Override
57      public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
58          this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
59      }
60  
61      @Override
62      public Http2FrameWriter frameWriter() {
63          return frameWriter;
64      }
65  
66      @Override
67      public Http2Connection connection() {
68          return connection;
69      }
70  
71      @Override
72      public final Http2RemoteFlowController flowController() {
73          return connection().remote().flowController();
74      }
75  
76      @Override
77      public void remoteSettings(Http2Settings settings) throws Http2Exception {
78          Boolean pushEnabled = settings.pushEnabled();
79          Http2FrameWriter.Configuration config = configuration();
80          Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
81          Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
82          if (pushEnabled != null) {
83              if (!connection.isServer() && pushEnabled) {
84                  throw connectionError(PROTOCOL_ERROR,
85                      "Client received a value of ENABLE_PUSH specified to other than 0");
86              }
87              connection.remote().allowPushTo(pushEnabled);
88          }
89  
90          Long maxConcurrentStreams = settings.maxConcurrentStreams();
91          if (maxConcurrentStreams != null) {
92              connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
93          }
94  
95          Long headerTableSize = settings.headerTableSize();
96          if (headerTableSize != null) {
97              outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
98          }
99  
100         Long maxHeaderListSize = settings.maxHeaderListSize();
101         if (maxHeaderListSize != null) {
102             outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
103         }
104 
105         Integer maxFrameSize = settings.maxFrameSize();
106         if (maxFrameSize != null) {
107             outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
108         }
109 
110         Integer initialWindowSize = settings.initialWindowSize();
111         if (initialWindowSize != null) {
112             flowController().initialWindowSize(initialWindowSize);
113         }
114     }
115 
116     @Override
117     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
118             final boolean endOfStream, ChannelPromise promise) {
119         final Http2Stream stream;
120         try {
121             stream = requireStream(streamId);
122 
123             // Verify that the stream is in the appropriate state for sending DATA frames.
124             switch (stream.state()) {
125                 case OPEN:
126                 case HALF_CLOSED_REMOTE:
127                     // Allowed sending DATA frames in these states.
128                     break;
129                 default:
130                     throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
131             }
132         } catch (Throwable e) {
133             data.release();
134             return promise.setFailure(e);
135         }
136 
137         // Hand control of the frame to the flow controller.
138         flowController().addFlowControlled(stream,
139                 new FlowControlledData(stream, data, padding, endOfStream, promise));
140         return promise;
141     }
142 
143     @Override
144     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
145             boolean endStream, ChannelPromise promise) {
146         return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise);
147     }
148 
149     private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
150                                                     boolean endOfStream) {
151         boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
152         if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
153             throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
154         }
155         return isInformational;
156     }
157 
158     @Override
159     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
160             final Http2Headers headers, final int streamDependency, final short weight,
161             final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
162         try {
163             Http2Stream stream = connection.stream(streamId);
164             if (stream == null) {
165                 try {
166                     // We don't create the stream in a `halfClosed` state because if this is an initial
167                     // HEADERS frame we don't want the connection state to signify that the HEADERS have
168                     // been sent until after they have been encoded and placed in the outbound buffer.
169                     // Therefore, we let the `LifeCycleManager` will take care of transitioning the state
170                     // as appropriate.
171                     stream = connection.local().createStream(streamId, /*endOfStream*/ false);
172                 } catch (Http2Exception cause) {
173                     if (connection.remote().mayHaveCreatedStream(streamId)) {
174                         promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
175                         return promise;
176                     }
177                     throw cause;
178                 }
179             } else {
180                 switch (stream.state()) {
181                     case RESERVED_LOCAL:
182                         stream.open(endOfStream);
183                         break;
184                     case OPEN:
185                     case HALF_CLOSED_REMOTE:
186                         // Allowed sending headers in these states.
187                         break;
188                     default:
189                         throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
190                                                         stream.state());
191                 }
192             }
193 
194             // Trailing headers must go through flow control if there are other frames queued in flow control
195             // for this stream.
196             Http2RemoteFlowController flowController = flowController();
197             if (!endOfStream || !flowController.hasFlowControlled(stream)) {
198                 // The behavior here should mirror that in FlowControlledHeaders
199 
200                 promise = promise.unvoid();
201                 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
202 
203                 ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
204                                                                 weight, exclusive, padding, endOfStream, promise);
205                 // Writing headers may fail during the encode state if they violate HPACK limits.
206                 Throwable failureCause = future.cause();
207                 if (failureCause == null) {
208                     // Synchronously set the headersSent flag to ensure that we do not subsequently write
209                     // other headers containing pseudo-header fields.
210                     //
211                     // This just sets internal stream state which is used elsewhere in the codec and doesn't
212                     // necessarily mean the write will complete successfully.
213                     stream.headersSent(isInformational);
214 
215                     if (!future.isSuccess()) {
216                         // Either the future is not done or failed in the meantime.
217                         notifyLifecycleManagerOnError(future, ctx);
218                     }
219                 } else {
220                     lifecycleManager.onError(ctx, true, failureCause);
221                 }
222 
223                 if (endOfStream) {
224                     // Must handle calling onError before calling closeStreamLocal, otherwise the error handler will
225                     // incorrectly think the stream no longer exists and so may not send RST_STREAM or perform similar
226                     // appropriate action.
227                     lifecycleManager.closeStreamLocal(stream, future);
228                 }
229 
230                 return future;
231             } else {
232                 // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
233                 flowController.addFlowControlled(stream,
234                         new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
235                                                   true, promise));
236                 return promise;
237             }
238         } catch (Throwable t) {
239             lifecycleManager.onError(ctx, true, t);
240             promise.tryFailure(t);
241             return promise;
242         }
243     }
244 
245     @Override
246     public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
247             boolean exclusive, ChannelPromise promise) {
248         return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
249     }
250 
251     @Override
252     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
253             ChannelPromise promise) {
254         // Delegate to the lifecycle manager for proper updating of connection state.
255         return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
256     }
257 
258     @Override
259     public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
260             ChannelPromise promise) {
261         outstandingLocalSettingsQueue.add(settings);
262         try {
263             Boolean pushEnabled = settings.pushEnabled();
264             if (pushEnabled != null && connection.isServer()) {
265                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
266             }
267         } catch (Throwable e) {
268             return promise.setFailure(e);
269         }
270 
271         return frameWriter.writeSettings(ctx, settings, promise);
272     }
273 
274     @Override
275     public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
276         return frameWriter.writeSettingsAck(ctx, promise);
277     }
278 
279     @Override
280     public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
281         return frameWriter.writePing(ctx, ack, data, promise);
282     }
283 
284     @Override
285     public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
286             Http2Headers headers, int padding, ChannelPromise promise) {
287         try {
288             if (connection.goAwayReceived()) {
289                 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
290             }
291 
292             Http2Stream stream = requireStream(streamId);
293             // Reserve the promised stream.
294             connection.local().reservePushStream(promisedStreamId, stream);
295 
296             promise = promise.unvoid();
297             ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
298                                                                 promise);
299             // Writing headers may fail during the encode state if they violate HPACK limits.
300             Throwable failureCause = future.cause();
301             if (failureCause == null) {
302                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
303                 // necessarily mean the write will complete successfully.
304                 stream.pushPromiseSent();
305 
306                 if (!future.isSuccess()) {
307                     // Either the future is not done or failed in the meantime.
308                     notifyLifecycleManagerOnError(future, ctx);
309                 }
310             } else {
311                 lifecycleManager.onError(ctx, true, failureCause);
312             }
313             return future;
314         } catch (Throwable t) {
315             lifecycleManager.onError(ctx, true, t);
316             promise.tryFailure(t);
317             return promise;
318         }
319     }
320 
321     @Override
322     public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
323             ChannelPromise promise) {
324         return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
325     }
326 
327     @Override
328     public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
329             ChannelPromise promise) {
330         return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
331                 " objects to control window sizes"));
332     }
333 
334     @Override
335     public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
336             ByteBuf payload, ChannelPromise promise) {
337         return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
338     }
339 
340     @Override
341     public void close() {
342         frameWriter.close();
343     }
344 
345     @Override
346     public Http2Settings pollSentSettings() {
347         return outstandingLocalSettingsQueue.poll();
348     }
349 
350     @Override
351     public Configuration configuration() {
352         return frameWriter.configuration();
353     }
354 
355     private Http2Stream requireStream(int streamId) {
356         Http2Stream stream = connection.stream(streamId);
357         if (stream == null) {
358             final String message;
359             if (connection.streamMayHaveExisted(streamId)) {
360                 message = "Stream no longer exists: " + streamId;
361             } else {
362                 message = "Stream does not exist: " + streamId;
363             }
364             throw new IllegalArgumentException(message);
365         }
366         return stream;
367     }
368 
369     /**
370      * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
371      * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
372      * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
373      * <p>
374      * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
375      * the passed promise is not completed until last frame write.
376      * </p>
377      */
378     private final class FlowControlledData extends FlowControlledBase {
379         private final CoalescingBufferQueue queue;
380         private int dataSize;
381 
382         FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
383                                    ChannelPromise promise) {
384             super(stream, padding, endOfStream, promise);
385             queue = new CoalescingBufferQueue(promise.channel());
386             queue.add(buf, promise);
387             dataSize = queue.readableBytes();
388         }
389 
390         @Override
391         public int size() {
392             return dataSize + padding;
393         }
394 
395         @Override
396         public void error(ChannelHandlerContext ctx, Throwable cause) {
397             queue.releaseAndFailAll(cause);
398             // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
399             // error so we don't invalidate flow control when returning bytes to flow control.
400             lifecycleManager.onError(ctx, true, cause);
401         }
402 
403         @Override
404         public void write(ChannelHandlerContext ctx, int allowedBytes) {
405             int queuedData = queue.readableBytes();
406             if (!endOfStream) {
407                 if (queuedData == 0) {
408                     // There's no need to write any data frames because there are only empty data frames in the queue
409                     // and it is not end of stream yet. Just complete their promises by getting the buffer corresponding
410                     // to 0 bytes and writing it to the channel (to preserve notification order).
411                     ChannelPromise writePromise = ctx.newPromise().addListener(this);
412                     ctx.write(queue.remove(0, writePromise), writePromise);
413                     return;
414                 }
415 
416                 if (allowedBytes == 0) {
417                     return;
418                 }
419             }
420 
421             // Determine how much data to write.
422             int writableData = min(queuedData, allowedBytes);
423             ChannelPromise writePromise = ctx.newPromise().addListener(this);
424             ByteBuf toWrite = queue.remove(writableData, writePromise);
425             dataSize = queue.readableBytes();
426 
427             // Determine how much padding to write.
428             int writablePadding = min(allowedBytes - writableData, padding);
429             padding -= writablePadding;
430 
431             // Write the frame(s).
432             frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
433                     endOfStream && size() == 0, writePromise);
434         }
435 
436         @Override
437         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
438             FlowControlledData nextData;
439             if (FlowControlledData.class != next.getClass() ||
440                 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
441                 return false;
442             }
443             nextData.queue.copyTo(queue);
444             dataSize = queue.readableBytes();
445             // Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
446             padding = Math.max(padding, nextData.padding);
447             endOfStream = nextData.endOfStream;
448             return true;
449         }
450     }
451 
452     private void notifyLifecycleManagerOnError(ChannelFuture future, final ChannelHandlerContext ctx) {
453         future.addListener(new ChannelFutureListener() {
454             @Override
455             public void operationComplete(ChannelFuture future) throws Exception {
456                 Throwable cause = future.cause();
457                 if (cause != null) {
458                     lifecycleManager.onError(ctx, true, cause);
459                 }
460             }
461         });
462     }
463 
464     /**
465      * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
466      * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
467      * blocked on flow-control a HEADER frame must wait until this frame has been written.
468      */
469     private final class FlowControlledHeaders extends FlowControlledBase {
470         private final Http2Headers headers;
471         private final int streamDependency;
472         private final short weight;
473         private final boolean exclusive;
474 
475         FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight,
476                 boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) {
477             super(stream, padding, endOfStream, promise.unvoid());
478             this.headers = headers;
479             this.streamDependency = streamDependency;
480             this.weight = weight;
481             this.exclusive = exclusive;
482         }
483 
484         @Override
485         public int size() {
486             return 0;
487         }
488 
489         @Override
490         public void error(ChannelHandlerContext ctx, Throwable cause) {
491             if (ctx != null) {
492                 lifecycleManager.onError(ctx, true, cause);
493             }
494             promise.tryFailure(cause);
495         }
496 
497         @Override
498         public void write(ChannelHandlerContext ctx, int allowedBytes) {
499             boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
500             // The code is currently requiring adding this listener before writing, in order to call onError() before
501             // closeStreamLocal().
502             promise.addListener(this);
503 
504             ChannelFuture f = frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
505                                                        padding, endOfStream, promise);
506             // Writing headers may fail during the encode state if they violate HPACK limits.
507             Throwable failureCause = f.cause();
508             if (failureCause == null) {
509                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
510                 // necessarily mean the write will complete successfully.
511                 stream.headersSent(isInformational);
512             }
513         }
514 
515         @Override
516         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
517             return false;
518         }
519     }
520 
521     /**
522      * Common base type for payloads to deliver via flow-control.
523      */
524     public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
525             ChannelFutureListener {
526         protected final Http2Stream stream;
527         protected ChannelPromise promise;
528         protected boolean endOfStream;
529         protected int padding;
530 
531         FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
532                 final ChannelPromise promise) {
533             if (padding < 0) {
534                 throw new IllegalArgumentException("padding must be >= 0");
535             }
536             this.padding = padding;
537             this.endOfStream = endOfStream;
538             this.stream = stream;
539             this.promise = promise;
540         }
541 
542         @Override
543         public void writeComplete() {
544             if (endOfStream) {
545                 lifecycleManager.closeStreamLocal(stream, promise);
546             }
547         }
548 
549         @Override
550         public void operationComplete(ChannelFuture future) throws Exception {
551             if (!future.isSuccess()) {
552                 error(flowController().channelHandlerContext(), future.cause());
553             }
554         }
555     }
556 }