View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  
20  import java.util.IdentityHashMap;
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
24  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
25  
26  import static io.netty.channel.ChannelOption.ALLOCATOR;
27  import static io.netty.channel.ChannelOption.AUTO_CLOSE;
28  import static io.netty.channel.ChannelOption.AUTO_READ;
29  import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
30  import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
31  import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
32  import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
33  import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
34  import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
35  import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
36  import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
37  import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
38  import static io.netty.util.internal.ObjectUtil.checkNotNull;
39  import static io.netty.util.internal.ObjectUtil.checkPositive;
40  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
41  
42  /**
43   * The default {@link ChannelConfig} implementation.
44   */
45  public class DefaultChannelConfig implements ChannelConfig {
46      private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
47  
48      private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
49  
50      private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
51              AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
52      private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER =
53              AtomicReferenceFieldUpdater.newUpdater(
54                      DefaultChannelConfig.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
55  
56      protected final Channel channel;
57  
58      private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
59      private volatile RecvByteBufAllocator rcvBufAllocator;
60      private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
61  
62      private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
63      private volatile int writeSpinCount = 16;
64      @SuppressWarnings("FieldMayBeFinal")
65      private volatile int autoRead = 1;
66      private volatile boolean autoClose = true;
67      private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
68      private volatile boolean pinEventExecutor = true;
69  
70      public DefaultChannelConfig(Channel channel) {
71          this(channel, new AdaptiveRecvByteBufAllocator());
72      }
73  
74      protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
75          setRecvByteBufAllocator(allocator, channel.metadata());
76          this.channel = channel;
77      }
78  
79      @Override
80      @SuppressWarnings("deprecation")
81      public Map<ChannelOption<?>, Object> getOptions() {
82          return getOptions(
83                  null,
84                  CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
85                  ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
86                  WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
87                  SINGLE_EVENTEXECUTOR_PER_GROUP);
88      }
89  
90      protected Map<ChannelOption<?>, Object> getOptions(
91              Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
92          if (result == null) {
93              result = new IdentityHashMap<ChannelOption<?>, Object>();
94          }
95          for (ChannelOption<?> o: options) {
96              result.put(o, getOption(o));
97          }
98          return result;
99      }
100 
101     @SuppressWarnings("unchecked")
102     @Override
103     public boolean setOptions(Map<ChannelOption<?>, ?> options) {
104         if (options == null) {
105             throw new NullPointerException("options");
106         }
107 
108         boolean setAllOptions = true;
109         for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
110             if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
111                 setAllOptions = false;
112             }
113         }
114 
115         return setAllOptions;
116     }
117 
118     @Override
119     @SuppressWarnings({ "unchecked", "deprecation" })
120     public <T> T getOption(ChannelOption<T> option) {
121         if (option == null) {
122             throw new NullPointerException("option");
123         }
124 
125         if (option == CONNECT_TIMEOUT_MILLIS) {
126             return (T) Integer.valueOf(getConnectTimeoutMillis());
127         }
128         if (option == MAX_MESSAGES_PER_READ) {
129             return (T) Integer.valueOf(getMaxMessagesPerRead());
130         }
131         if (option == WRITE_SPIN_COUNT) {
132             return (T) Integer.valueOf(getWriteSpinCount());
133         }
134         if (option == ALLOCATOR) {
135             return (T) getAllocator();
136         }
137         if (option == RCVBUF_ALLOCATOR) {
138             return (T) getRecvByteBufAllocator();
139         }
140         if (option == AUTO_READ) {
141             return (T) Boolean.valueOf(isAutoRead());
142         }
143         if (option == AUTO_CLOSE) {
144             return (T) Boolean.valueOf(isAutoClose());
145         }
146         if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
147             return (T) Integer.valueOf(getWriteBufferHighWaterMark());
148         }
149         if (option == WRITE_BUFFER_LOW_WATER_MARK) {
150             return (T) Integer.valueOf(getWriteBufferLowWaterMark());
151         }
152         if (option == WRITE_BUFFER_WATER_MARK) {
153             return (T) getWriteBufferWaterMark();
154         }
155         if (option == MESSAGE_SIZE_ESTIMATOR) {
156             return (T) getMessageSizeEstimator();
157         }
158         if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
159             return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
160         }
161         return null;
162     }
163 
164     @Override
165     @SuppressWarnings("deprecation")
166     public <T> boolean setOption(ChannelOption<T> option, T value) {
167         validate(option, value);
168 
169         if (option == CONNECT_TIMEOUT_MILLIS) {
170             setConnectTimeoutMillis((Integer) value);
171         } else if (option == MAX_MESSAGES_PER_READ) {
172             setMaxMessagesPerRead((Integer) value);
173         } else if (option == WRITE_SPIN_COUNT) {
174             setWriteSpinCount((Integer) value);
175         } else if (option == ALLOCATOR) {
176             setAllocator((ByteBufAllocator) value);
177         } else if (option == RCVBUF_ALLOCATOR) {
178             setRecvByteBufAllocator((RecvByteBufAllocator) value);
179         } else if (option == AUTO_READ) {
180             setAutoRead((Boolean) value);
181         } else if (option == AUTO_CLOSE) {
182             setAutoClose((Boolean) value);
183         } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
184             setWriteBufferHighWaterMark((Integer) value);
185         } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
186             setWriteBufferLowWaterMark((Integer) value);
187         } else if (option == WRITE_BUFFER_WATER_MARK) {
188             setWriteBufferWaterMark((WriteBufferWaterMark) value);
189         } else if (option == MESSAGE_SIZE_ESTIMATOR) {
190             setMessageSizeEstimator((MessageSizeEstimator) value);
191         } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
192             setPinEventExecutorPerGroup((Boolean) value);
193         } else {
194             return false;
195         }
196 
197         return true;
198     }
199 
200     protected <T> void validate(ChannelOption<T> option, T value) {
201         if (option == null) {
202             throw new NullPointerException("option");
203         }
204         option.validate(value);
205     }
206 
207     @Override
208     public int getConnectTimeoutMillis() {
209         return connectTimeoutMillis;
210     }
211 
212     @Override
213     public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
214         checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
215         this.connectTimeoutMillis = connectTimeoutMillis;
216         return this;
217     }
218 
219     /**
220      * {@inheritDoc}
221      * <p>
222      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
223      * {@link MaxMessagesRecvByteBufAllocator}.
224      */
225     @Override
226     @Deprecated
227     public int getMaxMessagesPerRead() {
228         try {
229             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
230             return allocator.maxMessagesPerRead();
231         } catch (ClassCastException e) {
232             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
233                     "MaxMessagesRecvByteBufAllocator", e);
234         }
235     }
236 
237     /**
238      * {@inheritDoc}
239      * <p>
240      * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
241      * {@link MaxMessagesRecvByteBufAllocator}.
242      */
243     @Override
244     @Deprecated
245     public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
246         try {
247             MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
248             allocator.maxMessagesPerRead(maxMessagesPerRead);
249             return this;
250         } catch (ClassCastException e) {
251             throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
252                     "MaxMessagesRecvByteBufAllocator", e);
253         }
254     }
255 
256     @Override
257     public int getWriteSpinCount() {
258         return writeSpinCount;
259     }
260 
261     @Override
262     public ChannelConfig setWriteSpinCount(int writeSpinCount) {
263         checkPositive(writeSpinCount, "writeSpinCount");
264         // Integer.MAX_VALUE is used as a special value in the channel implementations to indicate the channel cannot
265         // accept any more data, and results in the writeOp being set on the selector (or execute a runnable which tries
266         // to flush later because the writeSpinCount quantum has been exhausted). This strategy prevents additional
267         // conditional logic in the channel implementations, and shouldn't be noticeable in practice.
268         if (writeSpinCount == Integer.MAX_VALUE) {
269             --writeSpinCount;
270         }
271         this.writeSpinCount = writeSpinCount;
272         return this;
273     }
274 
275     @Override
276     public ByteBufAllocator getAllocator() {
277         return allocator;
278     }
279 
280     @Override
281     public ChannelConfig setAllocator(ByteBufAllocator allocator) {
282         if (allocator == null) {
283             throw new NullPointerException("allocator");
284         }
285         this.allocator = allocator;
286         return this;
287     }
288 
289     @SuppressWarnings("unchecked")
290     @Override
291     public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
292         return (T) rcvBufAllocator;
293     }
294 
295     @Override
296     public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
297         rcvBufAllocator = checkNotNull(allocator, "allocator");
298         return this;
299     }
300 
301     /**
302      * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
303      * @param allocator the allocator to set.
304      * @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator}
305      * is of type {@link MaxMessagesRecvByteBufAllocator}.
306      */
307     private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
308         if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
309             ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
310         } else if (allocator == null) {
311             throw new NullPointerException("allocator");
312         }
313         setRecvByteBufAllocator(allocator);
314     }
315 
316     @Override
317     public boolean isAutoRead() {
318         return autoRead == 1;
319     }
320 
321     @Override
322     public ChannelConfig setAutoRead(boolean autoRead) {
323         boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
324         if (autoRead && !oldAutoRead) {
325             channel.read();
326         } else if (!autoRead && oldAutoRead) {
327             autoReadCleared();
328         }
329         return this;
330     }
331 
332     /**
333      * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
334      * {@code true} before.
335      */
336     protected void autoReadCleared() { }
337 
338     @Override
339     public boolean isAutoClose() {
340         return autoClose;
341     }
342 
343     @Override
344     public ChannelConfig setAutoClose(boolean autoClose) {
345         this.autoClose = autoClose;
346         return this;
347     }
348 
349     @Override
350     public int getWriteBufferHighWaterMark() {
351         return writeBufferWaterMark.high();
352     }
353 
354     @Override
355     public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
356         checkPositiveOrZero(writeBufferHighWaterMark, "writeBufferHighWaterMark");
357         for (;;) {
358             WriteBufferWaterMark waterMark = writeBufferWaterMark;
359             if (writeBufferHighWaterMark < waterMark.low()) {
360                 throw new IllegalArgumentException(
361                         "writeBufferHighWaterMark cannot be less than " +
362                                 "writeBufferLowWaterMark (" + waterMark.low() + "): " +
363                                 writeBufferHighWaterMark);
364             }
365             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
366                     new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark, false))) {
367                 return this;
368             }
369         }
370     }
371 
372     @Override
373     public int getWriteBufferLowWaterMark() {
374         return writeBufferWaterMark.low();
375     }
376 
377     @Override
378     public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
379         checkPositiveOrZero(writeBufferLowWaterMark, "writeBufferLowWaterMark");
380         for (;;) {
381             WriteBufferWaterMark waterMark = writeBufferWaterMark;
382             if (writeBufferLowWaterMark > waterMark.high()) {
383                 throw new IllegalArgumentException(
384                         "writeBufferLowWaterMark cannot be greater than " +
385                                 "writeBufferHighWaterMark (" + waterMark.high() + "): " +
386                                 writeBufferLowWaterMark);
387             }
388             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
389                     new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high(), false))) {
390                 return this;
391             }
392         }
393     }
394 
395     @Override
396     public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
397         this.writeBufferWaterMark = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark");
398         return this;
399     }
400 
401     @Override
402     public WriteBufferWaterMark getWriteBufferWaterMark() {
403         return writeBufferWaterMark;
404     }
405 
406     @Override
407     public MessageSizeEstimator getMessageSizeEstimator() {
408         return msgSizeEstimator;
409     }
410 
411     @Override
412     public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
413         if (estimator == null) {
414             throw new NullPointerException("estimator");
415         }
416         msgSizeEstimator = estimator;
417         return this;
418     }
419 
420     private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
421         this.pinEventExecutor = pinEventExecutor;
422         return this;
423     }
424 
425     private boolean getPinEventExecutorPerGroup() {
426         return pinEventExecutor;
427     }
428 
429 }