1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.local;
17
18 import java.util.concurrent.TimeUnit;
19
20 import org.jboss.netty.bootstrap.ClientBootstrap;
21 import org.jboss.netty.bootstrap.ServerBootstrap;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelPipeline;
24 import org.jboss.netty.channel.ChannelPipelineFactory;
25 import org.jboss.netty.channel.Channels;
26 import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
27 import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
28 import org.jboss.netty.channel.local.LocalAddress;
29 import org.jboss.netty.handler.codec.string.StringDecoder;
30 import org.jboss.netty.handler.codec.string.StringEncoder;
31 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
32 import org.jboss.netty.handler.logging.LoggingHandler;
33 import org.jboss.netty.logging.InternalLogLevel;
34
35 public class LocalExampleMultithreaded {
36
37 private final String port;
38
39 public LocalExampleMultithreaded(String port) {
40 this.port = port;
41 }
42
43 public void run() {
44 LocalAddress socketAddress = new LocalAddress(port);
45
46 OrderedMemoryAwareThreadPoolExecutor eventExecutor =
47 new OrderedMemoryAwareThreadPoolExecutor(
48 5, 1000000, 10000000, 100,
49 TimeUnit.MILLISECONDS);
50
51 ServerBootstrap sb = new ServerBootstrap(
52 new DefaultLocalServerChannelFactory());
53
54 sb.setPipelineFactory(new LocalServerPipelineFactory(eventExecutor));
55 sb.bind(socketAddress);
56
57 ClientBootstrap cb = new ClientBootstrap(
58 new DefaultLocalClientChannelFactory());
59
60 cb.setPipelineFactory(new ChannelPipelineFactory() {
61 public ChannelPipeline getPipeline() throws Exception {
62 return Channels.pipeline(
63 new StringDecoder(),
64 new StringEncoder(),
65 new LoggingHandler(InternalLogLevel.INFO));
66 }
67 });
68
69
70 String[] commands = { "First", "Second", "Third", "quit" };
71 for (int j = 0; j < 5 ; j++) {
72 System.err.println("Start " + j);
73 ChannelFuture channelFuture = cb.connect(socketAddress);
74 channelFuture.awaitUninterruptibly();
75 if (! channelFuture.isSuccess()) {
76 System.err.println("CANNOT CONNECT");
77 channelFuture.getCause().printStackTrace();
78 break;
79 }
80 ChannelFuture lastWriteFuture = null;
81 for (String line: commands) {
82
83 lastWriteFuture = channelFuture.getChannel().write(line);
84 }
85
86
87 if (lastWriteFuture != null) {
88 lastWriteFuture.awaitUninterruptibly();
89 }
90 channelFuture.getChannel().close();
91
92 channelFuture.getChannel().getCloseFuture().awaitUninterruptibly();
93 System.err.println("End " + j);
94 }
95
96
97 cb.releaseExternalResources();
98 sb.releaseExternalResources();
99 eventExecutor.shutdownNow();
100 }
101
102 public static void main(String[] args) throws Exception {
103 new LocalExampleMultithreaded("1").run();
104 }
105 }