1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package org.jboss.netty.channel.socket.oio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.DefaultFileRegion;
21  import org.jboss.netty.channel.FileRegion;
22  
23  import java.io.IOException;
24  import java.io.OutputStream;
25  import java.io.PushbackInputStream;
26  import java.net.SocketException;
27  import java.nio.channels.Channels;
28  import java.nio.channels.ClosedChannelException;
29  import java.nio.channels.WritableByteChannel;
30  import java.util.regex.Pattern;
31  
32  import static org.jboss.netty.channel.Channels.*;
33  
34  class OioWorker extends AbstractOioWorker<OioSocketChannel> {
35  
36      private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
37              "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
38  
39      OioWorker(OioSocketChannel channel) {
40          super(channel);
41      }
42  
43      @Override
44      public void run() {
45          boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
46          if (fireConnected && channel.isOpen()) {
47              
48              
49              fireChannelConnected(channel, channel.getRemoteAddress());
50          }
51          super.run();
52      }
53  
54      @Override
55      boolean process() throws IOException {
56          byte[] buf;
57          int readBytes;
58          PushbackInputStream in = channel.getInputStream();
59          int bytesToRead = in.available();
60          if (bytesToRead > 0) {
61              buf = new byte[bytesToRead];
62              readBytes = in.read(buf);
63          } else {
64              int b = in.read();
65              if (b < 0) {
66                  return false;
67              }
68              in.unread(b);
69              return true;
70          }
71          fireMessageReceived(channel, channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
72  
73          return true;
74      }
75  
76      static void write(
77              OioSocketChannel channel, ChannelFuture future,
78              Object message) {
79  
80          boolean iothread = isIoThread(channel);
81          OutputStream out = channel.getOutputStream();
82          if (out == null) {
83              Exception e = new ClosedChannelException();
84              future.setFailure(e);
85              if (iothread) {
86                  fireExceptionCaught(channel, e);
87              } else {
88                  fireExceptionCaughtLater(channel, e);
89              }
90              return;
91          }
92  
93          try {
94              int length = 0;
95  
96              
97              
98              if (message instanceof FileRegion) {
99                  FileRegion fr = (FileRegion) message;
100                 try {
101                     synchronized (out) {
102                         WritableByteChannel  bchannel = Channels.newChannel(out);
103 
104                         long i;
105                         while ((i = fr.transferTo(bchannel, length)) > 0) {
106                             length += i;
107                             if (length >= fr.getCount()) {
108                                 break;
109                             }
110                         }
111                     }
112                 } finally {
113                     if (fr instanceof DefaultFileRegion) {
114                         DefaultFileRegion dfr = (DefaultFileRegion) fr;
115                         if (dfr.releaseAfterTransfer()) {
116                             fr.releaseExternalResources();
117                         }
118                     }
119                 }
120             } else {
121                 ChannelBuffer a = (ChannelBuffer) message;
122                 length = a.readableBytes();
123                 synchronized (out) {
124                     a.getBytes(a.readerIndex(), out, length);
125                 }
126             }
127 
128             future.setSuccess();
129             if (iothread) {
130                 fireWriteComplete(channel, length);
131             } else {
132                 fireWriteCompleteLater(channel, length);
133             }
134 
135         } catch (Throwable t) {
136             
137             
138             if (t instanceof SocketException &&
139                     SOCKET_CLOSED_MESSAGE.matcher(
140                             String.valueOf(t.getMessage())).matches()) {
141                 t = new ClosedChannelException();
142             }
143             future.setFailure(t);
144             if (iothread) {
145                 fireExceptionCaught(channel, t);
146             } else {
147                 fireExceptionCaughtLater(channel, t);
148             }
149         }
150     }
151 }