View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  
20  import java.util.IdentityHashMap;
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
24  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
25  
26  import static io.netty.channel.ChannelOption.ALLOCATOR;
27  import static io.netty.channel.ChannelOption.AUTO_CLOSE;
28  import static io.netty.channel.ChannelOption.AUTO_READ;
29  import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
30  import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
31  import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
32  import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
33  import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
34  import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
35  import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
36  import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
37  import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
38  import static io.netty.util.internal.ObjectUtil.checkNotNull;
39  
40  /**
41   * The default {@link ChannelConfig} implementation.
42   */
43  public class DefaultChannelConfig implements ChannelConfig {
44      private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
45  
46      private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
47  
48      private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
49              AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
50      private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER =
51              AtomicReferenceFieldUpdater.newUpdater(
52                      DefaultChannelConfig.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
53  
54      protected final Channel channel;
55  
56      private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
57      private volatile RecvByteBufAllocator rcvBufAllocator;
58      private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
59  
60      private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
61      private volatile int writeSpinCount = 16;
62      @SuppressWarnings("FieldMayBeFinal")
63      private volatile int autoRead = 1;
64      private volatile boolean autoClose = true;
65      private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
66      private volatile boolean pinEventExecutor = true;
67  
68      public DefaultChannelConfig(Channel channel) {
69          this(channel, new AdaptiveRecvByteBufAllocator());
70      }
71  
72      protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
73          setRecvByteBufAllocator(allocator, channel.metadata());
74          this.channel = channel;
75      }
76  
77      @Override
78      @SuppressWarnings("deprecation")
79      public Map<ChannelOption<?>, Object> getOptions() {
80          return getOptions(
81                  null,
82                  CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
83                  ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
84                  WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
85                  SINGLE_EVENTEXECUTOR_PER_GROUP);
86      }
87  
88      protected Map<ChannelOption<?>, Object> getOptions(
89              Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
90          if (result == null) {
91              result = new IdentityHashMap<ChannelOption<?>, Object>();
92          }
93          for (ChannelOption<?> o: options) {
94              result.put(o, getOption(o));
95          }
96          return result;
97      }
98  
99      @SuppressWarnings("unchecked")
100     @Override
101     public boolean setOptions(Map<ChannelOption<?>, ?> options) {
102         if (options == null) {
103             throw new NullPointerException("options");
104         }
105 
106         boolean setAllOptions = true;
107         for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
108             if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
109                 setAllOptions = false;
110             }
111         }
112 
113         return setAllOptions;
114     }
115 
116     @Override
117     @SuppressWarnings({ "unchecked", "deprecation" })
118     public <T> T getOption(ChannelOption<T> option) {
119         if (option == null) {
120             throw new NullPointerException("option");
121         }
122 
123         if (option == CONNECT_TIMEOUT_MILLIS) {
124             return (T) Integer.valueOf(getConnectTimeoutMillis());
125         }
126         if (option == MAX_MESSAGES_PER_READ) {
127             return (T) Integer.valueOf(getMaxMessagesPerRead());
128         }
129         if (option == WRITE_SPIN_COUNT) {
130             return (T) Integer.valueOf(getWriteSpinCount());
131         }
132         if (option == ALLOCATOR) {
133             return (T) getAllocator();
134         }
135         if (option == RCVBUF_ALLOCATOR) {
136             return (T) getRecvByteBufAllocator();
137         }
138         if (option == AUTO_READ) {
139             return (T) Boolean.valueOf(isAutoRead());
140         }
141         if (option == AUTO_CLOSE) {
142             return (T) Boolean.valueOf(isAutoClose());
143         }
144         if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
145             return (T) Integer.valueOf(getWriteBufferHighWaterMark());
146         }
147         if (option == WRITE_BUFFER_LOW_WATER_MARK) {
148             return (T) Integer.valueOf(getWriteBufferLowWaterMark());
149         }
150         if (option == WRITE_BUFFER_WATER_MARK) {
151             return (T) getWriteBufferWaterMark();
152         }
153         if (option == MESSAGE_SIZE_ESTIMATOR) {
154             return (T) getMessageSizeEstimator();
155         }
156         if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
157             return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
158         }
159         return null;
160     }
161 
162     @Override
163     @SuppressWarnings("deprecation")
164     public <T> boolean setOption(ChannelOption<T> option, T value) {
165         validate(option, value);
166 
167         if (option == CONNECT_TIMEOUT_MILLIS) {
168             setConnectTimeoutMillis((Integer) value);
169         } else if (option == MAX_MESSAGES_PER_READ) {
170             setMaxMessagesPerRead((Integer) value);
171         } else if (option == WRITE_SPIN_COUNT) {
172             setWriteSpinCount((Integer) value);
173         } else if (option == ALLOCATOR) {
174             setAllocator((ByteBufAllocator) value);
175         } else if (option == RCVBUF_ALLOCATOR) {
176             setRecvByteBufAllocator((RecvByteBufAllocator) value);
177         } else if (option == AUTO_READ) {
178             setAutoRead((Boolean) value);
179         } else if (option == AUTO_CLOSE) {
180             setAutoClose((Boolean) value);
181         } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
182             setWriteBufferHighWaterMark((Integer) value);
183         } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
184             setWriteBufferLowWaterMark((Integer) value);
185         } else if (option == WRITE_BUFFER_WATER_MARK) {
186             setWriteBufferWaterMark((WriteBufferWaterMark) value);
187         } else if (option == MESSAGE_SIZE_ESTIMATOR) {
188             setMessageSizeEstimator((MessageSizeEstimator) value);
189         } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
190             setPinEventExecutorPerGroup((Boolean) value);
191         } else {
192             return false;
193         }
194 
195         return true;
196     }
197 
198     protected <T> void validate(ChannelOption<T> option, T value) {
199         if (option == null) {
200             throw new NullPointerException("option");
201         }
202         option.validate(value);
203     }
204 
205     @Override
206     public int getConnectTimeoutMillis() {
207         return connectTimeoutMillis;
208     }
209 
210     @Override
211     public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
212         if (connectTimeoutMillis < 0) {
213             throw new IllegalArgumentException(String.format(
214                     "connectTimeoutMillis: %d (expected: >= 0)", connectTimeoutMillis));
215         }
216         this.connectTimeoutMillis = connectTimeoutMillis;
217         return this;
218     }
219 
220     /**
221      * {@inheritDoc}
222      * <p>
223      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
224      * {@link MaxMessagesRecvByteBufAllocator}.
225      */
226     @Override
227     @Deprecated
228     public int getMaxMessagesPerRead() {
229         try {
230             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
231             return allocator.maxMessagesPerRead();
232         } catch (ClassCastException e) {
233             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
234                     "MaxMessagesRecvByteBufAllocator", e);
235         }
236     }
237 
238     /**
239      * {@inheritDoc}
240      * <p>
241      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
242      * {@link MaxMessagesRecvByteBufAllocator}.
243      */
244     @Override
245     @Deprecated
246     public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
247         try {
248             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
249             allocator.maxMessagesPerRead(maxMessagesPerRead);
250             return this;
251         } catch (ClassCastException e) {
252             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
253                     "MaxMessagesRecvByteBufAllocator", e);
254         }
255     }
256 
257     @Override
258     public int getWriteSpinCount() {
259         return writeSpinCount;
260     }
261 
262     @Override
263     public ChannelConfig setWriteSpinCount(int writeSpinCount) {
264         if (writeSpinCount <= 0) {
265             throw new IllegalArgumentException(
266                     "writeSpinCount must be a positive integer.");
267         }
268         // Integer.MAX_VALUE is used as a special value in the channel implementations to indicate the channel cannot
269         // accept any more data, and results in the writeOp being set on the selector (or execute a runnable which tries
270         // to flush later because the writeSpinCount quantum has been exhausted). This strategy prevents additional
271         // conditional logic in the channel implementations, and shouldn't be noticeable in practice.
272         if (writeSpinCount == Integer.MAX_VALUE) {
273             --writeSpinCount;
274         }
275         this.writeSpinCount = writeSpinCount;
276         return this;
277     }
278 
279     @Override
280     public ByteBufAllocator getAllocator() {
281         return allocator;
282     }
283 
284     @Override
285     public ChannelConfig setAllocator(ByteBufAllocator allocator) {
286         if (allocator == null) {
287             throw new NullPointerException("allocator");
288         }
289         this.allocator = allocator;
290         return this;
291     }
292 
293     @SuppressWarnings("unchecked")
294     @Override
295     public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
296         return (T) rcvBufAllocator;
297     }
298 
299     @Override
300     public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
301         rcvBufAllocator = checkNotNull(allocator, "allocator");
302         return this;
303     }
304 
305     /**
306      * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
307      * @param allocator the allocator to set.
308      * @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator}
309      * is of type {@link MaxMessagesRecvByteBufAllocator}.
310      */
311     private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
312         if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
313             ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
314         } else if (allocator == null) {
315             throw new NullPointerException("allocator");
316         }
317         setRecvByteBufAllocator(allocator);
318     }
319 
320     @Override
321     public boolean isAutoRead() {
322         return autoRead == 1;
323     }
324 
325     @Override
326     public ChannelConfig setAutoRead(boolean autoRead) {
327         boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
328         if (autoRead && !oldAutoRead) {
329             channel.read();
330         } else if (!autoRead && oldAutoRead) {
331             autoReadCleared();
332         }
333         return this;
334     }
335 
336     /**
337      * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
338      * {@code true} before.
339      */
340     protected void autoReadCleared() { }
341 
342     @Override
343     public boolean isAutoClose() {
344         return autoClose;
345     }
346 
347     @Override
348     public ChannelConfig setAutoClose(boolean autoClose) {
349         this.autoClose = autoClose;
350         return this;
351     }
352 
353     @Override
354     public int getWriteBufferHighWaterMark() {
355         return writeBufferWaterMark.high();
356     }
357 
358     @Override
359     public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
360         if (writeBufferHighWaterMark < 0) {
361             throw new IllegalArgumentException(
362                     "writeBufferHighWaterMark must be >= 0");
363         }
364         for (;;) {
365             WriteBufferWaterMark waterMark = writeBufferWaterMark;
366             if (writeBufferHighWaterMark < waterMark.low()) {
367                 throw new IllegalArgumentException(
368                         "writeBufferHighWaterMark cannot be less than " +
369                                 "writeBufferLowWaterMark (" + waterMark.low() + "): " +
370                                 writeBufferHighWaterMark);
371             }
372             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
373                     new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark, false))) {
374                 return this;
375             }
376         }
377     }
378 
379     @Override
380     public int getWriteBufferLowWaterMark() {
381         return writeBufferWaterMark.low();
382     }
383 
384     @Override
385     public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
386         if (writeBufferLowWaterMark < 0) {
387             throw new IllegalArgumentException(
388                     "writeBufferLowWaterMark must be >= 0");
389         }
390         for (;;) {
391             WriteBufferWaterMark waterMark = writeBufferWaterMark;
392             if (writeBufferLowWaterMark > waterMark.high()) {
393                 throw new IllegalArgumentException(
394                         "writeBufferLowWaterMark cannot be greater than " +
395                                 "writeBufferHighWaterMark (" + waterMark.high() + "): " +
396                                 writeBufferLowWaterMark);
397             }
398             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
399                     new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high(), false))) {
400                 return this;
401             }
402         }
403     }
404 
405     @Override
406     public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
407         this.writeBufferWaterMark = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark");
408         return this;
409     }
410 
411     @Override
412     public WriteBufferWaterMark getWriteBufferWaterMark() {
413         return writeBufferWaterMark;
414     }
415 
416     @Override
417     public MessageSizeEstimator getMessageSizeEstimator() {
418         return msgSizeEstimator;
419     }
420 
421     @Override
422     public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
423         if (estimator == null) {
424             throw new NullPointerException("estimator");
425         }
426         msgSizeEstimator = estimator;
427         return this;
428     }
429 
430     private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
431         this.pinEventExecutor = pinEventExecutor;
432         return this;
433     }
434 
435     private boolean getPinEventExecutorPerGroup() {
436         return pinEventExecutor;
437     }
438 
439 }