View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.CoalescingBufferQueue;
23  import io.netty.handler.codec.http.HttpStatusClass;
24  import io.netty.util.internal.UnstableApi;
25  
26  import java.util.ArrayDeque;
27  
28  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
29  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
30  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
31  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static java.lang.Integer.MAX_VALUE;
34  import static java.lang.Math.min;
35  
36  /**
37   * Default implementation of {@link Http2ConnectionEncoder}.
38   */
39  @UnstableApi
40  public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
41      private final Http2FrameWriter frameWriter;
42      private final Http2Connection connection;
43      private Http2LifecycleManager lifecycleManager;
44      // We prefer ArrayDeque to LinkedList because the latter will produce more GC.
45      // This initial capacity is plenty for SETTINGS traffic.
46      private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
47  
48      public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
49          this.connection = checkNotNull(connection, "connection");
50          this.frameWriter = checkNotNull(frameWriter, "frameWriter");
51          if (connection.remote().flowController() == null) {
52              connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
53          }
54      }
55  
56      @Override
57      public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
58          this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
59      }
60  
61      @Override
62      public Http2FrameWriter frameWriter() {
63          return frameWriter;
64      }
65  
66      @Override
67      public Http2Connection connection() {
68          return connection;
69      }
70  
71      @Override
72      public final Http2RemoteFlowController flowController() {
73          return connection().remote().flowController();
74      }
75  
76      @Override
77      public void remoteSettings(Http2Settings settings) throws Http2Exception {
78          Boolean pushEnabled = settings.pushEnabled();
79          Http2FrameWriter.Configuration config = configuration();
80          Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
81          Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
82          if (pushEnabled != null) {
83              if (!connection.isServer() && pushEnabled) {
84                  throw connectionError(PROTOCOL_ERROR,
85                      "Client received a value of ENABLE_PUSH specified to other than 0");
86              }
87              connection.remote().allowPushTo(pushEnabled);
88          }
89  
90          Long maxConcurrentStreams = settings.maxConcurrentStreams();
91          if (maxConcurrentStreams != null) {
92              connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
93          }
94  
95          Long headerTableSize = settings.headerTableSize();
96          if (headerTableSize != null) {
97              outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
98          }
99  
100         Long maxHeaderListSize = settings.maxHeaderListSize();
101         if (maxHeaderListSize != null) {
102             outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
103         }
104 
105         Integer maxFrameSize = settings.maxFrameSize();
106         if (maxFrameSize != null) {
107             outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
108         }
109 
110         Integer initialWindowSize = settings.initialWindowSize();
111         if (initialWindowSize != null) {
112             flowController().initialWindowSize(initialWindowSize);
113         }
114     }
115 
116     @Override
117     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
118             final boolean endOfStream, ChannelPromise promise) {
119         final Http2Stream stream;
120         try {
121             stream = requireStream(streamId);
122 
123             // Verify that the stream is in the appropriate state for sending DATA frames.
124             switch (stream.state()) {
125                 case OPEN:
126                 case HALF_CLOSED_REMOTE:
127                     // Allowed sending DATA frames in these states.
128                     break;
129                 default:
130                     throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
131             }
132         } catch (Throwable e) {
133             data.release();
134             return promise.setFailure(e);
135         }
136 
137         // Hand control of the frame to the flow controller.
138         flowController().addFlowControlled(stream,
139                 new FlowControlledData(stream, data, padding, endOfStream, promise));
140         return promise;
141     }
142 
143     @Override
144     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
145             boolean endStream, ChannelPromise promise) {
146         return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise);
147     }
148 
149     private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
150                                                     boolean endOfStream) {
151         boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
152         if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
153             throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
154         }
155         return isInformational;
156     }
157 
158     @Override
159     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
160             final Http2Headers headers, final int streamDependency, final short weight,
161             final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
162         try {
163             Http2Stream stream = connection.stream(streamId);
164             if (stream == null) {
165                 try {
166                     stream = connection.local().createStream(streamId, endOfStream);
167                 } catch (Http2Exception cause) {
168                     if (connection.remote().mayHaveCreatedStream(streamId)) {
169                         promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
170                         return promise;
171                     }
172                     throw cause;
173                 }
174             } else {
175                 switch (stream.state()) {
176                     case RESERVED_LOCAL:
177                         stream.open(endOfStream);
178                         break;
179                     case OPEN:
180                     case HALF_CLOSED_REMOTE:
181                         // Allowed sending headers in these states.
182                         break;
183                     default:
184                         throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
185                                                         stream.state());
186                 }
187             }
188 
189             // Trailing headers must go through flow control if there are other frames queued in flow control
190             // for this stream.
191             Http2RemoteFlowController flowController = flowController();
192             if (!endOfStream || !flowController.hasFlowControlled(stream)) {
193                 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
194                 if (endOfStream) {
195                     final Http2Stream finalStream = stream;
196                     final ChannelFutureListener closeStreamLocalListener = new ChannelFutureListener() {
197                         @Override
198                         public void operationComplete(ChannelFuture future) throws Exception {
199                             lifecycleManager.closeStreamLocal(finalStream, future);
200                         }
201                     };
202                     promise = promise.unvoid().addListener(closeStreamLocalListener);
203                 }
204 
205                 ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
206                                                                 weight, exclusive, padding, endOfStream, promise);
207                 // Writing headers may fail during the encode state if they violate HPACK limits.
208                 Throwable failureCause = future.cause();
209                 if (failureCause == null) {
210                     // Synchronously set the headersSent flag to ensure that we do not subsequently write
211                     // other headers containing pseudo-header fields.
212                     //
213                     // This just sets internal stream state which is used elsewhere in the codec and doesn't
214                     // necessarily mean the write will complete successfully.
215                     stream.headersSent(isInformational);
216 
217                     if (!future.isSuccess()) {
218                         // Either the future is not done or failed in the meantime.
219                         notifyLifecycleManagerOnError(future, ctx);
220                     }
221                 } else {
222                     lifecycleManager.onError(ctx, true, failureCause);
223                 }
224 
225                 return future;
226             } else {
227                 // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
228                 flowController.addFlowControlled(stream,
229                         new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
230                                                   true, promise));
231                 return promise;
232             }
233         } catch (Throwable t) {
234             lifecycleManager.onError(ctx, true, t);
235             promise.tryFailure(t);
236             return promise;
237         }
238     }
239 
240     @Override
241     public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
242             boolean exclusive, ChannelPromise promise) {
243         return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
244     }
245 
246     @Override
247     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
248             ChannelPromise promise) {
249         // Delegate to the lifecycle manager for proper updating of connection state.
250         return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
251     }
252 
253     @Override
254     public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
255             ChannelPromise promise) {
256         outstandingLocalSettingsQueue.add(settings);
257         try {
258             Boolean pushEnabled = settings.pushEnabled();
259             if (pushEnabled != null && connection.isServer()) {
260                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
261             }
262         } catch (Throwable e) {
263             return promise.setFailure(e);
264         }
265 
266         return frameWriter.writeSettings(ctx, settings, promise);
267     }
268 
269     @Override
270     public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
271         return frameWriter.writeSettingsAck(ctx, promise);
272     }
273 
274     @Override
275     public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
276         return frameWriter.writePing(ctx, ack, data, promise);
277     }
278 
279     @Override
280     public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
281             Http2Headers headers, int padding, ChannelPromise promise) {
282         try {
283             if (connection.goAwayReceived()) {
284                 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
285             }
286 
287             Http2Stream stream = requireStream(streamId);
288             // Reserve the promised stream.
289             connection.local().reservePushStream(promisedStreamId, stream);
290 
291             ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
292                                                                 promise);
293             // Writing headers may fail during the encode state if they violate HPACK limits.
294             Throwable failureCause = future.cause();
295             if (failureCause == null) {
296                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
297                 // necessarily mean the write will complete successfully.
298                 stream.pushPromiseSent();
299 
300                 if (!future.isSuccess()) {
301                     // Either the future is not done or failed in the meantime.
302                     notifyLifecycleManagerOnError(future, ctx);
303                 }
304             } else {
305                 lifecycleManager.onError(ctx, true, failureCause);
306             }
307             return future;
308         } catch (Throwable t) {
309             lifecycleManager.onError(ctx, true, t);
310             promise.tryFailure(t);
311             return promise;
312         }
313     }
314 
315     @Override
316     public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
317             ChannelPromise promise) {
318         return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
319     }
320 
321     @Override
322     public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
323             ChannelPromise promise) {
324         return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
325                 " objects to control window sizes"));
326     }
327 
328     @Override
329     public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
330             ByteBuf payload, ChannelPromise promise) {
331         return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
332     }
333 
334     @Override
335     public void close() {
336         frameWriter.close();
337     }
338 
339     @Override
340     public Http2Settings pollSentSettings() {
341         return outstandingLocalSettingsQueue.poll();
342     }
343 
344     @Override
345     public Configuration configuration() {
346         return frameWriter.configuration();
347     }
348 
349     private Http2Stream requireStream(int streamId) {
350         Http2Stream stream = connection.stream(streamId);
351         if (stream == null) {
352             final String message;
353             if (connection.streamMayHaveExisted(streamId)) {
354                 message = "Stream no longer exists: " + streamId;
355             } else {
356                 message = "Stream does not exist: " + streamId;
357             }
358             throw new IllegalArgumentException(message);
359         }
360         return stream;
361     }
362 
363     /**
364      * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
365      * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
366      * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
367      * <p>
368      * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
369      * the passed promise is not completed until last frame write.
370      * </p>
371      */
372     private final class FlowControlledData extends FlowControlledBase {
373         private final CoalescingBufferQueue queue;
374         private int dataSize;
375 
376         FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
377                                    ChannelPromise promise) {
378             super(stream, padding, endOfStream, promise);
379             queue = new CoalescingBufferQueue(promise.channel());
380             queue.add(buf, promise);
381             dataSize = queue.readableBytes();
382         }
383 
384         @Override
385         public int size() {
386             return dataSize + padding;
387         }
388 
389         @Override
390         public void error(ChannelHandlerContext ctx, Throwable cause) {
391             queue.releaseAndFailAll(cause);
392             // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
393             // error so we don't invalidate flow control when returning bytes to flow control.
394             lifecycleManager.onError(ctx, true, cause);
395         }
396 
397         @Override
398         public void write(ChannelHandlerContext ctx, int allowedBytes) {
399             int queuedData = queue.readableBytes();
400             if (!endOfStream) {
401                 if (queuedData == 0) {
402                     // There's no need to write any data frames because there are only empty data frames in the queue
403                     // and it is not end of stream yet. Just complete their promises by getting the buffer corresponding
404                     // to 0 bytes and writing it to the channel (to preserve notification order).
405                     ChannelPromise writePromise = ctx.newPromise().addListener(this);
406                     ctx.write(queue.remove(0, writePromise), writePromise);
407                     return;
408                 }
409 
410                 if (allowedBytes == 0) {
411                     return;
412                 }
413             }
414 
415             // Determine how much data to write.
416             int writableData = min(queuedData, allowedBytes);
417             ChannelPromise writePromise = ctx.newPromise().addListener(this);
418             ByteBuf toWrite = queue.remove(writableData, writePromise);
419             dataSize = queue.readableBytes();
420 
421             // Determine how much padding to write.
422             int writablePadding = min(allowedBytes - writableData, padding);
423             padding -= writablePadding;
424 
425             // Write the frame(s).
426             frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
427                     endOfStream && size() == 0, writePromise);
428         }
429 
430         @Override
431         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
432             FlowControlledData nextData;
433             if (FlowControlledData.class != next.getClass() ||
434                 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
435                 return false;
436             }
437             nextData.queue.copyTo(queue);
438             dataSize = queue.readableBytes();
439             // Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
440             padding = Math.max(padding, nextData.padding);
441             endOfStream = nextData.endOfStream;
442             return true;
443         }
444     }
445 
446     private void notifyLifecycleManagerOnError(ChannelFuture future, final ChannelHandlerContext ctx) {
447         future.addListener(new ChannelFutureListener() {
448             @Override
449             public void operationComplete(ChannelFuture future) throws Exception {
450                 Throwable cause = future.cause();
451                 if (cause != null) {
452                     lifecycleManager.onError(ctx, true, cause);
453                 }
454             }
455         });
456     }
457 
458     /**
459      * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
460      * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
461      * blocked on flow-control a HEADER frame must wait until this frame has been written.
462      */
463     private final class FlowControlledHeaders extends FlowControlledBase {
464         private final Http2Headers headers;
465         private final int streamDependency;
466         private final short weight;
467         private final boolean exclusive;
468 
469         FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight,
470                 boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) {
471             super(stream, padding, endOfStream, promise);
472             this.headers = headers;
473             this.streamDependency = streamDependency;
474             this.weight = weight;
475             this.exclusive = exclusive;
476         }
477 
478         @Override
479         public int size() {
480             return 0;
481         }
482 
483         @Override
484         public void error(ChannelHandlerContext ctx, Throwable cause) {
485             if (ctx != null) {
486                 lifecycleManager.onError(ctx, true, cause);
487             }
488             promise.tryFailure(cause);
489         }
490 
491         @Override
492         public void write(ChannelHandlerContext ctx, int allowedBytes) {
493             boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
494             if (promise.isVoid()) {
495                 promise = ctx.newPromise();
496             }
497             promise.addListener(this);
498 
499             ChannelFuture f = frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
500                                                        padding, endOfStream, promise);
501             // Writing headers may fail during the encode state if they violate HPACK limits.
502             Throwable failureCause = f.cause();
503             if (failureCause == null) {
504                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
505                 // necessarily mean the write will complete successfully.
506                 stream.headersSent(isInformational);
507             }
508         }
509 
510         @Override
511         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
512             return false;
513         }
514     }
515 
516     /**
517      * Common base type for payloads to deliver via flow-control.
518      */
519     public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
520             ChannelFutureListener {
521         protected final Http2Stream stream;
522         protected ChannelPromise promise;
523         protected boolean endOfStream;
524         protected int padding;
525 
526         FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
527                 final ChannelPromise promise) {
528             if (padding < 0) {
529                 throw new IllegalArgumentException("padding must be >= 0");
530             }
531             this.padding = padding;
532             this.endOfStream = endOfStream;
533             this.stream = stream;
534             this.promise = promise;
535         }
536 
537         @Override
538         public void writeComplete() {
539             if (endOfStream) {
540                 lifecycleManager.closeStreamLocal(stream, promise);
541             }
542         }
543 
544         @Override
545         public void operationComplete(ChannelFuture future) throws Exception {
546             if (!future.isSuccess()) {
547                 error(flowController().channelHandlerContext(), future.cause());
548             }
549         }
550     }
551 }