1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.ChannelDownstreamHandler;
20 import org.jboss.netty.channel.ChannelEvent;
21 import org.jboss.netty.channel.ChannelFuture;
22 import org.jboss.netty.channel.ChannelFutureListener;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.Channels;
25 import org.jboss.netty.channel.DownstreamMessageEvent;
26 import org.jboss.netty.channel.MessageEvent;
27 import org.jboss.netty.handler.codec.http.HttpChunk;
28 import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
29 import org.jboss.netty.handler.codec.http.HttpHeaders;
30 import org.jboss.netty.handler.codec.http.HttpMessage;
31 import org.jboss.netty.handler.codec.http.HttpRequest;
32 import org.jboss.netty.handler.codec.http.HttpResponse;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.Map;
37
38 import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 public class SpdyHttpEncoder implements ChannelDownstreamHandler {
130
131 private final int spdyVersion;
132 private volatile int currentStreamId;
133
134
135
136
137
138
139 public SpdyHttpEncoder(SpdyVersion spdyVersion) {
140 if (spdyVersion == null) {
141 throw new NullPointerException("spdyVersion");
142 }
143 this.spdyVersion = spdyVersion.getVersion();
144 }
145
146 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
147 throws Exception {
148 if (!(evt instanceof MessageEvent)) {
149 ctx.sendDownstream(evt);
150 return;
151 }
152
153 MessageEvent e = (MessageEvent) evt;
154 Object msg = e.getMessage();
155
156 if (msg instanceof HttpRequest) {
157
158 HttpRequest httpRequest = (HttpRequest) msg;
159 SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpRequest);
160 currentStreamId = spdySynStreamFrame.getStreamId();
161 ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpRequest);
162 Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress());
163
164 } else if (msg instanceof HttpResponse) {
165
166 HttpResponse httpResponse = (HttpResponse) msg;
167 if (httpResponse.headers().contains(SpdyHttpHeaders.Names.ASSOCIATED_TO_STREAM_ID)) {
168 SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpResponse);
169 currentStreamId = spdySynStreamFrame.getStreamId();
170 ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpResponse);
171 Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress());
172 } else {
173 SpdySynReplyFrame spdySynReplyFrame = createSynReplyFrame(httpResponse);
174 currentStreamId = spdySynReplyFrame.getStreamId();
175 ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpResponse);
176 Channels.write(ctx, future, spdySynReplyFrame, e.getRemoteAddress());
177 }
178
179 } else if (msg instanceof HttpChunk) {
180
181 HttpChunk chunk = (HttpChunk) msg;
182 writeChunk(ctx, e.getFuture(), currentStreamId, chunk, e.getRemoteAddress());
183 } else {
184
185 ctx.sendDownstream(evt);
186 }
187 }
188
189
190
191
192 protected void writeChunk(
193 ChannelHandlerContext ctx, ChannelFuture future,
194 int streamId, HttpChunk chunk, SocketAddress remoteAddress) {
195
196 if (chunk.isLast()) {
197 if (chunk instanceof HttpChunkTrailer) {
198 HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
199 HttpHeaders trailers = trailer.trailingHeaders();
200 if (trailers.isEmpty()) {
201 SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId);
202 spdyDataFrame.setLast(true);
203 Channels.write(ctx, future, spdyDataFrame, remoteAddress);
204 } else {
205
206 SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(streamId);
207 spdyHeadersFrame.setLast(true);
208 for (Map.Entry<String, String> entry: trailers) {
209 spdyHeadersFrame.headers().add(entry.getKey(), entry.getValue());
210 }
211 Channels.write(ctx, future, spdyHeadersFrame, remoteAddress);
212 }
213 } else {
214 SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId);
215 spdyDataFrame.setLast(true);
216 Channels.write(ctx, future, spdyDataFrame, remoteAddress);
217 }
218 } else {
219 SpdyDataFrame[] spdyDataFrames = createSpdyDataFrames(streamId, chunk.getContent());
220 ChannelFuture dataFuture = getDataFuture(ctx, future, spdyDataFrames, remoteAddress);
221
222
223 dataFuture.setSuccess();
224 }
225 }
226
227 private ChannelFuture getMessageFuture(
228 ChannelHandlerContext ctx, MessageEvent e, int streamId, HttpMessage httpMessage) {
229 if (!httpMessage.getContent().readable()) {
230 return e.getFuture();
231 }
232
233
234 SpdyDataFrame[] spdyDataFrames = createSpdyDataFrames(streamId, httpMessage.getContent());
235 if (spdyDataFrames.length > 0) {
236 spdyDataFrames[spdyDataFrames.length - 1].setLast(true);
237 }
238
239 return getDataFuture(ctx, e.getFuture(), spdyDataFrames, e.getRemoteAddress());
240 }
241
242 private static ChannelFuture getDataFuture(
243 ChannelHandlerContext ctx, ChannelFuture future,
244 SpdyDataFrame[] spdyDataFrames, SocketAddress remoteAddress) {
245
246 ChannelFuture dataFuture = future;
247 for (int i = spdyDataFrames.length; --i >= 0;) {
248 future = Channels.future(ctx.getChannel());
249 future.addListener(new SpdyFrameWriter(ctx, new DownstreamMessageEvent(
250 ctx.getChannel(), dataFuture, spdyDataFrames[i], remoteAddress)));
251 dataFuture = future;
252 }
253 return dataFuture;
254 }
255
256 private static class SpdyFrameWriter implements ChannelFutureListener {
257
258 private final ChannelHandlerContext ctx;
259 private final MessageEvent e;
260
261 SpdyFrameWriter(ChannelHandlerContext ctx, MessageEvent e) {
262 this.ctx = ctx;
263 this.e = e;
264 }
265
266 public void operationComplete(ChannelFuture future) throws Exception {
267 if (future.isSuccess()) {
268 ctx.sendDownstream(e);
269 } else if (future.isCancelled()) {
270 e.getFuture().cancel();
271 } else {
272 e.getFuture().setFailure(future.getCause());
273 }
274 }
275 }
276
277 private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage)
278 throws Exception {
279 boolean chunked = httpMessage.isChunked();
280
281
282 int streamId = SpdyHttpHeaders.getStreamId(httpMessage);
283 int associatedToStreamId = SpdyHttpHeaders.getAssociatedToStreamId(httpMessage);
284 byte priority = SpdyHttpHeaders.getPriority(httpMessage);
285 String URL = SpdyHttpHeaders.getUrl(httpMessage);
286 String scheme = SpdyHttpHeaders.getScheme(httpMessage);
287 SpdyHttpHeaders.removeStreamId(httpMessage);
288 SpdyHttpHeaders.removeAssociatedToStreamId(httpMessage);
289 SpdyHttpHeaders.removePriority(httpMessage);
290 SpdyHttpHeaders.removeUrl(httpMessage);
291 SpdyHttpHeaders.removeScheme(httpMessage);
292
293
294
295 httpMessage.headers().remove(HttpHeaders.Names.CONNECTION);
296 httpMessage.headers().remove("Keep-Alive");
297 httpMessage.headers().remove("Proxy-Connection");
298 httpMessage.headers().remove(HttpHeaders.Names.TRANSFER_ENCODING);
299
300 SpdySynStreamFrame spdySynStreamFrame =
301 new DefaultSpdySynStreamFrame(streamId, associatedToStreamId, priority);
302 spdySynStreamFrame.setLast(!chunked && !httpMessage.getContent().readable());
303
304
305 if (httpMessage instanceof HttpRequest) {
306 HttpRequest httpRequest = (HttpRequest) httpMessage;
307 SpdyHeaders.setMethod(spdyVersion, spdySynStreamFrame, httpRequest.getMethod());
308 SpdyHeaders.setUrl(spdyVersion, spdySynStreamFrame, httpRequest.getUri());
309 SpdyHeaders.setVersion(spdyVersion, spdySynStreamFrame, httpMessage.getProtocolVersion());
310 }
311 if (httpMessage instanceof HttpResponse) {
312 HttpResponse httpResponse = (HttpResponse) httpMessage;
313 SpdyHeaders.setStatus(spdyVersion, spdySynStreamFrame, httpResponse.getStatus());
314 SpdyHeaders.setUrl(spdyVersion, spdySynStreamFrame, URL);
315 SpdyHeaders.setVersion(spdyVersion, spdySynStreamFrame, httpMessage.getProtocolVersion());
316 spdySynStreamFrame.setUnidirectional(true);
317 }
318
319
320 String host = HttpHeaders.getHost(httpMessage);
321 httpMessage.headers().remove(HttpHeaders.Names.HOST);
322 SpdyHeaders.setHost(spdySynStreamFrame, host);
323
324
325 if (scheme == null) {
326 scheme = "https";
327 }
328 SpdyHeaders.setScheme(spdyVersion, spdySynStreamFrame, scheme);
329
330
331 for (Map.Entry<String, String> entry: httpMessage.headers()) {
332 spdySynStreamFrame.headers().add(entry.getKey(), entry.getValue());
333 }
334
335 return spdySynStreamFrame;
336 }
337
338 private SpdySynReplyFrame createSynReplyFrame(HttpResponse httpResponse)
339 throws Exception {
340 boolean chunked = httpResponse.isChunked();
341
342
343 int streamId = SpdyHttpHeaders.getStreamId(httpResponse);
344 SpdyHttpHeaders.removeStreamId(httpResponse);
345
346
347
348 httpResponse.headers().remove(HttpHeaders.Names.CONNECTION);
349 httpResponse.headers().remove("Keep-Alive");
350 httpResponse.headers().remove("Proxy-Connection");
351 httpResponse.headers().remove(HttpHeaders.Names.TRANSFER_ENCODING);
352
353 SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamId);
354 spdySynReplyFrame.setLast(!chunked && !httpResponse.getContent().readable());
355
356
357 SpdyHeaders.setStatus(spdyVersion, spdySynReplyFrame, httpResponse.getStatus());
358 SpdyHeaders.setVersion(spdyVersion, spdySynReplyFrame, httpResponse.getProtocolVersion());
359
360
361 for (Map.Entry<String, String> entry: httpResponse.headers()) {
362 spdySynReplyFrame.headers().add(entry.getKey(), entry.getValue());
363 }
364
365 return spdySynReplyFrame;
366 }
367
368 private SpdyDataFrame[] createSpdyDataFrames(int streamId, ChannelBuffer content) {
369 int readableBytes = content.readableBytes();
370 int count = readableBytes / SPDY_MAX_LENGTH;
371 if (readableBytes % SPDY_MAX_LENGTH > 0) {
372 count++;
373 }
374 SpdyDataFrame[] spdyDataFrames = new SpdyDataFrame[count];
375 for (int i = 0; i < count; i ++) {
376 SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId);
377 int dataSize = Math.min(content.readableBytes(), SPDY_MAX_LENGTH);
378 spdyDataFrame.setData(content.readSlice(dataSize));
379 spdyDataFrames[i] = spdyDataFrame;
380 }
381 return spdyDataFrames;
382 }
383 }