1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.io.OutputStream;
22 import java.io.PushbackInputStream;
23 import java.net.SocketException;
24 import java.nio.channels.Channels;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.WritableByteChannel;
27 import java.util.regex.Pattern;
28
29 import org.jboss.netty.buffer.ChannelBuffer;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.DefaultFileRegion;
32 import org.jboss.netty.channel.FileRegion;
33
34 class OioWorker extends AbstractOioWorker<OioSocketChannel> {
35
36 private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
37 "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
38
39 OioWorker(OioSocketChannel channel) {
40 super(channel);
41 }
42
43 @Override
44 public void run() {
45 boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
46 if (fireConnected && channel.isOpen()) {
47
48
49 fireChannelConnected(channel, channel.getRemoteAddress());
50
51 }
52 super.run();
53 }
54
55 @Override
56 boolean process() throws IOException {
57 byte[] buf;
58 int readBytes;
59 PushbackInputStream in = channel.getInputStream();
60 int bytesToRead = in.available();
61 if (bytesToRead > 0) {
62 buf = new byte[bytesToRead];
63 readBytes = in.read(buf);
64 } else {
65 int b = in.read();
66 if (b < 0) {
67 return false;
68 }
69 in.unread(b);
70 return true;
71 }
72 fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
73
74 return true;
75 }
76
77 static void write(
78 OioSocketChannel channel, ChannelFuture future,
79 Object message) {
80
81 boolean iothread = isIoThread(channel);
82 OutputStream out = channel.getOutputStream();
83 if (out == null) {
84 Exception e = new ClosedChannelException();
85 future.setFailure(e);
86 if (iothread) {
87 fireExceptionCaught(channel, e);
88 } else {
89 fireExceptionCaughtLater(channel, e);
90 }
91 return;
92 }
93
94 try {
95 int length = 0;
96
97
98
99 if (message instanceof FileRegion) {
100 FileRegion fr = (FileRegion) message;
101 try {
102 synchronized (out) {
103 WritableByteChannel bchannel = Channels.newChannel(out);
104
105 long i = 0;
106 while ((i = fr.transferTo(bchannel, length)) > 0) {
107 length += i;
108 if (length >= fr.getCount()) {
109 break;
110 }
111 }
112 }
113 } finally {
114 if (fr instanceof DefaultFileRegion) {
115 DefaultFileRegion dfr = (DefaultFileRegion) fr;
116 if (dfr.releaseAfterTransfer()) {
117 fr.releaseExternalResources();
118 }
119 }
120
121 }
122 } else {
123 ChannelBuffer a = (ChannelBuffer) message;
124 length = a.readableBytes();
125 synchronized (out) {
126 a.getBytes(a.readerIndex(), out, length);
127 }
128 }
129
130 future.setSuccess();
131 if (iothread) {
132 fireWriteComplete(channel, length);
133 } else {
134 fireWriteCompleteLater(channel, length);
135 }
136
137 } catch (Throwable t) {
138
139
140 if (t instanceof SocketException &&
141 SOCKET_CLOSED_MESSAGE.matcher(
142 String.valueOf(t.getMessage())).matches()) {
143 t = new ClosedChannelException();
144 }
145 future.setFailure(t);
146 if (iothread) {
147 fireExceptionCaught(channel, t);
148 } else {
149 fireExceptionCaughtLater(channel, t);
150 }
151 }
152 }
153
154
155 }