1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http2;
17
18 import io.netty5.channel.Channel;
19 import io.netty5.channel.ChannelHandler;
20 import io.netty5.channel.ChannelHandlerContext;
21 import io.netty5.channel.ChannelOption;
22 import io.netty5.channel.ChannelPipeline;
23 import io.netty5.util.AttributeKey;
24 import io.netty5.util.concurrent.EventExecutor;
25 import io.netty5.util.concurrent.Future;
26 import io.netty5.util.concurrent.Promise;
27 import io.netty5.util.internal.StringUtil;
28 import io.netty5.util.internal.UnstableApi;
29 import io.netty5.util.internal.logging.InternalLogger;
30 import io.netty5.util.internal.logging.InternalLoggerFactory;
31
32 import java.nio.channels.ClosedChannelException;
33 import java.util.LinkedHashMap;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentHashMap;
36
37 import static java.util.Objects.requireNonNull;
38
39 @UnstableApi
40 public final class Http2StreamChannelBootstrap {
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2StreamChannelBootstrap.class);
42 @SuppressWarnings("unchecked")
43 private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
44 @SuppressWarnings("unchecked")
45 private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
46
47
48
49 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();
50 private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<>();
51 private final Channel channel;
52 private volatile ChannelHandler handler;
53
54
55 private volatile ChannelHandlerContext multiplexCtx;
56
57 public Http2StreamChannelBootstrap(Channel channel) {
58 this.channel = requireNonNull(channel, "channel");
59 }
60
61
62
63
64
65 public <T> Http2StreamChannelBootstrap option(ChannelOption<T> option, T value) {
66 requireNonNull(option, "option");
67
68 synchronized (options) {
69 if (value == null) {
70 options.remove(option);
71 } else {
72 options.put(option, value);
73 }
74 }
75 return this;
76 }
77
78
79
80
81
82 public <T> Http2StreamChannelBootstrap attr(AttributeKey<T> key, T value) {
83 requireNonNull(key, "key");
84 if (value == null) {
85 attrs.remove(key);
86 } else {
87 attrs.put(key, value);
88 }
89 return this;
90 }
91
92
93
94
95 public Http2StreamChannelBootstrap handler(ChannelHandler handler) {
96 this.handler = requireNonNull(handler, "handler");
97 return this;
98 }
99
100
101
102
103
104 public Future<Http2StreamChannel> open() {
105 return open(channel.executor().newPromise());
106 }
107
108
109
110
111
112 public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise) {
113 try {
114 ChannelHandlerContext ctx = findCtx();
115 EventExecutor executor = ctx.executor();
116 if (executor.inEventLoop()) {
117 open0(ctx, promise);
118 } else {
119 final ChannelHandlerContext finalCtx = ctx;
120 executor.execute(() -> {
121 if (channel.isActive()) {
122 open0(finalCtx, promise);
123 } else {
124 promise.setFailure(new ClosedChannelException());
125 }
126 });
127 }
128 } catch (Throwable cause) {
129 promise.setFailure(cause);
130 }
131 return promise.asFuture();
132 }
133
134 @SuppressWarnings("deprecation")
135 private ChannelHandlerContext findCtx() throws ClosedChannelException {
136
137 ChannelHandlerContext ctx = multiplexCtx;
138 if (ctx != null && !ctx.isRemoved()) {
139 return ctx;
140 }
141 ChannelPipeline pipeline = channel.pipeline();
142 ctx = pipeline.context(Http2MultiplexHandler.class);
143 if (ctx == null) {
144 if (channel.isActive()) {
145 throw new IllegalStateException(StringUtil.simpleClassName(Http2MultiplexHandler.class)
146 + " must be in the ChannelPipeline of Channel " + channel);
147 } else {
148 throw new ClosedChannelException();
149 }
150 }
151 multiplexCtx = ctx;
152 return ctx;
153 }
154
155
156
157
158 @Deprecated
159 public void open0(ChannelHandlerContext ctx, final Promise<Http2StreamChannel> promise) {
160 assert ctx.executor().inEventLoop();
161 if (!promise.setUncancellable()) {
162 return;
163 }
164 final Http2StreamChannel streamChannel;
165 try {
166 streamChannel = ((Http2MultiplexHandler) ctx.handler()).newOutboundStream();
167 } catch (Exception e) {
168 promise.setFailure(e);
169 return;
170 }
171 try {
172 init(streamChannel);
173 } catch (Exception e) {
174 streamChannel.close();
175 promise.setFailure(e);
176 return;
177 }
178
179 Future<Void> future = streamChannel.register();
180 future.addListener(future1 -> {
181 if (future1.isSuccess()) {
182 promise.setSuccess(streamChannel);
183 } else if (future1.isCancelled()) {
184 promise.cancel();
185 } else {
186 streamChannel.close();
187
188 promise.setFailure(future1.cause());
189 }
190 });
191 }
192
193 private void init(Channel channel) {
194 ChannelPipeline p = channel.pipeline();
195 ChannelHandler handler = this.handler;
196 if (handler != null) {
197 p.addLast(handler);
198 }
199 final Map.Entry<ChannelOption<?>, Object> [] optionArray;
200 synchronized (options) {
201 optionArray = options.entrySet().toArray(EMPTY_OPTION_ARRAY);
202 }
203
204 setChannelOptions(channel, optionArray);
205 setAttributes(channel, attrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
206 }
207
208 private static void setChannelOptions(
209 Channel channel, Map.Entry<ChannelOption<?>, Object>[] options) {
210 for (Map.Entry<ChannelOption<?>, Object> e: options) {
211 setChannelOption(channel, e.getKey(), e.getValue());
212 }
213 }
214
215 private static void setChannelOption(
216 Channel channel, ChannelOption<?> option, Object value) {
217 try {
218 @SuppressWarnings("unchecked")
219 ChannelOption<Object> opt = (ChannelOption<Object>) option;
220 if (!channel.isOptionSupported(option)) {
221 logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
222 } else {
223 channel.setOption(opt, value);
224 }
225 } catch (Throwable t) {
226 logger.warn(
227 "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
228 }
229 }
230
231 private static void setAttributes(
232 Channel channel, Map.Entry<AttributeKey<?>, Object>[] options) {
233 for (Map.Entry<AttributeKey<?>, Object> e: options) {
234 @SuppressWarnings("unchecked")
235 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
236 channel.attr(key).set(e.getValue());
237 }
238 }
239 }