View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import java.util.ArrayList;
19  import java.util.List;
20  
21  import static java.lang.Math.max;
22  import static java.lang.Math.min;
23  
24  /**
25   * The {@link RecvByteBufAllocator} that automatically increases and
26   * decreases the predicted buffer size on feed back.
27   * <p>
28   * It gradually increases the expected number of readable bytes if the previous
29   * read fully filled the allocated buffer.  It gradually decreases the expected
30   * number of readable bytes if the read operation was not able to fill a certain
31   * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
32   * returning the same prediction.
33   */
34  public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
35  
36      static final int DEFAULT_MINIMUM = 64;
37      static final int DEFAULT_INITIAL = 1024;
38      static final int DEFAULT_MAXIMUM = 65536;
39  
40      private static final int INDEX_INCREMENT = 4;
41      private static final int INDEX_DECREMENT = 1;
42  
43      private static final int[] SIZE_TABLE;
44  
45      static {
46          List<Integer> sizeTable = new ArrayList<Integer>();
47          for (int i = 16; i < 512; i += 16) {
48              sizeTable.add(i);
49          }
50  
51          for (int i = 512; i > 0; i <<= 1) {
52              sizeTable.add(i);
53          }
54  
55          SIZE_TABLE = new int[sizeTable.size()];
56          for (int i = 0; i < SIZE_TABLE.length; i ++) {
57              SIZE_TABLE[i] = sizeTable.get(i);
58          }
59      }
60  
61      /**
62       * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
63       */
64      @Deprecated
65      public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
66  
67      private static int getSizeTableIndex(final int size) {
68          for (int low = 0, high = SIZE_TABLE.length - 1;;) {
69              if (high < low) {
70                  return low;
71              }
72              if (high == low) {
73                  return high;
74              }
75  
76              int mid = low + high >>> 1;
77              int a = SIZE_TABLE[mid];
78              int b = SIZE_TABLE[mid + 1];
79              if (size > b) {
80                  low = mid + 1;
81              } else if (size < a) {
82                  high = mid - 1;
83              } else if (size == a) {
84                  return mid;
85              } else {
86                  return mid + 1;
87              }
88          }
89      }
90  
91      private final class HandleImpl extends MaxMessageHandle {
92          private final int minIndex;
93          private final int maxIndex;
94          private int index;
95          private int nextReceiveBufferSize;
96          private boolean decreaseNow;
97  
98          public HandleImpl(int minIndex, int maxIndex, int initial) {
99              this.minIndex = minIndex;
100             this.maxIndex = maxIndex;
101 
102             index = getSizeTableIndex(initial);
103             nextReceiveBufferSize = SIZE_TABLE[index];
104         }
105 
106         @Override
107         public void lastBytesRead(int bytes) {
108             // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
109             // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
110             // the selector to check for more data. Going back to the selector can add significant latency for large
111             // data transfers.
112             if (bytes == attemptedBytesRead()) {
113                 record(bytes);
114             }
115             super.lastBytesRead(bytes);
116         }
117 
118         @Override
119         public int guess() {
120             return nextReceiveBufferSize;
121         }
122 
123         private void record(int actualReadBytes) {
124             if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
125                 if (decreaseNow) {
126                     index = max(index - INDEX_DECREMENT, minIndex);
127                     nextReceiveBufferSize = SIZE_TABLE[index];
128                     decreaseNow = false;
129                 } else {
130                     decreaseNow = true;
131                 }
132             } else if (actualReadBytes >= nextReceiveBufferSize) {
133                 index = min(index + INDEX_INCREMENT, maxIndex);
134                 nextReceiveBufferSize = SIZE_TABLE[index];
135                 decreaseNow = false;
136             }
137         }
138 
139         @Override
140         public void readComplete() {
141             record(totalBytesRead());
142         }
143     }
144 
145     private final int minIndex;
146     private final int maxIndex;
147     private final int initial;
148 
149     /**
150      * Creates a new predictor with the default parameters.  With the default
151      * parameters, the expected buffer size starts from {@code 1024}, does not
152      * go down below {@code 64}, and does not go up above {@code 65536}.
153      */
154     public AdaptiveRecvByteBufAllocator() {
155         this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
156     }
157 
158     /**
159      * Creates a new predictor with the specified parameters.
160      *
161      * @param minimum  the inclusive lower bound of the expected buffer size
162      * @param initial  the initial buffer size when no feed back was received
163      * @param maximum  the inclusive upper bound of the expected buffer size
164      */
165     public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
166         if (minimum <= 0) {
167             throw new IllegalArgumentException("minimum: " + minimum);
168         }
169         if (initial < minimum) {
170             throw new IllegalArgumentException("initial: " + initial);
171         }
172         if (maximum < initial) {
173             throw new IllegalArgumentException("maximum: " + maximum);
174         }
175 
176         int minIndex = getSizeTableIndex(minimum);
177         if (SIZE_TABLE[minIndex] < minimum) {
178             this.minIndex = minIndex + 1;
179         } else {
180             this.minIndex = minIndex;
181         }
182 
183         int maxIndex = getSizeTableIndex(maximum);
184         if (SIZE_TABLE[maxIndex] > maximum) {
185             this.maxIndex = maxIndex - 1;
186         } else {
187             this.maxIndex = maxIndex;
188         }
189 
190         this.initial = initial;
191     }
192 
193     @SuppressWarnings("deprecation")
194     @Override
195     public Handle newHandle() {
196         return new HandleImpl(minIndex, maxIndex, initial);
197     }
198 
199     @Override
200     public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
201         super.respectMaybeMoreData(respectMaybeMoreData);
202         return this;
203     }
204 }