1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.example.http2.helloworld.client;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.SimpleChannelInboundHandler;
22 import io.netty.handler.codec.http.FullHttpResponse;
23 import io.netty.handler.codec.http2.HttpConversionUtil;
24 import io.netty.util.CharsetUtil;
25 import io.netty.util.internal.PlatformDependent;
26
27 import java.util.AbstractMap.SimpleEntry;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36 public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
37
38 private final Map<Integer, Entry<ChannelFuture, ChannelPromise>> streamidPromiseMap;
39
40 public HttpResponseHandler() {
41
42
43 streamidPromiseMap = PlatformDependent.newConcurrentHashMap();
44 }
45
46
47
48
49
50
51
52
53
54
55 public Entry<ChannelFuture, ChannelPromise> put(int streamId, ChannelFuture writeFuture, ChannelPromise promise) {
56 return streamidPromiseMap.put(streamId, new SimpleEntry<ChannelFuture, ChannelPromise>(writeFuture, promise));
57 }
58
59
60
61
62
63
64
65
66 public void awaitResponses(long timeout, TimeUnit unit) {
67 Iterator<Entry<Integer, Entry<ChannelFuture, ChannelPromise>>> itr = streamidPromiseMap.entrySet().iterator();
68 while (itr.hasNext()) {
69 Entry<Integer, Entry<ChannelFuture, ChannelPromise>> entry = itr.next();
70 ChannelFuture writeFuture = entry.getValue().getKey();
71 if (!writeFuture.awaitUninterruptibly(timeout, unit)) {
72 throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
73 }
74 if (!writeFuture.isSuccess()) {
75 throw new RuntimeException(writeFuture.cause());
76 }
77 ChannelPromise promise = entry.getValue().getValue();
78 if (!promise.awaitUninterruptibly(timeout, unit)) {
79 throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
80 }
81 if (!promise.isSuccess()) {
82 throw new RuntimeException(promise.cause());
83 }
84 System.out.println("---Stream id: " + entry.getKey() + " received---");
85 itr.remove();
86 }
87 }
88
89 @Override
90 protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
91 Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
92 if (streamId == null) {
93 System.err.println("HttpResponseHandler unexpected message received: " + msg);
94 return;
95 }
96
97 Entry<ChannelFuture, ChannelPromise> entry = streamidPromiseMap.get(streamId);
98 if (entry == null) {
99 System.err.println("Message received for unknown stream id " + streamId);
100 } else {
101
102 ByteBuf content = msg.content();
103 if (content.isReadable()) {
104 int contentLength = content.readableBytes();
105 byte[] arr = new byte[contentLength];
106 content.readBytes(arr);
107 System.out.println(new String(arr, 0, contentLength, CharsetUtil.UTF_8));
108 }
109
110 entry.getValue().setSuccess();
111 }
112 }
113 }