View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http3;
17  
18  import io.netty.channel.ChannelHandler;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.channel.ChannelInboundHandler;
21  import io.netty.channel.ChannelInboundHandlerAdapter;
22  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
23  import io.netty.handler.codec.quic.QuicChannel;
24  import io.netty.handler.codec.quic.QuicStreamChannel;
25  import io.netty.handler.codec.quic.QuicStreamChannelBootstrap;
26  import io.netty.handler.codec.quic.QuicStreamType;
27  import io.netty.util.ReferenceCountUtil;
28  import io.netty.util.concurrent.Future;
29  import io.netty.util.concurrent.Promise;
30  import org.jetbrains.annotations.Nullable;
31  
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
34  import java.util.function.UnaryOperator;
35  
36  import static io.netty.handler.codec.http3.Http3.maxPushIdReceived;
37  import static io.netty.handler.codec.http3.Http3CodecUtils.connectionError;
38  import static io.netty.handler.codec.http3.Http3ErrorCode.H3_ID_ERROR;
39  import static io.netty.util.internal.PlatformDependent.newConcurrentHashMap;
40  import static java.util.Objects.requireNonNull;
41  import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
42  
43  /**
44   * A manager for <a href="https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-push-streams">push streams</a>
45   * for a server. New push streams can be initiated using the various {@code newPushStream} methods. It is required to
46   * add the {@link ChannelHandler} returned from {@link #controlStreamListener()} to the {@link QuicChannel} associated
47   * with this manager.
48   */
49  public final class Http3ServerPushStreamManager {
50      private static final AtomicLongFieldUpdater<Http3ServerPushStreamManager> nextIdUpdater =
51              newUpdater(Http3ServerPushStreamManager.class, "nextId");
52      private static final Object CANCELLED_STREAM = new Object();
53      private static final Object PUSH_ID_GENERATED = new Object();
54      private static final Object AWAITING_STREAM_ESTABLISHMENT = new Object();
55  
56      private final QuicChannel channel;
57      private final ConcurrentMap<Long, Object> pushStreams;
58      private final ChannelInboundHandler controlStreamListener;
59  
60      private volatile long nextId;
61  
62      /**
63       * Creates a new instance.
64       *
65       * @param channel for which this manager is created.
66       */
67      public Http3ServerPushStreamManager(QuicChannel channel) {
68          this(channel, 8);
69      }
70  
71      /**
72       * Creates a new instance.
73       *
74       * @param channel for which this manager is created.
75       * @param initialPushStreamsCountHint a hint for the number of push streams that may be created.
76       */
77      public Http3ServerPushStreamManager(QuicChannel channel, int initialPushStreamsCountHint) {
78          this.channel = requireNonNull(channel, "channel");
79          pushStreams = newConcurrentHashMap(initialPushStreamsCountHint);
80          controlStreamListener = new ChannelInboundHandlerAdapter() {
81              @Override
82              public void channelRead(ChannelHandlerContext ctx, Object msg) {
83                  if (msg instanceof Http3CancelPushFrame) {
84                      final long pushId = ((Http3CancelPushFrame) msg).id();
85                      if (pushId >= nextId) {
86                          connectionError(ctx, H3_ID_ERROR, "CANCEL_PUSH id greater than the last known id", true);
87                          return;
88                      }
89  
90                      pushStreams.computeIfPresent(pushId, (id, existing) -> {
91                          if (existing == AWAITING_STREAM_ESTABLISHMENT) {
92                              return CANCELLED_STREAM;
93                          }
94                          if (existing == PUSH_ID_GENERATED) {
95                              throw new IllegalStateException("Unexpected push stream state " + existing +
96                                      " for pushId: " + id);
97                          }
98                          assert existing instanceof QuicStreamChannel;
99                          ((QuicStreamChannel) existing).close();
100                         // remove the push stream from the map.
101                         return null;
102                     });
103                 }
104                 ReferenceCountUtil.release(msg);
105             }
106         };
107     }
108 
109     /**
110      * Returns {@code true} if server push is allowed at this point.
111      *
112      * @return {@code true} if server push is allowed at this point.
113      */
114     public boolean isPushAllowed() {
115         return isPushAllowed(maxPushIdReceived(channel));
116     }
117 
118     /**
119      * Reserves a push ID to be used to create a new push stream subsequently. A push ID can only be used to create
120      * exactly one push stream.
121      *
122      * @return Next push ID.
123      * @throws IllegalStateException If it is not allowed to create any more push streams on the associated
124      * {@link QuicChannel}. Use {@link #isPushAllowed()} to check if server push is allowed.
125      */
126     public long reserveNextPushId() {
127         final long maxPushId = maxPushIdReceived(channel);
128         if (isPushAllowed(maxPushId)) {
129             return nextPushId();
130         }
131         throw new IllegalStateException("MAX allowed push ID: " + maxPushId + ", next push ID: " + nextId);
132     }
133 
134     /**
135      * Returns a new HTTP/3 push-stream that will use the given {@link ChannelHandler}
136      * to dispatch {@link Http3PushStreamFrame}s too. The needed HTTP/3 codecs are automatically added to the
137      * pipeline as well.
138      *
139      * @param pushId for the push stream. This MUST be obtained using {@link #reserveNextPushId()}.
140      * @param handler the {@link ChannelHandler} to add. Can be {@code null}.
141      * @return the {@link Future} that will be notified once the push-stream was opened.
142      */
143     public Future<QuicStreamChannel> newPushStream(long pushId, @Nullable ChannelHandler handler) {
144         final Promise<QuicStreamChannel> promise = channel.eventLoop().newPromise();
145         newPushStream(pushId, handler, promise);
146         return promise;
147     }
148 
149     /**
150      * Returns a new HTTP/3 push-stream that will use the given {@link ChannelHandler}
151      * to dispatch {@link Http3PushStreamFrame}s too. The needed HTTP/3 codecs are automatically added to the
152      * pipeline as well.
153      *
154      * @param pushId for the push stream. This MUST be obtained using {@link #reserveNextPushId()}.
155      * @param handler the {@link ChannelHandler} to add. Can be {@code null}.
156      * @param promise to indicate creation of the push stream.
157      */
158     public void newPushStream(long pushId, @Nullable ChannelHandler handler, Promise<QuicStreamChannel> promise) {
159         validatePushId(pushId);
160         channel.createStream(QuicStreamType.UNIDIRECTIONAL, pushStreamInitializer(pushId, handler), promise);
161         setupCancelPushIfStreamCreationFails(pushId, promise, channel);
162     }
163 
164     /**
165      * Returns a new HTTP/3 push-stream that will use the given {@link ChannelHandler}
166      * to dispatch {@link Http3PushStreamFrame}s too. The needed HTTP/3 codecs are automatically added to the
167      * pipeline as well.
168      *
169      * @param pushId for the push stream. This MUST be obtained using {@link #reserveNextPushId()}.
170      * @param handler the {@link ChannelHandler} to add. Can be {@code null}.
171      * @param bootstrapConfigurator {@link UnaryOperator} to configure the {@link QuicStreamChannelBootstrap} used.
172      * @param promise to indicate creation of the push stream.
173      */
174     public void newPushStream(long pushId, @Nullable ChannelHandler handler,
175                               UnaryOperator<QuicStreamChannelBootstrap> bootstrapConfigurator,
176                               Promise<QuicStreamChannel> promise) {
177         validatePushId(pushId);
178         QuicStreamChannelBootstrap bootstrap = bootstrapConfigurator.apply(channel.newStreamBootstrap());
179         bootstrap.type(QuicStreamType.UNIDIRECTIONAL)
180                 .handler(pushStreamInitializer(pushId, handler))
181                 .create(promise);
182         setupCancelPushIfStreamCreationFails(pushId, promise, channel);
183     }
184 
185     /**
186      * A {@link ChannelInboundHandler} to be added to the {@link QuicChannel} associated with this
187      * {@link Http3ServerPushStreamManager} to listen to control stream frames.
188      *
189      * @return {@link ChannelInboundHandler} to be added to the {@link QuicChannel} associated with this
190      * {@link Http3ServerPushStreamManager} to listen to control stream frames.
191      */
192     public ChannelInboundHandler controlStreamListener() {
193         return controlStreamListener;
194     }
195 
196     private boolean isPushAllowed(long maxPushId) {
197         return nextId <= maxPushId;
198     }
199 
200     private long nextPushId() {
201         final long pushId = nextIdUpdater.getAndIncrement(this);
202         pushStreams.put(pushId, PUSH_ID_GENERATED);
203         return pushId;
204     }
205 
206     private void validatePushId(long pushId) {
207         if (!pushStreams.replace(pushId, PUSH_ID_GENERATED, AWAITING_STREAM_ESTABLISHMENT)) {
208             throw new IllegalArgumentException("Unknown push ID: " + pushId);
209         }
210     }
211 
212     private Http3PushStreamServerInitializer pushStreamInitializer(long pushId, @Nullable ChannelHandler handler) {
213         final Http3PushStreamServerInitializer initializer;
214         if (handler instanceof Http3PushStreamServerInitializer) {
215             initializer = (Http3PushStreamServerInitializer) handler;
216         } else {
217             initializer = null;
218         }
219         return new Http3PushStreamServerInitializer(pushId) {
220             @Override
221             protected void initPushStream(QuicStreamChannel ch) {
222                 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
223                     private boolean stateUpdated;
224 
225                     @Override
226                     public void channelActive(ChannelHandlerContext ctx) {
227                         if (!stateUpdated) {
228                             updatePushStreamsMap();
229                         }
230                     }
231 
232                     @Override
233                     public void handlerAdded(ChannelHandlerContext ctx) {
234                         if (!stateUpdated && ctx.channel().isActive()) {
235                             updatePushStreamsMap();
236                         }
237                     }
238 
239                     private void updatePushStreamsMap() {
240                         assert !stateUpdated;
241                         stateUpdated = true;
242                         pushStreams.compute(pushId, (id, existing) -> {
243                             if (existing == AWAITING_STREAM_ESTABLISHMENT) {
244                                 return ch;
245                             }
246                             if (existing == CANCELLED_STREAM) {
247                                 ch.close();
248                                 return null; // remove push stream.
249                             }
250                             throw new IllegalStateException("Unexpected push stream state " +
251                                     existing + " for pushId: " + id);
252                         });
253                     }
254 
255                     @Override
256                     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
257                         if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
258                             pushStreams.remove(pushId);
259                         }
260                         ctx.fireUserEventTriggered(evt);
261                     }
262                 });
263                 if (initializer != null) {
264                     initializer.initPushStream(ch);
265                 } else if (handler != null) {
266                     ch.pipeline().addLast(handler);
267                 }
268             }
269         };
270     }
271 
272     private static void setupCancelPushIfStreamCreationFails(long pushId, Future<QuicStreamChannel> future,
273                                                              QuicChannel channel) {
274         if (future.isDone()) {
275             sendCancelPushIfFailed(future, pushId, channel);
276         } else {
277             future.addListener(f -> sendCancelPushIfFailed(future, pushId, channel));
278         }
279     }
280 
281     private static void sendCancelPushIfFailed(Future<QuicStreamChannel> future, long pushId, QuicChannel channel) {
282         // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-cancel_push
283         // If we can not establish the stream, we can not send the promised push response, so send a CANCEL_PUSH
284         if (!future.isSuccess()) {
285             final QuicStreamChannel localControlStream = Http3.getLocalControlStream(channel);
286             assert localControlStream != null;
287             localControlStream.writeAndFlush(new DefaultHttp3CancelPushFrame(pushId));
288         }
289     }
290 }