View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.util.UncheckedBooleanSupplier;
21  
22  /**
23   * Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()}
24   * and also prevents overflow.
25   */
26  public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
27      private volatile int maxMessagesPerRead;
28      private volatile boolean respectMaybeMoreData = true;
29  
30      public DefaultMaxMessagesRecvByteBufAllocator() {
31          this(1);
32      }
33  
34      public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
35          maxMessagesPerRead(maxMessagesPerRead);
36      }
37  
38      @Override
39      public int maxMessagesPerRead() {
40          return maxMessagesPerRead;
41      }
42  
43      @Override
44      public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
45          if (maxMessagesPerRead <= 0) {
46              throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
47          }
48          this.maxMessagesPerRead = maxMessagesPerRead;
49          return this;
50      }
51  
52      /**
53       * Determine if future instances of {@link #newHandle()} will stop reading if we think there is no more data.
54       * @param respectMaybeMoreData
55       * <ul>
56       *     <li>{@code true} to stop reading if we think there is no more data. This may save a system call to read from
57       *          the socket, but if data has arrived in a racy fashion we may give up our {@link #maxMessagesPerRead()}
58       *          quantum and have to wait for the selector to notify us of more data.</li>
59       *     <li>{@code false} to keep reading (up to {@link #maxMessagesPerRead()}) or until there is no data when we
60       *          attempt to read.</li>
61       * </ul>
62       * @return {@code this}.
63       */
64      public DefaultMaxMessagesRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
65          this.respectMaybeMoreData = respectMaybeMoreData;
66          return this;
67      }
68  
69      /**
70       * Get if future instances of {@link #newHandle()} will stop reading if we think there is no more data.
71       * @return
72       * <ul>
73       *     <li>{@code true} to stop reading if we think there is no more data. This may save a system call to read from
74       *          the socket, but if data has arrived in a racy fashion we may give up our {@link #maxMessagesPerRead()}
75       *          quantum and have to wait for the selector to notify us of more data.</li>
76       *     <li>{@code false} to keep reading (up to {@link #maxMessagesPerRead()}) or until there is no data when we
77       *          attempt to read.</li>
78       * </ul>
79       */
80      public final boolean respectMaybeMoreData() {
81          return respectMaybeMoreData;
82      }
83  
84      /**
85       * Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}.
86       */
87      public abstract class MaxMessageHandle implements ExtendedHandle {
88          private ChannelConfig config;
89          private int maxMessagePerRead;
90          private int totalMessages;
91          private int totalBytesRead;
92          private int attemptedBytesRead;
93          private int lastBytesRead;
94          private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
95          private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
96              @Override
97              public boolean get() {
98                  return attemptedBytesRead == lastBytesRead;
99              }
100         };
101 
102         /**
103          * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
104          */
105         @Override
106         public void reset(ChannelConfig config) {
107             this.config = config;
108             maxMessagePerRead = maxMessagesPerRead();
109             totalMessages = totalBytesRead = 0;
110         }
111 
112         @Override
113         public ByteBuf allocate(ByteBufAllocator alloc) {
114             return alloc.ioBuffer(guess());
115         }
116 
117         @Override
118         public final void incMessagesRead(int amt) {
119             totalMessages += amt;
120         }
121 
122         @Override
123         public void lastBytesRead(int bytes) {
124             lastBytesRead = bytes;
125             if (bytes > 0) {
126                 totalBytesRead += bytes;
127             }
128         }
129 
130         @Override
131         public final int lastBytesRead() {
132             return lastBytesRead;
133         }
134 
135         @Override
136         public boolean continueReading() {
137             return continueReading(defaultMaybeMoreSupplier);
138         }
139 
140         @Override
141         public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
142             return config.isAutoRead() &&
143                    (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
144                    totalMessages < maxMessagePerRead &&
145                    totalBytesRead > 0;
146         }
147 
148         @Override
149         public void readComplete() {
150         }
151 
152         @Override
153         public int attemptedBytesRead() {
154             return attemptedBytesRead;
155         }
156 
157         @Override
158         public void attemptedBytesRead(int bytes) {
159             attemptedBytesRead = bytes;
160         }
161 
162         protected final int totalBytesRead() {
163             return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
164         }
165     }
166 }