View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import java.util.ArrayList;
19  import java.util.List;
20  
21  import static io.netty.util.internal.ObjectUtil.checkPositive;
22  import static java.lang.Math.max;
23  import static java.lang.Math.min;
24  
25  /**
26   * The {@link RecvByteBufAllocator} that automatically increases and
27   * decreases the predicted buffer size on feed back.
28   * <p>
29   * It gradually increases the expected number of readable bytes if the previous
30   * read fully filled the allocated buffer.  It gradually decreases the expected
31   * number of readable bytes if the read operation was not able to fill a certain
32   * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
33   * returning the same prediction.
34   */
35  public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
36  
37      static final int DEFAULT_MINIMUM = 64;
38      static final int DEFAULT_INITIAL = 1024;
39      static final int DEFAULT_MAXIMUM = 65536;
40  
41      private static final int INDEX_INCREMENT = 4;
42      private static final int INDEX_DECREMENT = 1;
43  
44      private static final int[] SIZE_TABLE;
45  
46      static {
47          List<Integer> sizeTable = new ArrayList<Integer>();
48          for (int i = 16; i < 512; i += 16) {
49              sizeTable.add(i);
50          }
51  
52          for (int i = 512; i > 0; i <<= 1) {
53              sizeTable.add(i);
54          }
55  
56          SIZE_TABLE = new int[sizeTable.size()];
57          for (int i = 0; i < SIZE_TABLE.length; i ++) {
58              SIZE_TABLE[i] = sizeTable.get(i);
59          }
60      }
61  
62      /**
63       * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
64       */
65      @Deprecated
66      public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
67  
68      private static int getSizeTableIndex(final int size) {
69          for (int low = 0, high = SIZE_TABLE.length - 1;;) {
70              if (high < low) {
71                  return low;
72              }
73              if (high == low) {
74                  return high;
75              }
76  
77              int mid = low + high >>> 1;
78              int a = SIZE_TABLE[mid];
79              int b = SIZE_TABLE[mid + 1];
80              if (size > b) {
81                  low = mid + 1;
82              } else if (size < a) {
83                  high = mid - 1;
84              } else if (size == a) {
85                  return mid;
86              } else {
87                  return mid + 1;
88              }
89          }
90      }
91  
92      private final class HandleImpl extends MaxMessageHandle {
93          private final int minIndex;
94          private final int maxIndex;
95          private int index;
96          private int nextReceiveBufferSize;
97          private boolean decreaseNow;
98  
99          HandleImpl(int minIndex, int maxIndex, int initial) {
100             this.minIndex = minIndex;
101             this.maxIndex = maxIndex;
102 
103             index = getSizeTableIndex(initial);
104             nextReceiveBufferSize = SIZE_TABLE[index];
105         }
106 
107         @Override
108         public void lastBytesRead(int bytes) {
109             // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
110             // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
111             // the selector to check for more data. Going back to the selector can add significant latency for large
112             // data transfers.
113             if (bytes == attemptedBytesRead()) {
114                 record(bytes);
115             }
116             super.lastBytesRead(bytes);
117         }
118 
119         @Override
120         public int guess() {
121             return nextReceiveBufferSize;
122         }
123 
124         private void record(int actualReadBytes) {
125             if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
126                 if (decreaseNow) {
127                     index = max(index - INDEX_DECREMENT, minIndex);
128                     nextReceiveBufferSize = SIZE_TABLE[index];
129                     decreaseNow = false;
130                 } else {
131                     decreaseNow = true;
132                 }
133             } else if (actualReadBytes >= nextReceiveBufferSize) {
134                 index = min(index + INDEX_INCREMENT, maxIndex);
135                 nextReceiveBufferSize = SIZE_TABLE[index];
136                 decreaseNow = false;
137             }
138         }
139 
140         @Override
141         public void readComplete() {
142             record(totalBytesRead());
143         }
144     }
145 
146     private final int minIndex;
147     private final int maxIndex;
148     private final int initial;
149 
150     /**
151      * Creates a new predictor with the default parameters.  With the default
152      * parameters, the expected buffer size starts from {@code 1024}, does not
153      * go down below {@code 64}, and does not go up above {@code 65536}.
154      */
155     public AdaptiveRecvByteBufAllocator() {
156         this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
157     }
158 
159     /**
160      * Creates a new predictor with the specified parameters.
161      *
162      * @param minimum  the inclusive lower bound of the expected buffer size
163      * @param initial  the initial buffer size when no feed back was received
164      * @param maximum  the inclusive upper bound of the expected buffer size
165      */
166     public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
167         checkPositive(minimum, "minimum");
168         if (initial < minimum) {
169             throw new IllegalArgumentException("initial: " + initial);
170         }
171         if (maximum < initial) {
172             throw new IllegalArgumentException("maximum: " + maximum);
173         }
174 
175         int minIndex = getSizeTableIndex(minimum);
176         if (SIZE_TABLE[minIndex] < minimum) {
177             this.minIndex = minIndex + 1;
178         } else {
179             this.minIndex = minIndex;
180         }
181 
182         int maxIndex = getSizeTableIndex(maximum);
183         if (SIZE_TABLE[maxIndex] > maximum) {
184             this.maxIndex = maxIndex - 1;
185         } else {
186             this.maxIndex = maxIndex;
187         }
188 
189         this.initial = initial;
190     }
191 
192     @SuppressWarnings("deprecation")
193     @Override
194     public Handle newHandle() {
195         return new HandleImpl(minIndex, maxIndex, initial);
196     }
197 
198     @Override
199     public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
200         super.respectMaybeMoreData(respectMaybeMoreData);
201         return this;
202     }
203 }