View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http;
16  
17  import io.netty.channel.ChannelHandlerContext;
18  import io.netty.channel.ChannelOutboundHandler;
19  import io.netty.channel.ChannelPromise;
20  import io.netty.util.AsciiString;
21  
22  import java.net.SocketAddress;
23  import java.util.Collection;
24  import java.util.LinkedHashSet;
25  import java.util.List;
26  import java.util.Set;
27  
28  import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
29  import static io.netty.util.ReferenceCountUtil.release;
30  
31  /**
32   * Client-side handler for handling an HTTP upgrade handshake to another protocol. When the first
33   * HTTP request is sent, this handler will add all appropriate headers to perform an upgrade to the
34   * new protocol. If the upgrade fails (i.e. response is not 101 Switching Protocols), this handler
35   * simply removes itself from the pipeline. If the upgrade is successful, upgrades the pipeline to
36   * the new protocol.
37   */
38  public class HttpClientUpgradeHandler extends HttpObjectAggregator implements ChannelOutboundHandler {
39  
40      /**
41       * User events that are fired to notify about upgrade status.
42       */
43      public enum UpgradeEvent {
44          /**
45           * The Upgrade request was sent to the server.
46           */
47          UPGRADE_ISSUED,
48  
49          /**
50           * The Upgrade to the new protocol was successful.
51           */
52          UPGRADE_SUCCESSFUL,
53  
54          /**
55           * The Upgrade was unsuccessful due to the server not issuing
56           * with a 101 Switching Protocols response.
57           */
58          UPGRADE_REJECTED
59      }
60  
61      /**
62       * The source codec that is used in the pipeline initially.
63       */
64      public interface SourceCodec {
65  
66          /**
67           * Removes or disables the encoder of this codec so that the {@link UpgradeCodec} can send an initial greeting
68           * (if any).
69           */
70          void prepareUpgradeFrom(ChannelHandlerContext ctx);
71  
72          /**
73           * Removes this codec (i.e. all associated handlers) from the pipeline.
74           */
75          void upgradeFrom(ChannelHandlerContext ctx);
76      }
77  
78      /**
79       * A codec that the source can be upgraded to.
80       */
81      public interface UpgradeCodec {
82          /**
83           * Returns the name of the protocol supported by this codec, as indicated by the {@code 'UPGRADE'} header.
84           */
85          CharSequence protocol();
86  
87          /**
88           * Sets any protocol-specific headers required to the upgrade request. Returns the names of
89           * all headers that were added. These headers will be used to populate the CONNECTION header.
90           */
91          Collection<CharSequence> setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest);
92  
93          /**
94           * Performs an HTTP protocol upgrade from the source codec. This method is responsible for
95           * adding all handlers required for the new protocol.
96           *
97           * @param ctx the context for the current handler.
98           * @param upgradeResponse the 101 Switching Protocols response that indicates that the server
99           *            has switched to this protocol.
100          */
101         void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception;
102     }
103 
104     private final SourceCodec sourceCodec;
105     private final UpgradeCodec upgradeCodec;
106     private boolean upgradeRequested;
107 
108     /**
109      * Constructs the client upgrade handler.
110      *
111      * @param sourceCodec the codec that is being used initially.
112      * @param upgradeCodec the codec that the client would like to upgrade to.
113      * @param maxContentLength the maximum length of the aggregated content.
114      */
115     public HttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec,
116                                     int maxContentLength) {
117         super(maxContentLength);
118         if (sourceCodec == null) {
119             throw new NullPointerException("sourceCodec");
120         }
121         if (upgradeCodec == null) {
122             throw new NullPointerException("upgradeCodec");
123         }
124         this.sourceCodec = sourceCodec;
125         this.upgradeCodec = upgradeCodec;
126     }
127 
128     @Override
129     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
130         ctx.bind(localAddress, promise);
131     }
132 
133     @Override
134     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
135                         ChannelPromise promise) throws Exception {
136         ctx.connect(remoteAddress, localAddress, promise);
137     }
138 
139     @Override
140     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
141         ctx.disconnect(promise);
142     }
143 
144     @Override
145     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
146         ctx.close(promise);
147     }
148 
149     @Override
150     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
151         ctx.deregister(promise);
152     }
153 
154     @Override
155     public void read(ChannelHandlerContext ctx) throws Exception {
156         ctx.read();
157     }
158 
159     @Override
160     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
161             throws Exception {
162         if (!(msg instanceof HttpRequest)) {
163             ctx.write(msg, promise);
164             return;
165         }
166 
167         if (upgradeRequested) {
168             promise.setFailure(new IllegalStateException(
169                     "Attempting to write HTTP request with upgrade in progress"));
170             return;
171         }
172 
173         upgradeRequested = true;
174         setUpgradeRequestHeaders(ctx, (HttpRequest) msg);
175 
176         // Continue writing the request.
177         ctx.write(msg, promise);
178 
179         // Notify that the upgrade request was issued.
180         ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_ISSUED);
181         // Now we wait for the next HTTP response to see if we switch protocols.
182     }
183 
184     @Override
185     public void flush(ChannelHandlerContext ctx) throws Exception {
186         ctx.flush();
187     }
188 
189     @Override
190     protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
191             throws Exception {
192         FullHttpResponse response = null;
193         try {
194             if (!upgradeRequested) {
195                 throw new IllegalStateException("Read HTTP response without requesting protocol switch");
196             }
197 
198             if (msg instanceof HttpResponse) {
199                 HttpResponse rep = (HttpResponse) msg;
200                 if (!SWITCHING_PROTOCOLS.equals(rep.status())) {
201                     // The server does not support the requested protocol, just remove this handler
202                     // and continue processing HTTP.
203                     // NOTE: not releasing the response since we're letting it propagate to the
204                     // next handler.
205                     ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED);
206                     removeThisHandler(ctx);
207                     ctx.fireChannelRead(msg);
208                     return;
209                 }
210             }
211 
212             if (msg instanceof FullHttpResponse) {
213                 response = (FullHttpResponse) msg;
214                 // Need to retain since the base class will release after returning from this method.
215                 response.retain();
216                 out.add(response);
217             } else {
218                 // Call the base class to handle the aggregation of the full request.
219                 super.decode(ctx, msg, out);
220                 if (out.isEmpty()) {
221                     // The full request hasn't been created yet, still awaiting more data.
222                     return;
223                 }
224 
225                 assert out.size() == 1;
226                 response = (FullHttpResponse) out.get(0);
227             }
228 
229             CharSequence upgradeHeader = response.headers().get(HttpHeaderNames.UPGRADE);
230             if (upgradeHeader != null && !AsciiString.contentEqualsIgnoreCase(upgradeCodec.protocol(), upgradeHeader)) {
231                 throw new IllegalStateException(
232                         "Switching Protocols response with unexpected UPGRADE protocol: " + upgradeHeader);
233             }
234 
235             // Upgrade to the new protocol.
236             sourceCodec.prepareUpgradeFrom(ctx);
237             upgradeCodec.upgradeTo(ctx, response);
238 
239             // Notify that the upgrade to the new protocol completed successfully.
240             ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL);
241 
242             // We guarantee UPGRADE_SUCCESSFUL event will be arrived at the next handler
243             // before http2 setting frame and http response.
244             sourceCodec.upgradeFrom(ctx);
245 
246             // We switched protocols, so we're done with the upgrade response.
247             // Release it and clear it from the output.
248             response.release();
249             out.clear();
250             removeThisHandler(ctx);
251         } catch (Throwable t) {
252             release(response);
253             ctx.fireExceptionCaught(t);
254             removeThisHandler(ctx);
255         }
256     }
257 
258     private static void removeThisHandler(ChannelHandlerContext ctx) {
259         ctx.pipeline().remove(ctx.name());
260     }
261 
262     /**
263      * Adds all upgrade request headers necessary for an upgrade to the supported protocols.
264      */
265     private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
266         // Set the UPGRADE header on the request.
267         request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());
268 
269         // Add all protocol-specific headers to the request.
270         Set<CharSequence> connectionParts = new LinkedHashSet<CharSequence>(2);
271         connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));
272 
273         // Set the CONNECTION header from the set of all protocol-specific headers that were added.
274         StringBuilder builder = new StringBuilder();
275         for (CharSequence part : connectionParts) {
276             builder.append(part);
277             builder.append(',');
278         }
279         builder.append(HttpHeaderValues.UPGRADE);
280         request.headers().set(HttpHeaderNames.CONNECTION, builder.toString());
281     }
282 }