1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.FileRegion;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.nio.channels.Channels;
27 import java.nio.channels.ClosedChannelException;
28 import java.nio.channels.NotYetConnectedException;
29 import java.nio.channels.WritableByteChannel;
30
31
32
33
34 public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
35
36 private static final InputStream CLOSED_IN = new InputStream() {
37 @Override
38 public int read() {
39 return -1;
40 }
41 };
42
43 private static final OutputStream CLOSED_OUT = new OutputStream() {
44 @Override
45 public void write(int b) throws IOException {
46 throw new ClosedChannelException();
47 }
48 };
49
50 private InputStream is;
51 private OutputStream os;
52 private WritableByteChannel outChannel;
53
54
55
56
57
58
59
60 protected OioByteStreamChannel(Channel parent) {
61 super(parent);
62 }
63
64
65
66
67 protected final void activate(InputStream is, OutputStream os) {
68 if (this.is != null) {
69 throw new IllegalStateException("input was set already");
70 }
71 if (this.os != null) {
72 throw new IllegalStateException("output was set already");
73 }
74 if (is == null) {
75 throw new NullPointerException("is");
76 }
77 if (os == null) {
78 throw new NullPointerException("os");
79 }
80 this.is = is;
81 this.os = os;
82 }
83
84 @Override
85 public boolean isActive() {
86 InputStream is = this.is;
87 if (is == null || is == CLOSED_IN) {
88 return false;
89 }
90
91 OutputStream os = this.os;
92 return !(os == null || os == CLOSED_OUT);
93 }
94
95 @Override
96 protected int available() {
97 try {
98 return is.available();
99 } catch (IOException ignored) {
100 return 0;
101 }
102 }
103
104 @Override
105 protected int doReadBytes(ByteBuf buf) throws Exception {
106 int length = Math.max(1, Math.min(available(), buf.maxWritableBytes()));
107 return buf.writeBytes(is, length);
108 }
109
110 @Override
111 protected void doWriteBytes(ByteBuf buf) throws Exception {
112 OutputStream os = this.os;
113 if (os == null) {
114 throw new NotYetConnectedException();
115 }
116 buf.readBytes(os, buf.readableBytes());
117 }
118
119 @Override
120 protected void doWriteFileRegion(FileRegion region) throws Exception {
121 OutputStream os = this.os;
122 if (os == null) {
123 throw new NotYetConnectedException();
124 }
125 if (outChannel == null) {
126 outChannel = Channels.newChannel(os);
127 }
128
129 long written = 0;
130 for (;;) {
131 long localWritten = region.transferTo(outChannel, written);
132 if (localWritten == -1) {
133 checkEOF(region);
134 return;
135 }
136 written += localWritten;
137
138 if (written >= region.count()) {
139 return;
140 }
141 }
142 }
143
144 private static void checkEOF(FileRegion region) throws IOException {
145 if (region.transfered() < region.count()) {
146 throw new EOFException("Expected to be able to write " + region.count() + " bytes, " +
147 "but only wrote " + region.transfered());
148 }
149 }
150
151 @Override
152 protected void doClose() throws Exception {
153 InputStream is = this.is;
154 OutputStream os = this.os;
155 this.is = CLOSED_IN;
156 this.os = CLOSED_OUT;
157
158 try {
159 if (is != null) {
160 is.close();
161 }
162 } finally {
163 if (os != null) {
164 os.close();
165 }
166 }
167 }
168 }