View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
18  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
19  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
20  import static io.netty.util.internal.ObjectUtil.checkNotNull;
21  import io.netty.buffer.ByteBuf;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelFutureListener;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.util.ReferenceCountUtil;
28  
29  import java.util.ArrayDeque;
30  
31  /**
32   * Default implementation of {@link Http2ConnectionEncoder}.
33   */
34  public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
        // Writer that the write* methods hand frames to; also the source of configuration().
        private final Http2FrameWriter frameWriter;
        // Connection whose streams and endpoints are looked up and updated by the write* methods.
        private final Http2Connection connection;
        // Receives stream-close requests, RST_STREAM/GOAWAY writes and exception notifications.
        private final Http2LifecycleManager lifecycleManager;
        // We prefer ArrayDeque to LinkedList because the latter will produce more GC.
        // This initial capacity is plenty for SETTINGS traffic.
        // Entries are appended by writeSettings and removed, oldest first, by pollSentSettings.
        private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
41  
42      /**
43       * Builder for new instances of {@link DefaultHttp2ConnectionEncoder}.
44       */
45      public static class Builder implements Http2ConnectionEncoder.Builder {
46          protected Http2FrameWriter frameWriter;
47          protected Http2Connection connection;
48          protected Http2LifecycleManager lifecycleManager;
49  
50          @Override
51          public Builder connection(
52                  Http2Connection connection) {
53              this.connection = connection;
54              return this;
55          }
56  
57          @Override
58          public Builder lifecycleManager(
59                  Http2LifecycleManager lifecycleManager) {
60              this.lifecycleManager = lifecycleManager;
61              return this;
62          }
63  
64          @Override
65          public Http2LifecycleManager lifecycleManager() {
66              return lifecycleManager;
67          }
68  
69          @Override
70          public Builder frameWriter(Http2FrameWriter frameWriter) {
71              this.frameWriter = frameWriter;
72              return this;
73          }
74  
75          @Override
76          public Http2ConnectionEncoder build() {
77              return new DefaultHttp2ConnectionEncoder(this);
78          }
79      }
80  
81      public static Builder newBuilder() {
82          return new Builder();
83      }
84  
85      protected DefaultHttp2ConnectionEncoder(Builder builder) {
86          connection = checkNotNull(builder.connection, "connection");
87          frameWriter = checkNotNull(builder.frameWriter, "frameWriter");
88          lifecycleManager = checkNotNull(builder.lifecycleManager, "lifecycleManager");
89          if (connection.remote().flowController() == null) {
90              connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
91          }
92      }
93  
94      @Override
95      public Http2FrameWriter frameWriter() {
96          return frameWriter;
97      }
98  
99      @Override
100     public Http2Connection connection() {
101         return connection;
102     }
103 
104     @Override
105     public final Http2RemoteFlowController flowController() {
106         return connection().remote().flowController();
107     }
108 
109     @Override
110     public void remoteSettings(Http2Settings settings) throws Http2Exception {
111         Boolean pushEnabled = settings.pushEnabled();
112         Http2FrameWriter.Configuration config = configuration();
113         Http2HeaderTable outboundHeaderTable = config.headerTable();
114         Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
115         if (pushEnabled != null) {
116             if (!connection.isServer()) {
117                 throw connectionError(PROTOCOL_ERROR, "Client received SETTINGS frame with ENABLE_PUSH specified");
118             }
119             connection.remote().allowPushTo(pushEnabled);
120         }
121 
122         Long maxConcurrentStreams = settings.maxConcurrentStreams();
123         if (maxConcurrentStreams != null) {
124             connection.local().maxStreams((int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE));
125         }
126 
127         Long headerTableSize = settings.headerTableSize();
128         if (headerTableSize != null) {
129             outboundHeaderTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE));
130         }
131 
132         Integer maxHeaderListSize = settings.maxHeaderListSize();
133         if (maxHeaderListSize != null) {
134             outboundHeaderTable.maxHeaderListSize(maxHeaderListSize);
135         }
136 
137         Integer maxFrameSize = settings.maxFrameSize();
138         if (maxFrameSize != null) {
139             outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
140         }
141 
142         Integer initialWindowSize = settings.initialWindowSize();
143         if (initialWindowSize != null) {
144             flowController().initialWindowSize(initialWindowSize);
145         }
146     }
147 
148     @Override
149     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
150             final boolean endOfStream, ChannelPromise promise) {
151         final Http2Stream stream;
152         try {
153             if (connection.isGoAway()) {
154                 throw new IllegalStateException("Sending data after connection going away.");
155             }
156 
157             stream = connection.requireStream(streamId);
158 
159             // Verify that the stream is in the appropriate state for sending DATA frames.
160             switch (stream.state()) {
161                 case OPEN:
162                 case HALF_CLOSED_REMOTE:
163                     // Allowed sending DATA frames in these states.
164                     break;
165                 default:
166                     throw new IllegalStateException(String.format(
167                             "Stream %d in unexpected state: %s", stream.id(), stream.state()));
168             }
169 
170             if (endOfStream) {
171                 lifecycleManager.closeLocalSide(stream, promise);
172             }
173         } catch (Throwable e) {
174             data.release();
175             return promise.setFailure(e);
176         }
177 
178         // Hand control of the frame to the flow controller.
179         flowController().sendFlowControlled(ctx, stream,
180                 new FlowControlledData(ctx, stream, data, padding, endOfStream, promise));
181         return promise;
182     }
183 
184     @Override
185     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
186             boolean endStream, ChannelPromise promise) {
187         return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise);
188     }
189 
190     @Override
191     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
192             final Http2Headers headers, final int streamDependency, final short weight,
193             final boolean exclusive, final int padding, final boolean endOfStream,
194             final ChannelPromise promise) {
195         try {
196             if (connection.isGoAway()) {
197                 throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away.");
198             }
199             Http2Stream stream = connection.stream(streamId);
200             if (stream == null) {
201                 stream = connection.createLocalStream(streamId);
202             }
203 
204             switch (stream.state()) {
205                 case RESERVED_LOCAL:
206                 case IDLE:
207                     stream.open(endOfStream);
208                     break;
209                 case OPEN:
210                 case HALF_CLOSED_REMOTE:
211                     // Allowed sending headers in these states.
212                     break;
213                 default:
214                     throw new IllegalStateException(String.format(
215                             "Stream %d in unexpected state: %s", stream.id(), stream.state()));
216             }
217 
218             // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
219             flowController().sendFlowControlled(ctx, stream,
220                     new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
221                             exclusive, padding, endOfStream, promise));
222             if (endOfStream) {
223                 lifecycleManager.closeLocalSide(stream, promise);
224             }
225             return promise;
226         } catch (Http2NoMoreStreamIdsException e) {
227             lifecycleManager.onException(ctx, e);
228             return promise.setFailure(e);
229         } catch (Throwable e) {
230             return promise.setFailure(e);
231         }
232     }
233 
234     @Override
235     public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
236             boolean exclusive, ChannelPromise promise) {
237         try {
238             if (connection.isGoAway()) {
239                 throw connectionError(PROTOCOL_ERROR, "Sending priority after connection going away.");
240             }
241 
242             // Update the priority on this stream.
243             Http2Stream stream = connection.stream(streamId);
244             if (stream == null) {
245                 stream = connection.createLocalStream(streamId);
246             }
247 
248             stream.setPriority(streamDependency, weight, exclusive);
249         } catch (Throwable e) {
250             return promise.setFailure(e);
251         }
252 
253         ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
254         ctx.flush();
255         return future;
256     }
257 
258     @Override
259     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
260             ChannelPromise promise) {
261         // Delegate to the lifecycle manager for proper updating of connection state.
262         return lifecycleManager.writeRstStream(ctx, streamId, errorCode, promise);
263     }
264 
265     /**
266      * Writes a RST_STREAM frame to the remote endpoint.
267      * @param ctx the context to use for writing.
268      * @param streamId the stream for which to send the frame.
269      * @param errorCode the error code indicating the nature of the failure.
270      * @param promise the promise for the write.
271      * @param writeIfNoStream
272      * <ul>
273      * <li>{@code true} will force a write of a RST_STREAM even if the stream object does not exist locally.</li>
274      * <li>{@code false} will only send a RST_STREAM only if the stream is known about locally</li>
275      * </ul>
276      * @return the future for the write.
277      */
278     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
279             ChannelPromise promise, boolean writeIfNoStream) {
280         Http2Stream stream = connection.stream(streamId);
281         if (stream == null && !writeIfNoStream) {
282             // The stream may already have been closed ... ignore.
283             promise.setSuccess();
284             return promise;
285         }
286 
287         ChannelFuture future = frameWriter.writeRstStream(ctx, streamId, errorCode, promise);
288         ctx.flush();
289 
290         if (stream != null) {
291             stream.resetSent();
292             lifecycleManager.closeStream(stream, promise);
293         }
294 
295         return future;
296     }
297 
298     @Override
299     public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
300             ChannelPromise promise) {
301         outstandingLocalSettingsQueue.add(settings);
302         try {
303             if (connection.isGoAway()) {
304                 throw connectionError(PROTOCOL_ERROR, "Sending settings after connection going away.");
305             }
306 
307             Boolean pushEnabled = settings.pushEnabled();
308             if (pushEnabled != null && connection.isServer()) {
309                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
310             }
311         } catch (Throwable e) {
312             return promise.setFailure(e);
313         }
314 
315         ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise);
316         ctx.flush();
317         return future;
318     }
319 
320     @Override
321     public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
322         ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise);
323         ctx.flush();
324         return future;
325     }
326 
327     @Override
328     public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data,
329             ChannelPromise promise) {
330         if (connection.isGoAway()) {
331             data.release();
332             return promise.setFailure(connectionError(PROTOCOL_ERROR, "Sending ping after connection going away."));
333         }
334 
335         ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise);
336         ctx.flush();
337         return future;
338     }
339 
340     @Override
341     public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
342             Http2Headers headers, int padding, ChannelPromise promise) {
343         try {
344             if (connection.isGoAway()) {
345                 throw connectionError(PROTOCOL_ERROR, "Sending push promise after connection going away.");
346             }
347 
348             // Reserve the promised stream.
349             Http2Stream stream = connection.requireStream(streamId);
350             connection.local().reservePushStream(promisedStreamId, stream);
351         } catch (Throwable e) {
352             return promise.setFailure(e);
353         }
354 
355         ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
356         ctx.flush();
357         return future;
358     }
359 
360     @Override
361     public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
362             ChannelPromise promise) {
363         return lifecycleManager.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
364     }
365 
366     @Override
367     public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
368             ChannelPromise promise) {
369         return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
370                 " objects to control window sizes"));
371     }
372 
373     @Override
374     public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
375             ByteBuf payload, ChannelPromise promise) {
376         return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
377     }
378 
379     @Override
380     public void close() {
381         frameWriter.close();
382     }
383 
384     @Override
385     public Http2Settings pollSentSettings() {
386         return outstandingLocalSettingsQueue.poll();
387     }
388 
389     @Override
390     public Configuration configuration() {
391         return frameWriter.configuration();
392     }
393 
        /**
         * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
         * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
         * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
         * <p>
         * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
         * the passed promise is not completed until last frame write.
         * </p>
         */
        private final class FlowControlledData extends FlowControlledBase {
            // Remaining payload. Replaced by Unpooled.EMPTY_BUFFER once the whole buffer has been handed to
            // the frame writer, and set to null by error().
            private ByteBuf data;
            // Bytes (payload plus padding) still to be written; reduced by write() and zeroed by error().
            private int size;

            /**
             * @param ctx the context used for the frame writes.
             * @param stream the stream the data belongs to.
             * @param data the payload to write.
             * @param padding the total padding to write; the base constructor rejects negative values.
             * @param endOfStream whether the last frame written carries the end-of-stream flag.
             * @param promise the promise used for the last frame write.
             */
            private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding,
                                        boolean endOfStream, ChannelPromise promise) {
                super(ctx, stream, padding, endOfStream, promise);
                this.data = data;
                size = data.readableBytes() + padding;
            }

            /**
             * Returns the number of payload and padding bytes that have not been written yet.
             */
            @Override
            public int size() {
                return size;
            }

            /**
             * Releases the remaining payload, reports {@code cause} to the lifecycle manager, clears the
             * remaining payload and size, and fails the promise if it is not already complete.
             */
            @Override
            public void error(Throwable cause) {
                ReferenceCountUtil.safeRelease(data);
                lifecycleManager.onException(ctx, cause);
                data = null;
                size = 0;
                promise.tryFailure(cause);
            }

            /**
             * Writes as much of the remaining payload and padding as {@code allowedBytes} permits, splitting
             * it into frames no larger than the frame writer's configured maximum frame size.
             *
             * @param allowedBytes the number of bytes that may be written by this call.
             * @return {@code true} if at least one frame was handed to the frame writer; {@code false} if
             *         nothing was written because {@link #error} already ran, because there is no allowance
             *         for a non-empty payload, or because an exception was thrown (in which case
             *         {@link #error} is invoked with it).
             */
            @Override
            public boolean write(int allowedBytes) {
                // error() nulls the buffer, so there is nothing left to write after a failure.
                if (data == null) {
                    return false;
                }
                if (allowedBytes == 0 && size() != 0) {
                    // No point writing an empty DATA frame, wait for a bigger allowance.
                    return false;
                }
                int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
                try {
                    int bytesWritten = 0;
                    // Each iteration hands exactly one frame to the frame writer.
                    do {
                        // A frame may use neither more than the max frame size nor more than what is left
                        // of this call's allowance.
                        int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten);
                        ByteBuf toWrite;
                        // Let data consume the frame before padding.
                        int writeableData = data.readableBytes();
                        if (writeableData > allowedFrameSize) {
                            // Only part of the payload fits: write a retained slice and keep the rest in
                            // 'data' for a later frame.
                            writeableData = allowedFrameSize;
                            toWrite = data.readSlice(writeableData).retain();
                        } else {
                            // We're going to write the full buffer which will cause it to be released, for subsequent
                            // writes just use empty buffer to avoid over-releasing. Have to use an empty buffer
                            // as we may continue to write padding in subsequent frames.
                            toWrite = data;
                            data = Unpooled.EMPTY_BUFFER;
                        }
                        // Whatever room the payload left in this frame is filled with outstanding padding.
                        int writeablePadding = Math.min(allowedFrameSize - writeableData, padding);
                        padding -= writeablePadding;
                        bytesWritten += writeableData + writeablePadding;
                        ChannelPromise writePromise;
                        if (size == bytesWritten) {
                            // Can use the original promise if it's the last write
                            writePromise = promise;
                        } else {
                            // Create a new promise and listen to it for failure
                            writePromise = ctx.newPromise();
                            writePromise.addListener(this);
                        }
                        // The end-of-stream flag is only set on the frame that completes the payload.
                        frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
                                size == bytesWritten && endOfStream, writePromise);
                    } while (size != bytesWritten && allowedBytes > bytesWritten);
                    // 'size' is only reduced after the loop because the loop compares it with bytesWritten.
                    size -= bytesWritten;
                    return true;
                } catch (Throwable e) {
                    error(e);
                    return false;
                }
            }
        }
478 
479     /**
480      * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
481      * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
482      * blocked on flow-control a HEADER frame must wait until this frame has been written.
483      */
484     private final class FlowControlledHeaders extends FlowControlledBase {
485 
486         private final Http2Headers headers;
487         private final int streamDependency;
488         private final short weight;
489         private final boolean exclusive;
490 
491         private FlowControlledHeaders(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
492                 int streamDependency, short weight, boolean exclusive, int padding,
493                 boolean endOfStream, ChannelPromise promise) {
494             super(ctx, stream, padding, endOfStream, promise);
495             this.headers = headers;
496             this.streamDependency = streamDependency;
497             this.weight = weight;
498             this.exclusive = exclusive;
499         }
500 
501         @Override
502         public int size() {
503             return 0;
504         }
505 
506         @Override
507         public void error(Throwable cause) {
508             lifecycleManager.onException(ctx, cause);
509             promise.tryFailure(cause);
510         }
511 
512         @Override
513         public boolean write(int allowedBytes) {
514             frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
515                     padding, endOfStream, promise);
516             return true;
517         }
518     }
519 
520     /**
521      * Common base type for payloads to deliver via flow-control.
522      */
523     public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
524             ChannelFutureListener {
525         protected final ChannelHandlerContext ctx;
526         protected final Http2Stream stream;
527         protected final ChannelPromise promise;
528         protected final boolean endOfStream;
529         protected int padding;
530 
531         public FlowControlledBase(final ChannelHandlerContext ctx, final Http2Stream stream, int padding,
532                                   boolean endOfStream, final ChannelPromise promise) {
533             this.ctx = ctx;
534             if (padding < 0) {
535                 throw new IllegalArgumentException("padding must be >= 0");
536             }
537             this.padding = padding;
538             this.endOfStream = endOfStream;
539             this.stream = stream;
540             this.promise = promise;
541             promise.addListener(this);
542         }
543 
544         @Override
545         public void operationComplete(ChannelFuture future) throws Exception {
546             if (!future.isSuccess()) {
547                 error(future.cause());
548             }
549         }
550     }
551 }