View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import java.util.ArrayList;
19  import java.util.List;
20  
21  import static io.netty.util.internal.ObjectUtil.checkPositive;
22  import static java.lang.Math.max;
23  import static java.lang.Math.min;
24  
25  /**
26   * The {@link RecvByteBufAllocator} that automatically increases and
27   * decreases the predicted buffer size on feed back.
28   * <p>
29   * It gradually increases the expected number of readable bytes if the previous
30   * read fully filled the allocated buffer.  It gradually decreases the expected
31   * number of readable bytes if the read operation was not able to fill a certain
32   * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
33   * returning the same prediction.
34   */
35  public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
36  
37      static final int DEFAULT_MINIMUM = 64;
38      // Use an initial value that is bigger than the common MTU of 1500
39      static final int DEFAULT_INITIAL = 2048;
40      static final int DEFAULT_MAXIMUM = 65536;
41  
42      private static final int INDEX_INCREMENT = 4;
43      private static final int INDEX_DECREMENT = 1;
44  
45      private static final int[] SIZE_TABLE;
46  
47      static {
48          List<Integer> sizeTable = new ArrayList<Integer>();
49          for (int i = 16; i < 512; i += 16) {
50              sizeTable.add(i);
51          }
52  
53          // Suppress a warning since i becomes negative when an integer overflow happens
54          for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
55              sizeTable.add(i);
56          }
57  
58          SIZE_TABLE = new int[sizeTable.size()];
59          for (int i = 0; i < SIZE_TABLE.length; i ++) {
60              SIZE_TABLE[i] = sizeTable.get(i);
61          }
62      }
63  
64      /**
65       * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
66       */
67      @Deprecated
68      public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
69  
70      private static int getSizeTableIndex(final int size) {
71          for (int low = 0, high = SIZE_TABLE.length - 1;;) {
72              if (high < low) {
73                  return low;
74              }
75              if (high == low) {
76                  return high;
77              }
78  
79              int mid = low + high >>> 1;
80              int a = SIZE_TABLE[mid];
81              int b = SIZE_TABLE[mid + 1];
82              if (size > b) {
83                  low = mid + 1;
84              } else if (size < a) {
85                  high = mid - 1;
86              } else if (size == a) {
87                  return mid;
88              } else {
89                  return mid + 1;
90              }
91          }
92      }
93  
94      private final class HandleImpl extends MaxMessageHandle {
95          private final int minIndex;
96          private final int maxIndex;
97          private int index;
98          private int nextReceiveBufferSize;
99          private boolean decreaseNow;
100 
101         HandleImpl(int minIndex, int maxIndex, int initial) {
102             this.minIndex = minIndex;
103             this.maxIndex = maxIndex;
104 
105             index = getSizeTableIndex(initial);
106             nextReceiveBufferSize = SIZE_TABLE[index];
107         }
108 
109         @Override
110         public void lastBytesRead(int bytes) {
111             // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
112             // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
113             // the selector to check for more data. Going back to the selector can add significant latency for large
114             // data transfers.
115             if (bytes == attemptedBytesRead()) {
116                 record(bytes);
117             }
118             super.lastBytesRead(bytes);
119         }
120 
121         @Override
122         public int guess() {
123             return nextReceiveBufferSize;
124         }
125 
126         private void record(int actualReadBytes) {
127             if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
128                 if (decreaseNow) {
129                     index = max(index - INDEX_DECREMENT, minIndex);
130                     nextReceiveBufferSize = SIZE_TABLE[index];
131                     decreaseNow = false;
132                 } else {
133                     decreaseNow = true;
134                 }
135             } else if (actualReadBytes >= nextReceiveBufferSize) {
136                 index = min(index + INDEX_INCREMENT, maxIndex);
137                 nextReceiveBufferSize = SIZE_TABLE[index];
138                 decreaseNow = false;
139             }
140         }
141 
142         @Override
143         public void readComplete() {
144             record(totalBytesRead());
145         }
146     }
147 
148     private final int minIndex;
149     private final int maxIndex;
150     private final int initial;
151 
152     /**
153      * Creates a new predictor with the default parameters.  With the default
154      * parameters, the expected buffer size starts from {@code 1024}, does not
155      * go down below {@code 64}, and does not go up above {@code 65536}.
156      */
157     public AdaptiveRecvByteBufAllocator() {
158         this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
159     }
160 
161     /**
162      * Creates a new predictor with the specified parameters.
163      *
164      * @param minimum  the inclusive lower bound of the expected buffer size
165      * @param initial  the initial buffer size when no feed back was received
166      * @param maximum  the inclusive upper bound of the expected buffer size
167      */
168     public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
169         checkPositive(minimum, "minimum");
170         if (initial < minimum) {
171             throw new IllegalArgumentException("initial: " + initial);
172         }
173         if (maximum < initial) {
174             throw new IllegalArgumentException("maximum: " + maximum);
175         }
176 
177         int minIndex = getSizeTableIndex(minimum);
178         if (SIZE_TABLE[minIndex] < minimum) {
179             this.minIndex = minIndex + 1;
180         } else {
181             this.minIndex = minIndex;
182         }
183 
184         int maxIndex = getSizeTableIndex(maximum);
185         if (SIZE_TABLE[maxIndex] > maximum) {
186             this.maxIndex = maxIndex - 1;
187         } else {
188             this.maxIndex = maxIndex;
189         }
190 
191         this.initial = initial;
192     }
193 
194     @SuppressWarnings("deprecation")
195     @Override
196     public Handle newHandle() {
197         return new HandleImpl(minIndex, maxIndex, initial);
198     }
199 
200     @Override
201     public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
202         super.respectMaybeMoreData(respectMaybeMoreData);
203         return this;
204     }
205 }