1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.serialization;
17
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.io.OutputStream;
21 import java.util.concurrent.atomic.AtomicReference;
22
23 import org.jboss.netty.buffer.ChannelBuffer;
24 import org.jboss.netty.buffer.ChannelBufferFactory;
25 import org.jboss.netty.buffer.ChannelBufferOutputStream;
26 import org.jboss.netty.buffer.ChannelBuffers;
27 import org.jboss.netty.channel.Channel;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
30
31
32
33
34
35
36
37
38 public class CompatibleObjectEncoder extends OneToOneEncoder {
39
40 private final AtomicReference<ChannelBuffer> buffer =
41 new AtomicReference<ChannelBuffer>();
42 private final int resetInterval;
43 private volatile ObjectOutputStream oout;
44 private int writtenObjects;
45
46
47
48
49 public CompatibleObjectEncoder() {
50 this(16);
51 }
52
53
54
55
56
57
58
59
60
61
62 public CompatibleObjectEncoder(int resetInterval) {
63 if (resetInterval < 0) {
64 throw new IllegalArgumentException(
65 "resetInterval: " + resetInterval);
66 }
67 this.resetInterval = resetInterval;
68 }
69
70
71
72
73
74
75 protected ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception {
76 return new ObjectOutputStream(out);
77 }
78
79 @Override
80 protected Object encode(ChannelHandlerContext context, Channel channel, Object msg) throws Exception {
81 ChannelBuffer buffer = buffer(context);
82 ObjectOutputStream oout = this.oout;
83 if (resetInterval != 0) {
84
85 writtenObjects ++;
86 if (writtenObjects % resetInterval == 0) {
87 oout.reset();
88
89
90 buffer.discardReadBytes();
91 }
92 }
93 oout.writeObject(msg);
94 oout.flush();
95
96 ChannelBuffer encoded = buffer.readBytes(buffer.readableBytes());
97 return encoded;
98 }
99
100 private ChannelBuffer buffer(ChannelHandlerContext ctx) throws Exception {
101 ChannelBuffer buf = buffer.get();
102 if (buf == null) {
103 ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
104 buf = ChannelBuffers.dynamicBuffer(factory);
105 if (buffer.compareAndSet(null, buf)) {
106 boolean success = false;
107 try {
108 oout = newObjectOutputStream(new ChannelBufferOutputStream(buf));
109 success = true;
110 } finally {
111 if (!success) {
112 oout = null;
113 }
114 }
115 } else {
116 buf = buffer.get();
117 }
118 }
119 return buf;
120 }
121 }