View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.internal.ObjectUtil;
20  
21  import java.util.IdentityHashMap;
22  import java.util.Map;
23  import java.util.Map.Entry;
24  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
25  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
26  
27  import static io.netty.channel.ChannelOption.ALLOCATOR;
28  import static io.netty.channel.ChannelOption.AUTO_CLOSE;
29  import static io.netty.channel.ChannelOption.AUTO_READ;
30  import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
31  import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
32  import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
33  import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
34  import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
35  import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
36  import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
37  import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
38  import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
39  import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
40  import static io.netty.util.internal.ObjectUtil.checkNotNull;
41  import static io.netty.util.internal.ObjectUtil.checkPositive;
42  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
43  
44  /**
45   * The default {@link ChannelConfig} implementation.
46   */
47  public class DefaultChannelConfig implements ChannelConfig {
48      private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
49  
50      private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
51  
52      private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
53              AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
54      private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER =
55              AtomicReferenceFieldUpdater.newUpdater(
56                      DefaultChannelConfig.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
57  
58      protected final Channel channel;
59  
60      private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
61      private volatile RecvByteBufAllocator rcvBufAllocator;
62      private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
63  
64      private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
65      private volatile int writeSpinCount = 16;
66      private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
67  
68      @SuppressWarnings("FieldMayBeFinal")
69      private volatile int autoRead = 1;
70      private volatile boolean autoClose = true;
71      private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
72      private volatile boolean pinEventExecutor = true;
73  
74      public DefaultChannelConfig(Channel channel) {
75          this(channel, new AdaptiveRecvByteBufAllocator());
76      }
77  
78      protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
79          setRecvByteBufAllocator(allocator, channel.metadata());
80          this.channel = channel;
81      }
82  
83      @Override
84      @SuppressWarnings("deprecation")
85      public Map<ChannelOption<?>, Object> getOptions() {
86          return getOptions(
87                  null,
88                  CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
89                  ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
90                  WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
91                  SINGLE_EVENTEXECUTOR_PER_GROUP, MAX_MESSAGES_PER_WRITE);
92      }
93  
94      protected Map<ChannelOption<?>, Object> getOptions(
95              Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
96          if (result == null) {
97              result = new IdentityHashMap<ChannelOption<?>, Object>();
98          }
99          for (ChannelOption<?> o: options) {
100             result.put(o, getOption(o));
101         }
102         return result;
103     }
104 
105     @SuppressWarnings("unchecked")
106     @Override
107     public boolean setOptions(Map<ChannelOption<?>, ?> options) {
108         ObjectUtil.checkNotNull(options, "options");
109 
110         boolean setAllOptions = true;
111         for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
112             if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
113                 setAllOptions = false;
114             }
115         }
116 
117         return setAllOptions;
118     }
119 
120     @Override
121     @SuppressWarnings({ "unchecked", "deprecation" })
122     public <T> T getOption(ChannelOption<T> option) {
123         ObjectUtil.checkNotNull(option, "option");
124 
125         if (option == CONNECT_TIMEOUT_MILLIS) {
126             return (T) Integer.valueOf(getConnectTimeoutMillis());
127         }
128         if (option == MAX_MESSAGES_PER_READ) {
129             return (T) Integer.valueOf(getMaxMessagesPerRead());
130         }
131         if (option == WRITE_SPIN_COUNT) {
132             return (T) Integer.valueOf(getWriteSpinCount());
133         }
134         if (option == ALLOCATOR) {
135             return (T) getAllocator();
136         }
137         if (option == RCVBUF_ALLOCATOR) {
138             return (T) getRecvByteBufAllocator();
139         }
140         if (option == AUTO_READ) {
141             return (T) Boolean.valueOf(isAutoRead());
142         }
143         if (option == AUTO_CLOSE) {
144             return (T) Boolean.valueOf(isAutoClose());
145         }
146         if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
147             return (T) Integer.valueOf(getWriteBufferHighWaterMark());
148         }
149         if (option == WRITE_BUFFER_LOW_WATER_MARK) {
150             return (T) Integer.valueOf(getWriteBufferLowWaterMark());
151         }
152         if (option == WRITE_BUFFER_WATER_MARK) {
153             return (T) getWriteBufferWaterMark();
154         }
155         if (option == MESSAGE_SIZE_ESTIMATOR) {
156             return (T) getMessageSizeEstimator();
157         }
158         if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
159             return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
160         }
161         if (option == MAX_MESSAGES_PER_WRITE) {
162             return (T) Integer.valueOf(getMaxMessagesPerWrite());
163         }
164         return null;
165     }
166 
167     @Override
168     @SuppressWarnings("deprecation")
169     public <T> boolean setOption(ChannelOption<T> option, T value) {
170         validate(option, value);
171 
172         if (option == CONNECT_TIMEOUT_MILLIS) {
173             setConnectTimeoutMillis((Integer) value);
174         } else if (option == MAX_MESSAGES_PER_READ) {
175             setMaxMessagesPerRead((Integer) value);
176         } else if (option == WRITE_SPIN_COUNT) {
177             setWriteSpinCount((Integer) value);
178         } else if (option == ALLOCATOR) {
179             setAllocator((ByteBufAllocator) value);
180         } else if (option == RCVBUF_ALLOCATOR) {
181             setRecvByteBufAllocator((RecvByteBufAllocator) value);
182         } else if (option == AUTO_READ) {
183             setAutoRead((Boolean) value);
184         } else if (option == AUTO_CLOSE) {
185             setAutoClose((Boolean) value);
186         } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
187             setWriteBufferHighWaterMark((Integer) value);
188         } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
189             setWriteBufferLowWaterMark((Integer) value);
190         } else if (option == WRITE_BUFFER_WATER_MARK) {
191             setWriteBufferWaterMark((WriteBufferWaterMark) value);
192         } else if (option == MESSAGE_SIZE_ESTIMATOR) {
193             setMessageSizeEstimator((MessageSizeEstimator) value);
194         } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
195             setPinEventExecutorPerGroup((Boolean) value);
196         } else if (option == MAX_MESSAGES_PER_WRITE) {
197             setMaxMessagesPerWrite((Integer) value);
198         } else {
199             return false;
200         }
201 
202         return true;
203     }
204 
205     protected <T> void validate(ChannelOption<T> option, T value) {
206         ObjectUtil.checkNotNull(option, "option").validate(value);
207     }
208 
209     @Override
210     public int getConnectTimeoutMillis() {
211         return connectTimeoutMillis;
212     }
213 
214     @Override
215     public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
216         checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
217         this.connectTimeoutMillis = connectTimeoutMillis;
218         return this;
219     }
220 
221     /**
222      * {@inheritDoc}
223      * <p>
224      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
225      * {@link MaxMessagesRecvByteBufAllocator}.
226      */
227     @Override
228     @Deprecated
229     public int getMaxMessagesPerRead() {
230         try {
231             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
232             return allocator.maxMessagesPerRead();
233         } catch (ClassCastException e) {
234             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
235                     "MaxMessagesRecvByteBufAllocator", e);
236         }
237     }
238 
239     /**
240      * {@inheritDoc}
241      * <p>
242      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
243      * {@link MaxMessagesRecvByteBufAllocator}.
244      */
245     @Override
246     @Deprecated
247     public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
248         try {
249             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
250             allocator.maxMessagesPerRead(maxMessagesPerRead);
251             return this;
252         } catch (ClassCastException e) {
253             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
254                     "MaxMessagesRecvByteBufAllocator", e);
255         }
256     }
257 
258     /**
259      * Get the maximum number of message to write per eventloop run. Once this limit is
260      * reached we will continue to process other events before trying to write the remaining messages.
261      */
262     public int getMaxMessagesPerWrite() {
263         return maxMessagesPerWrite;
264     }
265 
266      /**
267      * Set the maximum number of message to write per eventloop run. Once this limit is
268      * reached we will continue to process other events before trying to write the remaining messages.
269      */
270     public ChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
271         this.maxMessagesPerWrite = ObjectUtil.checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
272         return this;
273     }
274 
275     @Override
276     public int getWriteSpinCount() {
277         return writeSpinCount;
278     }
279 
280     @Override
281     public ChannelConfig setWriteSpinCount(int writeSpinCount) {
282         checkPositive(writeSpinCount, "writeSpinCount");
283         // Integer.MAX_VALUE is used as a special value in the channel implementations to indicate the channel cannot
284         // accept any more data, and results in the writeOp being set on the selector (or execute a runnable which tries
285         // to flush later because the writeSpinCount quantum has been exhausted). This strategy prevents additional
286         // conditional logic in the channel implementations, and shouldn't be noticeable in practice.
287         if (writeSpinCount == Integer.MAX_VALUE) {
288             --writeSpinCount;
289         }
290         this.writeSpinCount = writeSpinCount;
291         return this;
292     }
293 
294     @Override
295     public ByteBufAllocator getAllocator() {
296         return allocator;
297     }
298 
299     @Override
300     public ChannelConfig setAllocator(ByteBufAllocator allocator) {
301         this.allocator = ObjectUtil.checkNotNull(allocator, "allocator");
302         return this;
303     }
304 
305     @SuppressWarnings("unchecked")
306     @Override
307     public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
308         return (T) rcvBufAllocator;
309     }
310 
311     @Override
312     public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
313         rcvBufAllocator = checkNotNull(allocator, "allocator");
314         return this;
315     }
316 
317     /**
318      * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
319      * @param allocator the allocator to set.
320      * @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator}
321      * is of type {@link MaxMessagesRecvByteBufAllocator}.
322      */
323     private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
324         checkNotNull(allocator, "allocator");
325         checkNotNull(metadata, "metadata");
326         if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
327             ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
328         }
329         setRecvByteBufAllocator(allocator);
330     }
331 
332     @Override
333     public boolean isAutoRead() {
334         return autoRead == 1;
335     }
336 
337     @Override
338     public ChannelConfig setAutoRead(boolean autoRead) {
339         boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
340         if (autoRead && !oldAutoRead) {
341             channel.read();
342         } else if (!autoRead && oldAutoRead) {
343             autoReadCleared();
344         }
345         return this;
346     }
347 
348     /**
349      * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
350      * {@code true} before.
351      */
352     protected void autoReadCleared() { }
353 
354     @Override
355     public boolean isAutoClose() {
356         return autoClose;
357     }
358 
359     @Override
360     public ChannelConfig setAutoClose(boolean autoClose) {
361         this.autoClose = autoClose;
362         return this;
363     }
364 
365     @Override
366     public int getWriteBufferHighWaterMark() {
367         return writeBufferWaterMark.high();
368     }
369 
370     @Override
371     public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
372         checkPositiveOrZero(writeBufferHighWaterMark, "writeBufferHighWaterMark");
373         for (;;) {
374             WriteBufferWaterMark waterMark = writeBufferWaterMark;
375             if (writeBufferHighWaterMark < waterMark.low()) {
376                 throw new IllegalArgumentException(
377                         "writeBufferHighWaterMark cannot be less than " +
378                                 "writeBufferLowWaterMark (" + waterMark.low() + "): " +
379                                 writeBufferHighWaterMark);
380             }
381             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
382                     new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark, false))) {
383                 return this;
384             }
385         }
386     }
387 
388     @Override
389     public int getWriteBufferLowWaterMark() {
390         return writeBufferWaterMark.low();
391     }
392 
393     @Override
394     public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
395         checkPositiveOrZero(writeBufferLowWaterMark, "writeBufferLowWaterMark");
396         for (;;) {
397             WriteBufferWaterMark waterMark = writeBufferWaterMark;
398             if (writeBufferLowWaterMark > waterMark.high()) {
399                 throw new IllegalArgumentException(
400                         "writeBufferLowWaterMark cannot be greater than " +
401                                 "writeBufferHighWaterMark (" + waterMark.high() + "): " +
402                                 writeBufferLowWaterMark);
403             }
404             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
405                     new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high(), false))) {
406                 return this;
407             }
408         }
409     }
410 
411     @Override
412     public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
413         this.writeBufferWaterMark = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark");
414         return this;
415     }
416 
417     @Override
418     public WriteBufferWaterMark getWriteBufferWaterMark() {
419         return writeBufferWaterMark;
420     }
421 
422     @Override
423     public MessageSizeEstimator getMessageSizeEstimator() {
424         return msgSizeEstimator;
425     }
426 
427     @Override
428     public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
429         this.msgSizeEstimator = ObjectUtil.checkNotNull(estimator, "estimator");
430         return this;
431     }
432 
433     private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
434         this.pinEventExecutor = pinEventExecutor;
435         return this;
436     }
437 
438     private boolean getPinEventExecutorPerGroup() {
439         return pinEventExecutor;
440     }
441 
442 }