1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec.http;
16
17 import io.netty5.channel.internal.DelegatingChannelHandlerContext;
18 import io.netty5.util.Send;
19 import io.netty5.channel.ChannelHandlerContext;
20 import io.netty5.util.AsciiString;
21 import io.netty5.util.concurrent.Future;
22
23 import java.util.Collection;
24 import java.util.LinkedHashSet;
25 import java.util.Set;
26
27 import static io.netty5.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
28 import static java.util.Objects.requireNonNull;
29
30
31
32
33
34
35
36
37 public class HttpClientUpgradeHandler<C extends HttpContent<C>> extends HttpObjectAggregator<C> {
38
39
40
41
42 public enum UpgradeEvent {
43
44
45
46 UPGRADE_ISSUED,
47
48
49
50
51 UPGRADE_SUCCESSFUL,
52
53
54
55
56
57 UPGRADE_REJECTED
58 }
59
60
61
62
63 public interface SourceCodec {
64
65
66
67
68
69 void prepareUpgradeFrom(ChannelHandlerContext ctx);
70
71
72
73
74 void upgradeFrom(ChannelHandlerContext ctx);
75 }
76
77
78
79
80 public interface UpgradeCodec {
81
82
83
84 CharSequence protocol();
85
86
87
88
89
90 Collection<CharSequence> setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest);
91
92
93
94
95
96
97
98
99
100 void upgradeTo(ChannelHandlerContext ctx, Send<FullHttpResponse> upgradeResponse) throws Exception;
101 }
102
103 private final SourceCodec sourceCodec;
104 private final UpgradeCodec upgradeCodec;
105 private boolean upgradeRequested;
106
107
108
109
110
111
112
113
114 public HttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec,
115 int maxContentLength) {
116 super(maxContentLength);
117 requireNonNull(sourceCodec, "sourceCodec");
118 requireNonNull(upgradeCodec, "upgradeCodec");
119 this.sourceCodec = sourceCodec;
120 this.upgradeCodec = upgradeCodec;
121 }
122
123 @Override
124 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
125 if (!(msg instanceof HttpRequest)) {
126 return ctx.write(msg);
127 }
128
129 if (upgradeRequested) {
130 return ctx.newFailedFuture(new IllegalStateException(
131 "Attempting to write HTTP request with upgrade in progress"));
132 }
133
134 upgradeRequested = true;
135 setUpgradeRequestHeaders(ctx, (HttpRequest) msg);
136
137
138 Future<Void> f = ctx.write(msg);
139
140
141 ctx.fireChannelInboundEvent(UpgradeEvent.UPGRADE_ISSUED);
142
143 return f;
144 }
145
146 @Override
147 protected void decode(final ChannelHandlerContext ctx, HttpObject msg)
148 throws Exception {
149 FullHttpResponse response = null;
150 try {
151 if (!upgradeRequested) {
152 throw new IllegalStateException("Read HTTP response without requesting protocol switch");
153 }
154
155 if (msg instanceof HttpResponse) {
156 HttpResponse rep = (HttpResponse) msg;
157 if (!SWITCHING_PROTOCOLS.equals(rep.status())) {
158
159
160
161
162 ctx.fireChannelInboundEvent(UpgradeEvent.UPGRADE_REJECTED);
163 ctx.fireChannelRead(msg);
164 removeThisHandler(ctx);
165 return;
166 }
167 }
168
169 if (msg instanceof FullHttpResponse) {
170 response = (FullHttpResponse) msg;
171
172 tryUpgrade(ctx, response);
173 } else {
174
175 super.decode(new DelegatingChannelHandlerContext(ctx) {
176 @Override
177 public ChannelHandlerContext fireChannelRead(Object msg) {
178 FullHttpResponse response = (FullHttpResponse) msg;
179 tryUpgrade(ctx, response);
180 return this;
181 }
182 }, msg);
183 }
184
185 } catch (Throwable t) {
186 if (response != null && response.isAccessible()) {
187 response.close();
188 }
189 ctx.fireChannelExceptionCaught(t);
190 removeThisHandler(ctx);
191 }
192 }
193
194 private void tryUpgrade(ChannelHandlerContext ctx, FullHttpResponse response) {
195 try (response) {
196 CharSequence upgradeHeader = response.headers().get(HttpHeaderNames.UPGRADE);
197 if (upgradeHeader != null && !AsciiString.contentEqualsIgnoreCase(upgradeCodec.protocol(), upgradeHeader)) {
198 throw new IllegalStateException(
199 "Switching Protocols response with unexpected UPGRADE protocol: " + upgradeHeader);
200 }
201
202
203 sourceCodec.prepareUpgradeFrom(ctx);
204 upgradeCodec.upgradeTo(ctx, response.send());
205
206
207 ctx.fireChannelInboundEvent(UpgradeEvent.UPGRADE_SUCCESSFUL);
208
209
210
211 sourceCodec.upgradeFrom(ctx);
212 removeThisHandler(ctx);
213 } catch (Throwable t) {
214 ctx.fireChannelExceptionCaught(t);
215 removeThisHandler(ctx);
216 }
217 }
218
219 private static void removeThisHandler(ChannelHandlerContext ctx) {
220 ctx.pipeline().remove(ctx.name());
221 }
222
223
224
225
226 private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
227
228 request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());
229
230
231 Set<CharSequence> connectionParts = new LinkedHashSet<>(2);
232 connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));
233
234
235 StringBuilder builder = new StringBuilder();
236 for (CharSequence part : connectionParts) {
237 builder.append(part);
238 builder.append(',');
239 }
240 builder.append(HttpHeaderValues.UPGRADE);
241 request.headers().add(HttpHeaderNames.CONNECTION, builder.toString());
242 }
243 }