View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.ReferenceCounted;
24  
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.List;
28  
29  import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
30  import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
31  import static io.netty.util.AsciiString.containsAllContentEqualsIgnoreCase;
32  import static io.netty.util.AsciiString.containsContentEqualsIgnoreCase;
33  import static io.netty.util.internal.ObjectUtil.checkNotNull;
34  import static io.netty.util.internal.StringUtil.COMMA;
35  
36  /**
37   * A server-side handler that receives HTTP requests and optionally performs a protocol switch if
38   * the requested protocol is supported. Once an upgrade is performed, this handler removes itself
39   * from the pipeline.
40   */
41  public class HttpServerUpgradeHandler extends HttpObjectAggregator {
42  
43      /**
44       * The source codec that is used in the pipeline initially.
45       */
46      public interface SourceCodec {
47          /**
48           * Removes this codec (i.e. all associated handlers) from the pipeline.
49           */
50          void upgradeFrom(ChannelHandlerContext ctx);
51      }
52  
53      /**
54       * A codec that the source can be upgraded to.
55       */
56      public interface UpgradeCodec {
57          /**
58           * Gets all protocol-specific headers required by this protocol for a successful upgrade.
59           * Any supplied header will be required to appear in the {@link HttpHeaderNames#CONNECTION} header as well.
60           */
61          Collection<CharSequence> requiredUpgradeHeaders();
62  
63          /**
64           * Prepares the {@code upgradeHeaders} for a protocol update based upon the contents of {@code upgradeRequest}.
65           * This method returns a boolean value to proceed or abort the upgrade in progress. If {@code false} is
66           * returned, the upgrade is aborted and the {@code upgradeRequest} will be passed through the inbound pipeline
67           * as if no upgrade was performed. If {@code true} is returned, the upgrade will proceed to the next
68           * step which invokes {@link #upgradeTo}. When returning {@code true}, you can add headers to
69           * the {@code upgradeHeaders} so that they are added to the 101 Switching protocols response.
70           */
71          boolean prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
72                                      HttpHeaders upgradeHeaders);
73  
74          /**
75           * Performs an HTTP protocol upgrade from the source codec. This method is responsible for
76           * adding all handlers required for the new protocol.
77           *
78           * @param ctx the context for the current handler.
79           * @param upgradeRequest the request that triggered the upgrade to this protocol.
80           */
81          void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest);
82      }
83  
84      /**
85       * Creates a new {@link UpgradeCodec} for the requested protocol name.
86       */
87      public interface UpgradeCodecFactory {
88          /**
89           * Invoked by {@link HttpServerUpgradeHandler} for all the requested protocol names in the order of
90           * the client preference. The first non-{@code null} {@link UpgradeCodec} returned by this method
91           * will be selected.
92           *
93           * @return a new {@link UpgradeCodec}, or {@code null} if the specified protocol name is not supported
94           */
95          UpgradeCodec newUpgradeCodec(CharSequence protocol);
96      }
97  
98      /**
99       * User event that is fired to notify about the completion of an HTTP upgrade
100      * to another protocol. Contains the original upgrade request so that the response
101      * (if required) can be sent using the new protocol.
102      */
103     public static final class UpgradeEvent implements ReferenceCounted {
104         private final CharSequence protocol;
105         private final FullHttpRequest upgradeRequest;
106 
107         UpgradeEvent(CharSequence protocol, FullHttpRequest upgradeRequest) {
108             this.protocol = protocol;
109             this.upgradeRequest = upgradeRequest;
110         }
111 
112         /**
113          * The protocol that the channel has been upgraded to.
114          */
115         public CharSequence protocol() {
116             return protocol;
117         }
118 
119         /**
120          * Gets the request that triggered the protocol upgrade.
121          */
122         public FullHttpRequest upgradeRequest() {
123             return upgradeRequest;
124         }
125 
126         @Override
127         public int refCnt() {
128             return upgradeRequest.refCnt();
129         }
130 
131         @Override
132         public UpgradeEvent retain() {
133             upgradeRequest.retain();
134             return this;
135         }
136 
137         @Override
138         public UpgradeEvent retain(int increment) {
139             upgradeRequest.retain(increment);
140             return this;
141         }
142 
143         @Override
144         public UpgradeEvent touch() {
145             upgradeRequest.touch();
146             return this;
147         }
148 
149         @Override
150         public UpgradeEvent touch(Object hint) {
151             upgradeRequest.touch(hint);
152             return this;
153         }
154 
155         @Override
156         public boolean release() {
157             return upgradeRequest.release();
158         }
159 
160         @Override
161         public boolean release(int decrement) {
162             return upgradeRequest.release(decrement);
163         }
164 
165         @Override
166         public String toString() {
167             return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest + ']';
168         }
169     }
170 
171     private final SourceCodec sourceCodec;
172     private final UpgradeCodecFactory upgradeCodecFactory;
173     private final HttpHeadersFactory headersFactory;
174     private final HttpHeadersFactory trailersFactory;
175     private boolean handlingUpgrade;
176     private boolean failedAggregationStart;
177 
178     /**
179      * Constructs the upgrader with the supported codecs.
180      * <p>
181      * The handler instantiated by this constructor will reject an upgrade request with non-empty content.
182      * It should not be a concern because an upgrade request is most likely a GET request.
183      * If you have a client that sends a non-GET upgrade request, please consider using
184      * {@link #HttpServerUpgradeHandler(SourceCodec, UpgradeCodecFactory, int)} to specify the maximum
185      * length of the content of an upgrade request.
186      * </p>
187      *
188      * @param sourceCodec the codec that is being used initially
189      * @param upgradeCodecFactory the factory that creates a new upgrade codec
190      *                            for one of the requested upgrade protocols
191      */
192     public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory) {
193         this(sourceCodec, upgradeCodecFactory, 0,
194                 DefaultHttpHeadersFactory.headersFactory(), DefaultHttpHeadersFactory.trailersFactory());
195     }
196 
197     /**
198      * Constructs the upgrader with the supported codecs.
199      *
200      * @param sourceCodec the codec that is being used initially
201      * @param upgradeCodecFactory the factory that creates a new upgrade codec
202      *                            for one of the requested upgrade protocols
203      * @param maxContentLength the maximum length of the content of an upgrade request
204      */
205     public HttpServerUpgradeHandler(
206             SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength) {
207         this(sourceCodec, upgradeCodecFactory, maxContentLength,
208                 DefaultHttpHeadersFactory.headersFactory(), DefaultHttpHeadersFactory.trailersFactory());
209     }
210 
211     /**
212      * Constructs the upgrader with the supported codecs.
213      *
214      * @param sourceCodec the codec that is being used initially
215      * @param upgradeCodecFactory the factory that creates a new upgrade codec
216      *                            for one of the requested upgrade protocols
217      * @param maxContentLength the maximum length of the content of an upgrade request
218      * @param validateHeaders validate the header names and values of the upgrade response.
219      */
220     public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory,
221                                     int maxContentLength, boolean validateHeaders) {
222         this(sourceCodec, upgradeCodecFactory, maxContentLength,
223                 DefaultHttpHeadersFactory.headersFactory().withValidation(validateHeaders),
224                 DefaultHttpHeadersFactory.trailersFactory().withValidation(validateHeaders));
225     }
226 
227     /**
228      * Constructs the upgrader with the supported codecs.
229      *
230      * @param sourceCodec the codec that is being used initially
231      * @param upgradeCodecFactory the factory that creates a new upgrade codec
232      *                            for one of the requested upgrade protocols
233      * @param maxContentLength the maximum length of the content of an upgrade request
234      * @param headersFactory The {@link HttpHeadersFactory} to use for headers.
235      * The recommended default factory is {@link DefaultHttpHeadersFactory#headersFactory()}.
236      * @param trailersFactory The {@link HttpHeadersFactory} to use for trailers.
237      * The recommended default factory is {@link DefaultHttpHeadersFactory#trailersFactory()}.
238      */
239     public HttpServerUpgradeHandler(
240             SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength,
241             HttpHeadersFactory headersFactory, HttpHeadersFactory trailersFactory) {
242         super(maxContentLength);
243 
244         this.sourceCodec = checkNotNull(sourceCodec, "sourceCodec");
245         this.upgradeCodecFactory = checkNotNull(upgradeCodecFactory, "upgradeCodecFactory");
246         this.headersFactory = checkNotNull(headersFactory, "headersFactory");
247         this.trailersFactory = checkNotNull(trailersFactory, "trailersFactory");
248     }
249 
250     @Override
251     protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
252             throws Exception {
253 
254         if (!handlingUpgrade) {
255             // Not handling an upgrade request yet. Check if we received a new upgrade request.
256             if (msg instanceof HttpRequest) {
257                 HttpRequest req = (HttpRequest) msg;
258                 if (req.headers().contains(HttpHeaderNames.UPGRADE) &&
259                     shouldHandleUpgradeRequest(req)) {
260                     handlingUpgrade = true;
261                     failedAggregationStart = true; // reset if beginAggregation is called
262                 } else {
263                     ReferenceCountUtil.retain(msg);
264                     ctx.fireChannelRead(msg);
265                     return;
266                 }
267             } else {
268                 ReferenceCountUtil.retain(msg);
269                 ctx.fireChannelRead(msg);
270                 return;
271             }
272         }
273 
274         FullHttpRequest fullRequest;
275         if (msg instanceof FullHttpRequest) {
276             fullRequest = (FullHttpRequest) msg;
277             ReferenceCountUtil.retain(msg);
278             out.add(msg);
279         } else {
280             // Call the base class to handle the aggregation of the full request.
281             super.decode(ctx, msg, out);
282             if (out.isEmpty()) {
283                 if (msg instanceof LastHttpContent || failedAggregationStart) {
284                     // request failed to aggregate, try with the next request
285                     handlingUpgrade = false;
286                     releaseCurrentMessage();
287                 }
288 
289                 // The full request hasn't been created yet, still awaiting more data.
290                 return;
291             }
292 
293             // Finished aggregating the full request, get it from the output list.
294             assert out.size() == 1;
295             handlingUpgrade = false;
296             fullRequest = (FullHttpRequest) out.get(0);
297         }
298 
299         if (upgrade(ctx, fullRequest)) {
300             // The upgrade was successful, remove the message from the output list
301             // so that it's not propagated to the next handler. This request will
302             // be propagated as a user event instead.
303             out.clear();
304         }
305 
306         // The upgrade did not succeed, just allow the full request to propagate to the
307         // next handler.
308     }
309 
310     @Override
311     protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
312         failedAggregationStart = false;
313         return super.beginAggregation(start, content);
314     }
315 
316     /**
317      * Determines whether the specified upgrade {@link HttpRequest} should be handled by this handler or not.
318      * This method will be invoked only when the request contains an {@code Upgrade} header.
319      * It always returns {@code true} by default, which means any request with an {@code Upgrade} header
320      * will be handled. You can override this method to ignore certain {@code Upgrade} headers, for example:
321      * <pre>{@code
322      * @Override
323      * protected boolean isUpgradeRequest(HttpRequest req) {
324      *   // Do not handle WebSocket upgrades.
325      *   return !req.headers().contains(HttpHeaderNames.UPGRADE, "websocket", false);
326      * }
327      * }</pre>
328      */
329     protected boolean shouldHandleUpgradeRequest(HttpRequest req) {
330         return true;
331     }
332 
333     /**
334      * Attempts to upgrade to the protocol(s) identified by the {@link HttpHeaderNames#UPGRADE} header (if provided
335      * in the request).
336      *
337      * @param ctx the context for this handler.
338      * @param request the HTTP request.
339      * @return {@code true} if the upgrade occurred, otherwise {@code false}.
340      */
341     private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
342         // Select the best protocol based on those requested in the UPGRADE header.
343         final List<CharSequence> requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE));
344         final int numRequestedProtocols = requestedProtocols.size();
345         UpgradeCodec upgradeCodec = null;
346         CharSequence upgradeProtocol = null;
347         for (int i = 0; i < numRequestedProtocols; i ++) {
348             final CharSequence p = requestedProtocols.get(i);
349             final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);
350             if (c != null) {
351                 upgradeProtocol = p;
352                 upgradeCodec = c;
353                 break;
354             }
355         }
356 
357         if (upgradeCodec == null) {
358             // None of the requested protocols are supported, don't upgrade.
359             return false;
360         }
361 
362         // Make sure the CONNECTION header is present.
363         List<String> connectionHeaderValues = request.headers().getAll(HttpHeaderNames.CONNECTION);
364 
365         if (connectionHeaderValues == null || connectionHeaderValues.isEmpty()) {
366             return false;
367         }
368 
369         final StringBuilder concatenatedConnectionValue = new StringBuilder(connectionHeaderValues.size() * 10);
370         for (CharSequence connectionHeaderValue : connectionHeaderValues) {
371             concatenatedConnectionValue.append(connectionHeaderValue).append(COMMA);
372         }
373         concatenatedConnectionValue.setLength(concatenatedConnectionValue.length() - 1);
374 
375         // Make sure the CONNECTION header contains UPGRADE as well as all protocol-specific headers.
376         Collection<CharSequence> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
377         List<CharSequence> values = splitHeader(concatenatedConnectionValue);
378         if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) ||
379                 !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) {
380             return false;
381         }
382 
383         // Ensure that all required protocol-specific headers are found in the request.
384         for (CharSequence requiredHeader : requiredHeaders) {
385             if (!request.headers().contains(requiredHeader)) {
386                 return false;
387             }
388         }
389 
390         // Prepare and send the upgrade response. Wait for this write to complete before upgrading,
391         // since we need the old codec in-place to properly encode the response.
392         final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);
393         if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) {
394             return false;
395         }
396 
397         // Create the user event to be fired once the upgrade completes.
398         final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request);
399 
400         // After writing the upgrade response we immediately prepare the
401         // pipeline for the next protocol to avoid a race between completion
402         // of the write future and receiving data before the pipeline is
403         // restructured.
404         try {
405             final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse);
406             // Perform the upgrade to the new protocol.
407             sourceCodec.upgradeFrom(ctx);
408             upgradeCodec.upgradeTo(ctx, request);
409 
410             // Remove this handler from the pipeline.
411             ctx.pipeline().remove(HttpServerUpgradeHandler.this);
412 
413             // Notify that the upgrade has occurred. Retain the event to offset
414             // the release() in the finally block.
415             ctx.fireUserEventTriggered(event.retain());
416 
417             // Add the listener last to avoid firing upgrade logic after
418             // the channel is already closed since the listener may fire
419             // immediately if the write failed eagerly.
420             writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
421         } finally {
422             // Release the event if the upgrade event wasn't fired.
423             event.release();
424         }
425         return true;
426     }
427 
428     /**
429      * Creates the 101 Switching Protocols response message.
430      */
431     private FullHttpResponse createUpgradeResponse(CharSequence upgradeProtocol) {
432         DefaultFullHttpResponse res = new DefaultFullHttpResponse(
433                 HTTP_1_1, SWITCHING_PROTOCOLS, Unpooled.EMPTY_BUFFER, headersFactory, trailersFactory);
434         res.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
435         res.headers().add(HttpHeaderNames.UPGRADE, upgradeProtocol);
436         return res;
437     }
438 
439     /**
440      * Splits a comma-separated header value. The returned set is case-insensitive and contains each
441      * part with whitespace removed.
442      */
443     private static List<CharSequence> splitHeader(CharSequence header) {
444         final StringBuilder builder = new StringBuilder(header.length());
445         final List<CharSequence> protocols = new ArrayList<CharSequence>(4);
446         for (int i = 0; i < header.length(); ++i) {
447             char c = header.charAt(i);
448             if (Character.isWhitespace(c)) {
449                 // Don't include any whitespace.
450                 continue;
451             }
452             if (c == ',') {
453                 // Add the string and reset the builder for the next protocol.
454                 protocols.add(builder.toString());
455                 builder.setLength(0);
456             } else {
457                 builder.append(c);
458             }
459         }
460 
461         // Add the last protocol
462         if (builder.length() > 0) {
463             protocols.add(builder.toString());
464         }
465 
466         return protocols;
467     }
468 }