1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.nio.AbstractNioByteChannel;
20 import io.netty.channel.socket.SocketChannelConfig;
21
22 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23 import java.util.IdentityHashMap;
24 import java.util.Map;
25 import java.util.Map.Entry;
26
27 import static io.netty.channel.ChannelOption.ALLOCATOR;
28 import static io.netty.channel.ChannelOption.AUTO_CLOSE;
29 import static io.netty.channel.ChannelOption.AUTO_READ;
30 import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
31 import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
32 import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
33 import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
34 import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
35 import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
36 import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
37 import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
38
39
40
41
42 public class DefaultChannelConfig implements ChannelConfig {
43
44 private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = AdaptiveRecvByteBufAllocator.DEFAULT;
45 private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
46
47 private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
48
49 private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
50 AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
51
52 protected final Channel channel;
53
54 private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
55 private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR;
56 private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
57
58 private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
59 private volatile int maxMessagesPerRead;
60 private volatile int writeSpinCount = 16;
61 @SuppressWarnings("FieldMayBeFinal")
62 private volatile int autoRead = 1;
63 private volatile boolean autoClose = true;
64 private volatile int writeBufferHighWaterMark = 64 * 1024;
65 private volatile int writeBufferLowWaterMark = 32 * 1024;
66 private volatile boolean pinEventExecutor = true;
67
68 public DefaultChannelConfig(Channel channel) {
69 if (channel == null) {
70 throw new NullPointerException("channel");
71 }
72 this.channel = channel;
73
74 if (channel instanceof ServerChannel || channel instanceof AbstractNioByteChannel) {
75
76
77
78
79 maxMessagesPerRead = 16;
80 } else {
81 maxMessagesPerRead = 1;
82 }
83 }
84
85 @Override
86 @SuppressWarnings("deprecation")
87 public Map<ChannelOption<?>, Object> getOptions() {
88 return getOptions(
89 null,
90 CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
91 ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
92 WRITE_BUFFER_LOW_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, SINGLE_EVENTEXECUTOR_PER_GROUP);
93 }
94
95 protected Map<ChannelOption<?>, Object> getOptions(
96 Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
97 if (result == null) {
98 result = new IdentityHashMap<ChannelOption<?>, Object>();
99 }
100 for (ChannelOption<?> o: options) {
101 result.put(o, getOption(o));
102 }
103 return result;
104 }
105
106 @SuppressWarnings("unchecked")
107 @Override
108 public boolean setOptions(Map<ChannelOption<?>, ?> options) {
109 if (options == null) {
110 throw new NullPointerException("options");
111 }
112
113 boolean setAllOptions = true;
114 for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
115 if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
116 setAllOptions = false;
117 }
118 }
119
120 return setAllOptions;
121 }
122
123 @Override
124 @SuppressWarnings({ "unchecked", "deprecation" })
125 public <T> T getOption(ChannelOption<T> option) {
126 if (option == null) {
127 throw new NullPointerException("option");
128 }
129
130 if (option == CONNECT_TIMEOUT_MILLIS) {
131 return (T) Integer.valueOf(getConnectTimeoutMillis());
132 }
133 if (option == MAX_MESSAGES_PER_READ) {
134 return (T) Integer.valueOf(getMaxMessagesPerRead());
135 }
136 if (option == WRITE_SPIN_COUNT) {
137 return (T) Integer.valueOf(getWriteSpinCount());
138 }
139 if (option == ALLOCATOR) {
140 return (T) getAllocator();
141 }
142 if (option == RCVBUF_ALLOCATOR) {
143 return (T) getRecvByteBufAllocator();
144 }
145 if (option == AUTO_READ) {
146 return (T) Boolean.valueOf(isAutoRead());
147 }
148 if (option == AUTO_CLOSE) {
149 return (T) Boolean.valueOf(isAutoClose());
150 }
151 if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
152 return (T) Integer.valueOf(getWriteBufferHighWaterMark());
153 }
154 if (option == WRITE_BUFFER_LOW_WATER_MARK) {
155 return (T) Integer.valueOf(getWriteBufferLowWaterMark());
156 }
157 if (option == MESSAGE_SIZE_ESTIMATOR) {
158 return (T) getMessageSizeEstimator();
159 }
160 if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
161 return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
162 }
163 return null;
164 }
165
166 @Override
167 @SuppressWarnings("deprecation")
168 public <T> boolean setOption(ChannelOption<T> option, T value) {
169 validate(option, value);
170
171 if (option == CONNECT_TIMEOUT_MILLIS) {
172 setConnectTimeoutMillis((Integer) value);
173 } else if (option == MAX_MESSAGES_PER_READ) {
174 setMaxMessagesPerRead((Integer) value);
175 } else if (option == WRITE_SPIN_COUNT) {
176 setWriteSpinCount((Integer) value);
177 } else if (option == ALLOCATOR) {
178 setAllocator((ByteBufAllocator) value);
179 } else if (option == RCVBUF_ALLOCATOR) {
180 setRecvByteBufAllocator((RecvByteBufAllocator) value);
181 } else if (option == AUTO_READ) {
182 setAutoRead((Boolean) value);
183 } else if (option == AUTO_CLOSE) {
184 setAutoClose((Boolean) value);
185 } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
186 setWriteBufferHighWaterMark((Integer) value);
187 } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
188 setWriteBufferLowWaterMark((Integer) value);
189 } else if (option == MESSAGE_SIZE_ESTIMATOR) {
190 setMessageSizeEstimator((MessageSizeEstimator) value);
191 } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
192 setPinEventExecutorPerGroup((Boolean) value);
193 } else {
194 return false;
195 }
196
197 return true;
198 }
199
200 protected <T> void validate(ChannelOption<T> option, T value) {
201 if (option == null) {
202 throw new NullPointerException("option");
203 }
204 option.validate(value);
205 }
206
207 @Override
208 public int getConnectTimeoutMillis() {
209 return connectTimeoutMillis;
210 }
211
212 @Override
213 public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
214 if (connectTimeoutMillis < 0) {
215 throw new IllegalArgumentException(String.format(
216 "connectTimeoutMillis: %d (expected: >= 0)", connectTimeoutMillis));
217 }
218 this.connectTimeoutMillis = connectTimeoutMillis;
219 return this;
220 }
221
222 @Override
223 public int getMaxMessagesPerRead() {
224 return maxMessagesPerRead;
225 }
226
227 @Override
228 public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
229 if (maxMessagesPerRead <= 0) {
230 throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
231 }
232 this.maxMessagesPerRead = maxMessagesPerRead;
233 return this;
234 }
235
236 @Override
237 public int getWriteSpinCount() {
238 return writeSpinCount;
239 }
240
241 @Override
242 public ChannelConfig setWriteSpinCount(int writeSpinCount) {
243 if (writeSpinCount <= 0) {
244 throw new IllegalArgumentException(
245 "writeSpinCount must be a positive integer.");
246 }
247 this.writeSpinCount = writeSpinCount;
248 return this;
249 }
250
251 @Override
252 public ByteBufAllocator getAllocator() {
253 return allocator;
254 }
255
256 @Override
257 public ChannelConfig setAllocator(ByteBufAllocator allocator) {
258 if (allocator == null) {
259 throw new NullPointerException("allocator");
260 }
261 this.allocator = allocator;
262 return this;
263 }
264
265 @Override
266 public RecvByteBufAllocator getRecvByteBufAllocator() {
267 return rcvBufAllocator;
268 }
269
270 @Override
271 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
272 if (allocator == null) {
273 throw new NullPointerException("allocator");
274 }
275 rcvBufAllocator = allocator;
276 return this;
277 }
278
279 @Override
280 public boolean isAutoRead() {
281 return autoRead == 1;
282 }
283
284 @Override
285 public ChannelConfig setAutoRead(boolean autoRead) {
286 boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
287 if (autoRead && !oldAutoRead) {
288 channel.read();
289 } else if (!autoRead && oldAutoRead) {
290 autoReadCleared();
291 }
292 return this;
293 }
294
295
296
297
298
299 protected void autoReadCleared() { }
300
301 @Override
302 public boolean isAutoClose() {
303 return autoClose;
304 }
305
306 @Override
307 public ChannelConfig setAutoClose(boolean autoClose) {
308 this.autoClose = autoClose;
309 return this;
310 }
311
312 @Override
313 public int getWriteBufferHighWaterMark() {
314 return writeBufferHighWaterMark;
315 }
316
317 @Override
318 public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
319 if (writeBufferHighWaterMark < getWriteBufferLowWaterMark()) {
320 throw new IllegalArgumentException(
321 "writeBufferHighWaterMark cannot be less than " +
322 "writeBufferLowWaterMark (" + getWriteBufferLowWaterMark() + "): " +
323 writeBufferHighWaterMark);
324 }
325 if (writeBufferHighWaterMark < 0) {
326 throw new IllegalArgumentException(
327 "writeBufferHighWaterMark must be >= 0");
328 }
329 this.writeBufferHighWaterMark = writeBufferHighWaterMark;
330 return this;
331 }
332
333 @Override
334 public int getWriteBufferLowWaterMark() {
335 return writeBufferLowWaterMark;
336 }
337
338 @Override
339 public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
340 if (writeBufferLowWaterMark > getWriteBufferHighWaterMark()) {
341 throw new IllegalArgumentException(
342 "writeBufferLowWaterMark cannot be greater than " +
343 "writeBufferHighWaterMark (" + getWriteBufferHighWaterMark() + "): " +
344 writeBufferLowWaterMark);
345 }
346 if (writeBufferLowWaterMark < 0) {
347 throw new IllegalArgumentException(
348 "writeBufferLowWaterMark must be >= 0");
349 }
350 this.writeBufferLowWaterMark = writeBufferLowWaterMark;
351 return this;
352 }
353
354 @Override
355 public MessageSizeEstimator getMessageSizeEstimator() {
356 return msgSizeEstimator;
357 }
358
359 @Override
360 public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
361 if (estimator == null) {
362 throw new NullPointerException("estimator");
363 }
364 msgSizeEstimator = estimator;
365 return this;
366 }
367
368 private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
369 this.pinEventExecutor = pinEventExecutor;
370 return this;
371 }
372
373 private boolean getPinEventExecutorPerGroup() {
374 return pinEventExecutor;
375 }
376
377 }