View Javadoc
1   /*
2    * Copyright 2011 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.traffic;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.channel.Channel;
20  import io.netty5.channel.ChannelHandler;
21  import io.netty5.channel.ChannelHandlerContext;
22  import io.netty5.channel.ChannelOption;
23  import io.netty5.channel.FileRegion;
24  import io.netty5.util.Attribute;
25  import io.netty5.util.AttributeKey;
26  import io.netty5.util.concurrent.Future;
27  import io.netty5.util.concurrent.Promise;
28  import io.netty5.util.internal.logging.InternalLogger;
29  import io.netty5.util.internal.logging.InternalLoggerFactory;
30  
31  import java.util.concurrent.TimeUnit;
32  
33  import static io.netty5.util.internal.ObjectUtil.checkPositive;
34  
/**
 * <p>AbstractTrafficShapingHandler allows limiting the global bandwidth
 * (see {@link GlobalTrafficShapingHandler}) or the per-session
 * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
 * It also allows you to implement almost real-time monitoring of the bandwidth using
 * the monitors from {@link TrafficCounter}, which call back the {@code doAccounting}
 * method of this handler every checkInterval.</p>
 *
 * <p>If, for any particular reason, you want to stop the monitoring (accounting) or to change
 * the read/write limit or the check interval, several methods allow that:</p>
 * <ul>
 * <li>{@code configure} allows you to change the read or write limits, or the checkInterval</li>
 * <li>{@code trafficCounter} gives you access to the TrafficCounter, and so lets you stop
 * or start the monitoring, change the checkInterval directly, or access its values.</li>
 * </ul>
 */
51  public abstract class AbstractTrafficShapingHandler implements ChannelHandler {
    /**
     * Logger used by this handler and by the nested {@link ReopenReadTimerTask}.
     */
    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
    /**
     * Default delay between two checks, in milliseconds: 1s
     */
    public static final long DEFAULT_CHECK_INTERVAL = 1000;

    /**
     * Default max delay, in milliseconds, in case of traffic shaping
     * (during which no communication will occur).
     * Shall be less than TIMEOUT. Here half of "standard" 30s
     */
    public static final long DEFAULT_MAX_TIME = 15000;

    /**
     * Default max size, in bytes, to not exceed in buffer (write only): 4 MB.
     */
    static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;

    /**
     * Default minimal time to wait: 10ms. Computed waits below this value are not applied.
     */
    static final long MINIMAL_WAIT = 10;

    /**
     * Traffic Counter. May be {@code null}; the configuration methods of this class
     * check for that before using it.
     */
    protected TrafficCounter trafficCounter;

    /**
     * Limit in B/s to apply to write
     */
    private volatile long writeLimit;

    /**
     * Limit in B/s to apply to read
     */
    private volatile long readLimit;

    /**
     * Max delay in wait, in milliseconds
     */
    protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s

    /**
     * Delay between two performance snapshots, in milliseconds
     */
    protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s

    /**
     * Channel attribute set to {@code true} while this handler has suspended reading on the
     * channel, and back to {@code false} when reading is released.
     */
    static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
            .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
    /**
     * Channel attribute caching the task that re-enables reading, so that it can be
     * reused across suspensions instead of being re-created.
     */
    static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
            .getName() + ".REOPEN_TASK");

    /**
     * Max time to delay before proposing to stop writing new objects from next handlers
     */
    volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
    /**
     * Max size in the list before proposing to stop writing new objects from next handlers
     */
    volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB

    /**
     * Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
     * Set in final constructor. Must be between 1 and 31
     */
    final int userDefinedWritabilityIndex;

    /**
     * Default value for Channel UserDefinedWritability index
     */
    static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;

    /**
     * Default value for Global UserDefinedWritability index
     */
    static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;

    /**
     * Default value for GlobalChannel UserDefinedWritability index
     */
    static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
135 
136     /**
137      * @param newTrafficCounter
138      *            the TrafficCounter to set
139      */
140     void setTrafficCounter(TrafficCounter newTrafficCounter) {
141         trafficCounter = newTrafficCounter;
142     }
143 
144     /**
145      * @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
146      *              For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
147      *              for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
148      *              for GlobalChannel TSH it is defined as
149      *              {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
150      */
151     protected int userDefinedWritabilityIndex() {
152         return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
153     }
154 
155     /**
156      * @param writeLimit
157      *          0 or a limit in bytes/s
158      * @param readLimit
159      *          0 or a limit in bytes/s
160      * @param checkInterval
161      *            The delay between two computations of performances for
162      *            channels or 0 if no stats are to be computed.
163      * @param maxTime
164      *            The maximum delay to wait in case of traffic excess.
165      *            Must be positive.
166      */
167     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
168         this.maxTime = checkPositive(maxTime, "maxTime");
169 
170         userDefinedWritabilityIndex = userDefinedWritabilityIndex();
171         this.writeLimit = writeLimit;
172         this.readLimit = readLimit;
173         this.checkInterval = checkInterval;
174     }
175 
176     /**
177      * Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
178      * @param writeLimit
179      *            0 or a limit in bytes/s
180      * @param readLimit
181      *            0 or a limit in bytes/s
182      * @param checkInterval
183      *            The delay between two computations of performances for
184      *            channels or 0 if no stats are to be computed.
185      */
186     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
187         this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
188     }
189 
190     /**
191      * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
192      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
193      *
194      * @param writeLimit
195      *          0 or a limit in bytes/s
196      * @param readLimit
197      *          0 or a limit in bytes/s
198      */
199     protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
200         this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
201     }
202 
203     /**
204      * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
205      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
206      */
207     protected AbstractTrafficShapingHandler() {
208         this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
209     }
210 
211     /**
212      * Constructor using NO LIMIT and
213      * default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
214      *
215      * @param checkInterval
216      *            The delay between two computations of performances for
217      *            channels or 0 if no stats are to be computed.
218      */
219     protected AbstractTrafficShapingHandler(long checkInterval) {
220         this(0, 0, checkInterval, DEFAULT_MAX_TIME);
221     }
222 
223     /**
224      * Change the underlying limitations and check interval.
225      * <p>Note the change will be taken as best effort, meaning
226      * that all already scheduled traffics will not be
227      * changed, but only applied to new traffics.</p>
228      * <p>So the expected usage of this method is to be used not too often,
229      * accordingly to the traffic shaping configuration.</p>
230      *
231      * @param newWriteLimit The new write limit (in bytes)
232      * @param newReadLimit The new read limit (in bytes)
233      * @param newCheckInterval The new check interval (in milliseconds)
234      */
235     public void configure(long newWriteLimit, long newReadLimit,
236             long newCheckInterval) {
237         configure(newWriteLimit, newReadLimit);
238         configure(newCheckInterval);
239     }
240 
241     /**
242      * Change the underlying limitations.
243      * <p>Note the change will be taken as best effort, meaning
244      * that all already scheduled traffics will not be
245      * changed, but only applied to new traffics.</p>
246      * <p>So the expected usage of this method is to be used not too often,
247      * accordingly to the traffic shaping configuration.</p>
248      *
249      * @param newWriteLimit The new write limit (in bytes)
250      * @param newReadLimit The new read limit (in bytes)
251      */
252     public void configure(long newWriteLimit, long newReadLimit) {
253         writeLimit = newWriteLimit;
254         readLimit = newReadLimit;
255         if (trafficCounter != null) {
256             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
257         }
258     }
259 
260     /**
261      * Change the check interval.
262      *
263      * @param newCheckInterval The new check interval (in milliseconds)
264      */
265     public void configure(long newCheckInterval) {
266         checkInterval = newCheckInterval;
267         if (trafficCounter != null) {
268             trafficCounter.configure(checkInterval);
269         }
270     }
271 
272     /**
273      * @return the writeLimit
274      */
275     public long getWriteLimit() {
276         return writeLimit;
277     }
278 
279     /**
280      * <p>Note the change will be taken as best effort, meaning
281      * that all already scheduled traffics will not be
282      * changed, but only applied to new traffics.</p>
283      * <p>So the expected usage of this method is to be used not too often,
284      * accordingly to the traffic shaping configuration.</p>
285      *
286      * @param writeLimit the writeLimit to set
287      */
288     public void setWriteLimit(long writeLimit) {
289         this.writeLimit = writeLimit;
290         if (trafficCounter != null) {
291             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
292         }
293     }
294 
295     /**
296      * @return the readLimit
297      */
298     public long getReadLimit() {
299         return readLimit;
300     }
301 
302     /**
303      * <p>Note the change will be taken as best effort, meaning
304      * that all already scheduled traffics will not be
305      * changed, but only applied to new traffics.</p>
306      * <p>So the expected usage of this method is to be used not too often,
307      * accordingly to the traffic shaping configuration.</p>
308      *
309      * @param readLimit the readLimit to set
310      */
311     public void setReadLimit(long readLimit) {
312         this.readLimit = readLimit;
313         if (trafficCounter != null) {
314             trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
315         }
316     }
317 
318     /**
319      * @return the checkInterval
320      */
321     public long getCheckInterval() {
322         return checkInterval;
323     }
324 
325     /**
326      * @param checkInterval the interval in ms between each step check to set, default value being 1000 ms.
327      */
328     public void setCheckInterval(long checkInterval) {
329         this.checkInterval = checkInterval;
330         if (trafficCounter != null) {
331             trafficCounter.configure(checkInterval);
332         }
333     }
334 
335     /**
336      * <p>Note the change will be taken as best effort, meaning
337      * that all already scheduled traffics will not be
338      * changed, but only applied to new traffics.</p>
339      * <p>So the expected usage of this method is to be used not too often,
340      * accordingly to the traffic shaping configuration.</p>
341      *
342      * @param maxTime
343      *            Max delay in wait, shall be less than TIME OUT in related protocol.
344      *            Must be positive.
345      */
346     public void setMaxTimeWait(long maxTime) {
347         this.maxTime = checkPositive(maxTime, "maxTime");
348     }
349 
350     /**
351      * @return the max delay in wait to prevent TIME OUT
352      */
353     public long getMaxTimeWait() {
354         return maxTime;
355     }
356 
357     /**
358      * @return the maxWriteDelay
359      */
360     public long getMaxWriteDelay() {
361         return maxWriteDelay;
362     }
363 
364     /**
365      * <p>Note the change will be taken as best effort, meaning
366      * that all already scheduled traffics will not be
367      * changed, but only applied to new traffics.</p>
368      * <p>So the expected usage of this method is to be used not too often,
369      * accordingly to the traffic shaping configuration.</p>
370      *
371      * @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
372      *              Must be positive.
373      */
374     public void setMaxWriteDelay(long maxWriteDelay) {
375         this.maxWriteDelay = checkPositive(maxWriteDelay, "maxWriteDelay");
376     }
377 
378     /**
379      * @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
380      */
381     public long getMaxWriteSize() {
382         return maxWriteSize;
383     }
384 
385     /**
386      * <p>Note that this limit is a best effort on memory limitation to prevent Out Of
387      * Memory Exception. To ensure it works, the handler generating the write should
388      * use one of the way provided by Netty to handle the capacity:</p>
389      * <p>- the {@code Channel.isWritable()} property and the corresponding
390      * {@code channelWritabilityChanged()}</p>
391      * <p>- the {@code Future.addListener(future -> ...)}</p>
392      *
393      * @param maxWriteSize the maximum Write Size allowed in the buffer
394      *            per channel before write suspended is set,
395      *            default being {@value #DEFAULT_MAX_SIZE} bytes.
396      */
397     public void setMaxWriteSize(long maxWriteSize) {
398         this.maxWriteSize = maxWriteSize;
399     }
400 
401     /**
402      * Called each time the accounting is computed from the TrafficCounters.
403      * This method could be used for instance to implement almost real time accounting.
404      *
405      * @param counter
406      *            the TrafficCounter that computes its performance
407      */
408     protected void doAccounting(TrafficCounter counter) {
409         // NOOP by default
410     }
411 
412     /**
413      * Class to implement setReadable at fix time
414      */
415     static final class ReopenReadTimerTask implements Runnable {
416         final ChannelHandlerContext ctx;
417         ReopenReadTimerTask(ChannelHandlerContext ctx) {
418             this.ctx = ctx;
419         }
420 
421         @Override
422         public void run() {
423             Channel channel = ctx.channel();
424             if (!channel.getOption(ChannelOption.AUTO_READ) && isHandlerActive(ctx)) {
425                 // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
426                 // Then Just reset the status
427                 if (logger.isDebugEnabled()) {
428                     logger.debug("Not unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':' +
429                             isHandlerActive(ctx));
430                 }
431                 channel.attr(READ_SUSPENDED).set(false);
432             } else {
433                 // Anything else allows the handler to reset the AutoRead
434                 if (logger.isDebugEnabled()) {
435                     if (channel.getOption(ChannelOption.AUTO_READ) && !isHandlerActive(ctx)) {
436                         if (logger.isDebugEnabled()) {
437                             logger.debug("Unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':' +
438                                     isHandlerActive(ctx));
439                         }
440                     } else {
441                         if (logger.isDebugEnabled()) {
442                             logger.debug("Normal unsuspend: " + channel.getOption(ChannelOption.AUTO_READ) + ':'
443                                     + isHandlerActive(ctx));
444                         }
445                     }
446                 }
447                 channel.attr(READ_SUSPENDED).set(false);
448                 channel.setOption(ChannelOption.AUTO_READ, true);
449                 channel.read();
450             }
451             if (logger.isDebugEnabled()) {
452                 logger.debug("Unsuspend final status => " + channel.getOption(ChannelOption.AUTO_READ) + ':'
453                         + isHandlerActive(ctx));
454             }
455         }
456     }
457 
458     /**
459      * Release the Read suspension
460      */
461     void releaseReadSuspended(ChannelHandlerContext ctx) {
462         Channel channel = ctx.channel();
463         channel.attr(READ_SUSPENDED).set(false);
464         channel.setOption(ChannelOption.AUTO_READ, true);
465     }
466 
467     @Override
468     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
469         long size = calculateSize(msg);
470         long now = TrafficCounter.milliSecondFromNano();
471         if (size > 0) {
472             // compute the number of ms to wait before reopening the channel
473             long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
474             wait = checkWaitReadTime(ctx, wait, now);
475             if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
476                 // time in order to try to limit the traffic
477                 // Only AutoRead AND HandlerActive True means Context Active
478                 Channel channel = ctx.channel();
479                 if (logger.isDebugEnabled()) {
480                     logger.debug("Read suspend: " + wait + ':' + channel.getOption(ChannelOption.AUTO_READ) + ':'
481                             + isHandlerActive(ctx));
482                 }
483                 if (channel.getOption(ChannelOption.AUTO_READ) && isHandlerActive(ctx)) {
484                     channel.setOption(ChannelOption.AUTO_READ, false);
485                     channel.attr(READ_SUSPENDED).set(true);
486                     // Create a Runnable to reactive the read if needed. If one was create before it will just be
487                     // reused to limit object creation
488                     Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
489                     Runnable reopenTask = attr.get();
490                     if (reopenTask == null) {
491                         reopenTask = new ReopenReadTimerTask(ctx);
492                         attr.set(reopenTask);
493                     }
494                     ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
495                     if (logger.isDebugEnabled()) {
496                         logger.debug("Suspend final status => " + channel.getOption(ChannelOption.AUTO_READ) + ':'
497                                 + isHandlerActive(ctx) + " will reopened at: " + wait);
498                     }
499                 }
500             }
501         }
502         informReadOperation(ctx, now);
503         ctx.fireChannelRead(msg);
504     }
505 
506     @Override
507     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
508         Channel channel = ctx.channel();
509         if (channel.hasAttr(REOPEN_TASK)) {
510             //release the reopen task
511             channel.attr(REOPEN_TASK).set(null);
512         }
513     }
514 
515     /**
516      * Method overridden in GTSH to take into account specific timer for the channel.
517      * @param wait the wait delay computed in ms
518      * @param now the relative now time in ms
519      * @return the wait to use according to the context
520      */
521     long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
522         // no change by default
523         return wait;
524     }
525 
526     /**
527      * Method overridden in GTSH to take into account specific timer for the channel.
528      * @param now the relative now time in ms
529      */
530     void informReadOperation(final ChannelHandlerContext ctx, final long now) {
531         // default noop
532     }
533 
534     protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
535         Boolean suspended = ctx.channel().attr(READ_SUSPENDED).get();
536         return suspended == null || Boolean.FALSE.equals(suspended);
537     }
538 
539     @Override
540     public void read(ChannelHandlerContext ctx) {
541         if (isHandlerActive(ctx)) {
542             // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
543             ctx.read();
544         }
545     }
546 
547     @Override
548     public Future<Void> write(final ChannelHandlerContext ctx, final Object msg) {
549         long size = calculateSize(msg);
550         long now = TrafficCounter.milliSecondFromNano();
551         Promise<Void> promise = ctx.newPromise();
552         if (size > 0) {
553             // compute the number of ms to wait before continue with the channel
554             long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
555             if (wait >= MINIMAL_WAIT) {
556                 if (logger.isDebugEnabled()) {
557                     logger.debug("Write suspend: " + wait + ':'
558                             + ctx.channel().getOption(ChannelOption.AUTO_READ) + ':'
559                             + isHandlerActive(ctx));
560                 }
561                 submitWrite(ctx, msg, size, wait, now, promise);
562                 return promise.asFuture();
563             }
564         }
565         // to maintain order of write
566         submitWrite(ctx, msg, size, 0, now, promise);
567         return promise.asFuture();
568     }
569 
570     @Deprecated
571     protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
572             final long delay, final Promise<Void> promise) {
573         submitWrite(ctx, msg, calculateSize(msg),
574                 delay, TrafficCounter.milliSecondFromNano(), promise);
575     }
576 
    /**
     * Performs the actual submission of a write; implemented by the concrete handlers.
     *
     * @param ctx the handler context
     * @param msg the message to write
     * @param size the size of the message as returned by {@code calculateSize}
     *            (may be {@code -1} when unknown)
     * @param delay the delay in ms to apply; {@code write} passes 0 when no wait is required
     * @param now the relative time in ms, as obtained from {@code TrafficCounter.milliSecondFromNano()}
     * @param promise the promise associated with the write; {@code write} returns its future
     */
    abstract void submitWrite(
            ChannelHandlerContext ctx, Object msg, long size, long delay, long now, Promise<Void> promise);
579 
580     @Override
581     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
582         setUserDefinedWritability(ctx, true);
583         ctx.fireChannelRegistered();
584     }
585 
    // TODO: Fix me later!
    /**
     * Meant to set the user defined writability of the channel at
     * {@code userDefinedWritabilityIndex}. It is currently a no-op: the implementation
     * is commented out, so both arguments are ignored.
     *
     * @param ctx the handler context
     * @param writable the writability to set
     */
    void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
        // Disabled implementation, kept for reference until the TODO above is resolved.
        /**
        ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
        if (cob != null) {
            cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
        }
         **/
    }
595 
596     /**
597      * Check the writability according to delay and size for the channel.
598      * Set if necessary setUserDefinedWritability status.
599      * @param delay the computed delay
600      * @param queueSize the current queueSize
601      */
602     void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
603         if (queueSize > maxWriteSize || delay > maxWriteDelay) {
604             setUserDefinedWritability(ctx, false);
605         }
606     }
607     /**
608      * Explicitly release the Write suspended status.
609      */
610     void releaseWriteSuspended(ChannelHandlerContext ctx) {
611         setUserDefinedWritability(ctx, true);
612     }
613 
614     /**
615      * @return the current TrafficCounter (if
616      *         channel is still connected)
617      */
618     public TrafficCounter trafficCounter() {
619         return trafficCounter;
620     }
621 
622     @Override
623     public String toString() {
624         StringBuilder builder = new StringBuilder(290)
625             .append("TrafficShaping with Write Limit: ").append(writeLimit)
626             .append(" Read Limit: ").append(readLimit)
627             .append(" CheckInterval: ").append(checkInterval)
628             .append(" maxDelay: ").append(maxWriteDelay)
629             .append(" maxSize: ").append(maxWriteSize)
630             .append(" and Counter: ");
631         if (trafficCounter != null) {
632             builder.append(trafficCounter);
633         } else {
634             builder.append("none");
635         }
636         return builder.toString();
637     }
638 
639     /**
640      * Calculate the size of the given {@link Object}.
641      *
642      * This implementation supports {@link Buffer} and {@link FileRegion}.
643      * Sub-classes may override this.
644      * @param msg the msg for which the size should be calculated.
645      * @return size the size of the msg or {@code -1} if unknown.
646      */
647     protected long calculateSize(Object msg) {
648         // TODO we should have a Sized interface to generalise all of this.
649         if (msg instanceof Buffer) {
650             return ((Buffer) msg).readableBytes();
651         }
652         if (msg instanceof FileRegion) {
653             return ((FileRegion) msg).count();
654         }
655         return -1;
656     }
657 }