View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.example.factorial;
17  
18  import io.netty5.channel.ChannelHandlerContext;
19  import io.netty5.channel.SimpleChannelInboundHandler;
20  import io.netty5.util.concurrent.Future;
21  import io.netty5.util.concurrent.FutureListener;
22  
23  import java.math.BigInteger;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.LinkedBlockingQueue;
26  
27  /**
28   * Handler for a client-side channel.  This handler maintains stateful
29   * information which is specific to a certain channel using member variables.
30   * Therefore, an instance of this handler can cover only one channel.  You have
31   * to create a new handler instance whenever you create a new channel and insert
32   * this handler to avoid a race condition.
33   */
34  public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
35  
36      private ChannelHandlerContext ctx;
37      private int receivedMessages;
38      private int next = 1;
39      final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<>();
40  
41      public BigInteger getFactorial() {
42          boolean interrupted = false;
43          try {
44              for (;;) {
45                  try {
46                      return answer.take();
47                  } catch (InterruptedException ignore) {
48                      interrupted = true;
49                  }
50              }
51          } finally {
52              if (interrupted) {
53                  Thread.currentThread().interrupt();
54              }
55          }
56      }
57  
58      @Override
59      public void channelActive(ChannelHandlerContext ctx) {
60          this.ctx = ctx;
61          sendNumbers();
62      }
63  
64      @Override
65      public void messageReceived(ChannelHandlerContext ctx, final BigInteger msg) {
66          receivedMessages ++;
67          if (receivedMessages == FactorialClient.COUNT) {
68              // Offer the answer after closing the connection.
69              ctx.channel().close().addListener(future -> {
70                  boolean offered = answer.offer(msg);
71                  assert offered;
72              });
73          }
74      }
75  
76      @Override
77      public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
78          cause.printStackTrace();
79          ctx.close();
80      }
81  
82      private void sendNumbers() {
83          // Do not send more than 4096 numbers.
84          Future<Void> future = null;
85          for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
86              future = ctx.write(Integer.valueOf(next));
87              next++;
88          }
89          if (next <= FactorialClient.COUNT) {
90              assert future != null;
91              future.addListener(numberSender);
92          }
93          ctx.flush();
94      }
95  
96      private final FutureListener<Void> numberSender = future -> {
97          if (future.isSuccess()) {
98              sendNumbers();
99          } else {
100             future.cause().printStackTrace();
101             ctx.channel().close();
102         }
103     };
104 }