View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.CoalescingBufferQueue;
23  import io.netty.handler.codec.http.HttpStatusClass;
24  import io.netty.util.internal.UnstableApi;
25  
26  import java.util.ArrayDeque;
27  
28  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
29  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
30  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
31  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static java.lang.Integer.MAX_VALUE;
34  import static java.lang.Math.min;
35  
36  /**
37   * Default implementation of {@link Http2ConnectionEncoder}.
38   */
39  @UnstableApi
40  public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
41      private final Http2FrameWriter frameWriter;
42      private final Http2Connection connection;
43      private Http2LifecycleManager lifecycleManager;
44      // We prefer ArrayDeque to LinkedList because later will produce more GC.
45      // This initial capacity is plenty for SETTINGS traffic.
46      private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
47  
48      public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
49          this.connection = checkNotNull(connection, "connection");
50          this.frameWriter = checkNotNull(frameWriter, "frameWriter");
51          if (connection.remote().flowController() == null) {
52              connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
53          }
54      }
55  
56      @Override
57      public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
58          this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
59      }
60  
61      @Override
62      public Http2FrameWriter frameWriter() {
63          return frameWriter;
64      }
65  
66      @Override
67      public Http2Connection connection() {
68          return connection;
69      }
70  
71      @Override
72      public final Http2RemoteFlowController flowController() {
73          return connection().remote().flowController();
74      }
75  
76      @Override
77      public void remoteSettings(Http2Settings settings) throws Http2Exception {
78          Boolean pushEnabled = settings.pushEnabled();
79          Http2FrameWriter.Configuration config = configuration();
80          Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
81          Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
82          if (pushEnabled != null) {
83              if (!connection.isServer() && pushEnabled) {
84                  throw connectionError(PROTOCOL_ERROR,
85                      "Client received a value of ENABLE_PUSH specified to other than 0");
86              }
87              connection.remote().allowPushTo(pushEnabled);
88          }
89  
90          Long maxConcurrentStreams = settings.maxConcurrentStreams();
91          if (maxConcurrentStreams != null) {
92              connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
93          }
94  
95          Long headerTableSize = settings.headerTableSize();
96          if (headerTableSize != null) {
97              outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
98          }
99  
100         Long maxHeaderListSize = settings.maxHeaderListSize();
101         if (maxHeaderListSize != null) {
102             outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
103         }
104 
105         Integer maxFrameSize = settings.maxFrameSize();
106         if (maxFrameSize != null) {
107             outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
108         }
109 
110         Integer initialWindowSize = settings.initialWindowSize();
111         if (initialWindowSize != null) {
112             flowController().initialWindowSize(initialWindowSize);
113         }
114     }
115 
116     @Override
117     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
118             final boolean endOfStream, ChannelPromise promise) {
119         final Http2Stream stream;
120         try {
121             stream = requireStream(streamId);
122 
123             // Verify that the stream is in the appropriate state for sending DATA frames.
124             switch (stream.state()) {
125                 case OPEN:
126                 case HALF_CLOSED_REMOTE:
127                     // Allowed sending DATA frames in these states.
128                     break;
129                 default:
130                     throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
131             }
132         } catch (Throwable e) {
133             data.release();
134             return promise.setFailure(e);
135         }
136 
137         // Hand control of the frame to the flow controller.
138         flowController().addFlowControlled(stream,
139                 new FlowControlledData(stream, data, padding, endOfStream, promise));
140         return promise;
141     }
142 
143     @Override
144     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
145             boolean endStream, ChannelPromise promise) {
146         return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise);
147     }
148 
149     private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
150                                                     boolean endOfStream) {
151         boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
152         if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
153             throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
154         }
155         return isInformational;
156     }
157 
158     @Override
159     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
160             final Http2Headers headers, final int streamDependency, final short weight,
161             final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
162         try {
163             Http2Stream stream = connection.stream(streamId);
164             if (stream == null) {
165                 try {
166                     stream = connection.local().createStream(streamId, endOfStream);
167                 } catch (Http2Exception cause) {
168                     if (connection.remote().mayHaveCreatedStream(streamId)) {
169                         promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
170                         return promise;
171                     }
172                     throw cause;
173                 }
174             } else {
175                 switch (stream.state()) {
176                     case RESERVED_LOCAL:
177                         stream.open(endOfStream);
178                         break;
179                     case OPEN:
180                     case HALF_CLOSED_REMOTE:
181                         // Allowed sending headers in these states.
182                         break;
183                     default:
184                         throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
185                                                         stream.state());
186                 }
187             }
188 
189             // Trailing headers must go through flow control if there are other frames queued in flow control
190             // for this stream.
191             Http2RemoteFlowController flowController = flowController();
192             if (!endOfStream || !flowController.hasFlowControlled(stream)) {
193                 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
194                 if (endOfStream) {
195                     final Http2Stream finalStream = stream;
196                     final ChannelFutureListener closeStreamLocalListener = new ChannelFutureListener() {
197                         @Override
198                         public void operationComplete(ChannelFuture future) throws Exception {
199                             lifecycleManager.closeStreamLocal(finalStream, future);
200                         }
201                     };
202                     promise = promise.unvoid().addListener(closeStreamLocalListener);
203                 }
204 
205                 ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
206                                                                 weight, exclusive, padding, endOfStream, promise);
207                 // Writing headers may fail during the encode state if they violate HPACK limits.
208                 Throwable failureCause = future.cause();
209                 if (failureCause == null) {
210                     // Synchronously set the headersSent flag to ensure that we do not subsequently write
211                     // other headers containing pseudo-header fields.
212                     stream.headersSent(isInformational);
213                 } else {
214                     lifecycleManager.onError(ctx, failureCause);
215                 }
216 
217                 return future;
218             } else {
219                 // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
220                 flowController.addFlowControlled(stream,
221                         new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
222                                                   true, promise));
223                 return promise;
224             }
225         } catch (Throwable t) {
226             lifecycleManager.onError(ctx, t);
227             promise.tryFailure(t);
228             return promise;
229         }
230     }
231 
232     @Override
233     public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
234             boolean exclusive, ChannelPromise promise) {
235         return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
236     }
237 
238     @Override
239     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
240             ChannelPromise promise) {
241         // Delegate to the lifecycle manager for proper updating of connection state.
242         return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
243     }
244 
245     @Override
246     public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
247             ChannelPromise promise) {
248         outstandingLocalSettingsQueue.add(settings);
249         try {
250             Boolean pushEnabled = settings.pushEnabled();
251             if (pushEnabled != null && connection.isServer()) {
252                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
253             }
254         } catch (Throwable e) {
255             return promise.setFailure(e);
256         }
257 
258         return frameWriter.writeSettings(ctx, settings, promise);
259     }
260 
261     @Override
262     public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
263         return frameWriter.writeSettingsAck(ctx, promise);
264     }
265 
266     @Override
267     public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
268         return frameWriter.writePing(ctx, ack, data, promise);
269     }
270 
271     @Override
272     public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
273             Http2Headers headers, int padding, ChannelPromise promise) {
274         try {
275             if (connection.goAwayReceived()) {
276                 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
277             }
278 
279             Http2Stream stream = requireStream(streamId);
280             // Reserve the promised stream.
281             connection.local().reservePushStream(promisedStreamId, stream);
282 
283             ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
284                                                                 promise);
285             // Writing headers may fail during the encode state if they violate HPACK limits.
286             Throwable failureCause = future.cause();
287             if (failureCause == null) {
288                 stream.pushPromiseSent();
289             } else {
290                 lifecycleManager.onError(ctx, failureCause);
291             }
292             return future;
293         } catch (Throwable t) {
294             lifecycleManager.onError(ctx, t);
295             promise.tryFailure(t);
296             return promise;
297         }
298     }
299 
300     @Override
301     public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
302             ChannelPromise promise) {
303         return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
304     }
305 
306     @Override
307     public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
308             ChannelPromise promise) {
309         return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
310                 " objects to control window sizes"));
311     }
312 
313     @Override
314     public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
315             ByteBuf payload, ChannelPromise promise) {
316         return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
317     }
318 
319     @Override
320     public void close() {
321         frameWriter.close();
322     }
323 
324     @Override
325     public Http2Settings pollSentSettings() {
326         return outstandingLocalSettingsQueue.poll();
327     }
328 
329     @Override
330     public Configuration configuration() {
331         return frameWriter.configuration();
332     }
333 
334     private Http2Stream requireStream(int streamId) {
335         Http2Stream stream = connection.stream(streamId);
336         if (stream == null) {
337             final String message;
338             if (connection.streamMayHaveExisted(streamId)) {
339                 message = "Stream no longer exists: " + streamId;
340             } else {
341                 message = "Stream does not exist: " + streamId;
342             }
343             throw new IllegalArgumentException(message);
344         }
345         return stream;
346     }
347 
348     /**
349      * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
350      * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
351      * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
352      * <p>
353      * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
354      * the passed promise is not completed until last frame write.
355      * </p>
356      */
357     private final class FlowControlledData extends FlowControlledBase {
358         private final CoalescingBufferQueue queue;
359         private int dataSize;
360 
361         FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
362                                    ChannelPromise promise) {
363             super(stream, padding, endOfStream, promise);
364             queue = new CoalescingBufferQueue(promise.channel());
365             queue.add(buf, promise);
366             dataSize = queue.readableBytes();
367         }
368 
369         @Override
370         public int size() {
371             return dataSize + padding;
372         }
373 
374         @Override
375         public void error(ChannelHandlerContext ctx, Throwable cause) {
376             queue.releaseAndFailAll(cause);
377             // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
378             // error so we don't invalidate flow control when returning bytes to flow control.
379             lifecycleManager.onError(ctx, cause);
380         }
381 
382         @Override
383         public void write(ChannelHandlerContext ctx, int allowedBytes) {
384             int queuedData = queue.readableBytes();
385             if (!endOfStream) {
386                 if (queuedData == 0) {
387                     // There's no need to write any data frames because there are only empty data frames in the queue
388                     // and it is not end of stream yet. Just complete their promises by getting the buffer corresponding
389                     // to 0 bytes and writing it to the channel (to preserve notification order).
390                     ChannelPromise writePromise = ctx.newPromise().addListener(this);
391                     ctx.write(queue.remove(0, writePromise), writePromise);
392                     return;
393                 }
394 
395                 if (allowedBytes == 0) {
396                     return;
397                 }
398             }
399 
400             // Determine how much data to write.
401             int writableData = min(queuedData, allowedBytes);
402             ChannelPromise writePromise = ctx.newPromise().addListener(this);
403             ByteBuf toWrite = queue.remove(writableData, writePromise);
404             dataSize = queue.readableBytes();
405 
406             // Determine how much padding to write.
407             int writablePadding = min(allowedBytes - writableData, padding);
408             padding -= writablePadding;
409 
410             // Write the frame(s).
411             frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
412                     endOfStream && size() == 0, writePromise);
413         }
414 
415         @Override
416         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
417             FlowControlledData nextData;
418             if (FlowControlledData.class != next.getClass() ||
419                 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
420                 return false;
421             }
422             nextData.queue.copyTo(queue);
423             dataSize = queue.readableBytes();
424             // Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
425             padding = Math.max(padding, nextData.padding);
426             endOfStream = nextData.endOfStream;
427             return true;
428         }
429     }
430 
431     /**
432      * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
433      * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
434      * blocked on flow-control a HEADER frame must wait until this frame has been written.
435      */
436     private final class FlowControlledHeaders extends FlowControlledBase {
437         private final Http2Headers headers;
438         private final int streamDependency;
439         private final short weight;
440         private final boolean exclusive;
441 
442         FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight,
443                 boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) {
444             super(stream, padding, endOfStream, promise);
445             this.headers = headers;
446             this.streamDependency = streamDependency;
447             this.weight = weight;
448             this.exclusive = exclusive;
449         }
450 
451         @Override
452         public int size() {
453             return 0;
454         }
455 
456         @Override
457         public void error(ChannelHandlerContext ctx, Throwable cause) {
458             if (ctx != null) {
459                 lifecycleManager.onError(ctx, cause);
460             }
461             promise.tryFailure(cause);
462         }
463 
464         @Override
465         public void write(ChannelHandlerContext ctx, int allowedBytes) {
466             boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
467             if (promise.isVoid()) {
468                 promise = ctx.newPromise();
469             }
470             promise.addListener(this);
471 
472             ChannelFuture f = frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
473                                                        padding, endOfStream, promise);
474             // Writing headers may fail during the encode state if they violate HPACK limits.
475             Throwable failureCause = f.cause();
476             if (failureCause == null) {
477                 stream.headersSent(isInformational);
478             } else {
479                 lifecycleManager.onError(ctx, failureCause);
480             }
481         }
482 
483         @Override
484         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
485             return false;
486         }
487     }
488 
489     /**
490      * Common base type for payloads to deliver via flow-control.
491      */
492     public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
493             ChannelFutureListener {
494         protected final Http2Stream stream;
495         protected ChannelPromise promise;
496         protected boolean endOfStream;
497         protected int padding;
498 
499         FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
500                 final ChannelPromise promise) {
501             if (padding < 0) {
502                 throw new IllegalArgumentException("padding must be >= 0");
503             }
504             this.padding = padding;
505             this.endOfStream = endOfStream;
506             this.stream = stream;
507             this.promise = promise;
508         }
509 
510         @Override
511         public void writeComplete() {
512             if (endOfStream) {
513                 lifecycleManager.closeStreamLocal(stream, promise);
514             }
515         }
516 
517         @Override
518         public void operationComplete(ChannelFuture future) throws Exception {
519             if (!future.isSuccess()) {
520                 error(flowController().channelHandlerContext(), future.cause());
521             }
522         }
523     }
524 }