1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.example.redis;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelInitializer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.MultiThreadIoEventLoopGroup;
25 import io.netty.channel.nio.NioIoHandler;
26 import io.netty.channel.socket.SocketChannel;
27 import io.netty.channel.socket.nio.NioSocketChannel;
28 import io.netty.handler.codec.redis.RedisArrayAggregator;
29 import io.netty.handler.codec.redis.RedisBulkStringAggregator;
30 import io.netty.handler.codec.redis.RedisDecoder;
31 import io.netty.handler.codec.redis.RedisEncoder;
32 import io.netty.util.concurrent.GenericFutureListener;
33
34 import java.io.BufferedReader;
35 import java.io.InputStreamReader;
36
37
38
39
40 public class RedisClient {
41 private static final String HOST = System.getProperty("host", "127.0.0.1");
42 private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));
43
44 public static void main(String[] args) throws Exception {
45 EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
46 try {
47 Bootstrap b = new Bootstrap();
48 b.group(group)
49 .channel(NioSocketChannel.class)
50 .handler(new ChannelInitializer<SocketChannel>() {
51 @Override
52 protected void initChannel(SocketChannel ch) throws Exception {
53 ChannelPipeline p = ch.pipeline();
54 p.addLast(new RedisDecoder());
55 p.addLast(new RedisBulkStringAggregator());
56 p.addLast(new RedisArrayAggregator());
57 p.addLast(new RedisEncoder());
58 p.addLast(new RedisClientHandler());
59 }
60 });
61
62
63 Channel ch = b.connect(HOST, PORT).sync().channel();
64
65
66 System.out.println("Enter Redis commands (quit to end)");
67 ChannelFuture lastWriteFuture = null;
68 BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
69 for (;;) {
70 final String input = in.readLine();
71 final String line = input != null ? input.trim() : null;
72 if (line == null || "quit".equalsIgnoreCase(line)) {
73 ch.close().sync();
74 break;
75 } else if (line.isEmpty()) {
76 continue;
77 }
78
79 lastWriteFuture = ch.writeAndFlush(line);
80 lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
81 @Override
82 public void operationComplete(ChannelFuture future) throws Exception {
83 if (!future.isSuccess()) {
84 System.err.print("write failed: ");
85 future.cause().printStackTrace(System.err);
86 }
87 }
88 });
89 }
90
91
92 if (lastWriteFuture != null) {
93 lastWriteFuture.sync();
94 }
95 } finally {
96 group.shutdownGracefully();
97 }
98 }
99 }