1 package org.jgroups.tests; 2 3 import org.jgroups.Message; 4 import org.jgroups.conf.ClassConfigurator; 5 import org.jgroups.protocols.*; 6 import org.jgroups.stack.IpAddress; 7 import org.jgroups.util.List; 8 import org.jgroups.util.Buffer; 9 import org.jgroups.util.ExposedByteArrayOutputStream; 10 11 import java.io.*; 12 import java.util.Enumeration ; 13 import java.util.LinkedList ; 14 15 19 public class MessageSerializationTest2 { 20 Message msg; 21 Buffer buf; 22 long start, stop, total; 23 double msgs_per_sec, time_per_msg; 24 List my_list=new List(); 25 int num=50000; 26 ObjectOutputStream out; 27 ExposedByteArrayOutputStream output; 28 ByteArrayInputStream input; 29 ObjectInputStream in; 30 DataOutputStream dos; 31 DataInputStream dis; 32 int msgs_read=0; 33 List l2=new List(); 34 35 36 37 public void start(int num, boolean use_serialization, boolean use_streamable, 38 boolean use_additional_data, boolean add_headers) throws Exception { 39 IpAddress dest=new IpAddress("228.8.8.8", 7500); 40 IpAddress SRC=new IpAddress("127.0.0.1", 5555); 41 if(use_additional_data) 42 src.setAdditionalData("bela".getBytes()); 43 44 ClassConfigurator.getInstance(true); 45 46 this.num=num; 47 System.out.println("-- starting to create " + num + " msgs"); 48 start=System.currentTimeMillis(); 49 for(int i=1; i <= num; i++) { 50 msg=new Message(dest, src, ("Hello world from message #" +i).getBytes()); 51 if(add_headers) { 52 addHeaders(msg); 53 } 54 my_list.add(msg); 55 } 56 stop=System.currentTimeMillis(); 57 total=stop-start; 58 msgs_per_sec=num / (total/1000.0); 59 time_per_msg=total / (double)num; 60 System.out.println("\n-- total time for creating " + num + 61 " msgs = " + total + "ms \n(" + msgs_per_sec + " msgs/sec, time_per_msg=" + time_per_msg + " ms)"); 62 63 LinkedList l_ser=null, l_stream=null; 64 65 if(use_streamable) 66 l_stream=marshalMessages(); 67 68 if(use_serialization) 69 l_ser=serializeMessage(); 70 71 if(l_ser != null && l_stream != null) 72 printDiffs(l_ser, l_stream); 73 } 74 75 76 void addHeaders(Message msg) { 77 msg.putHeader("UDP", new UdpHeader("MyGroup")); 78 msg.putHeader("PING", new PingHeader(PingHeader.GET_MBRS_REQ, null)); 79 msg.putHeader("FD_SOCK", new FD_SOCK.FdHeader()); 80 msg.putHeader("VERIFY_SUSPECT", new VERIFY_SUSPECT.VerifyHeader()); 81 msg.putHeader("STABLE", new org.jgroups.protocols.pbcast.STABLE.StableHeader()); 82 msg.putHeader("NAKACK", new org.jgroups.protocols.pbcast.NakAckHeader()); 83 msg.putHeader("UNICAST", new UNICAST.UnicastHeader()); 84 msg.putHeader("FRAG", new FragHeader()); 85 msg.putHeader("GMS", new org.jgroups.protocols.pbcast.GMS.GmsHeader()); 86 } 87 88 private void printDiffs(LinkedList l_ser, LinkedList l_stream) { 89 int size_ser, size_stream; 90 long write_ser, write_stream, read_ser, read_stream; 91 92 size_ser=((Integer )l_ser.get(0)).intValue(); 93 size_stream=((Integer )l_stream.get(0)).intValue(); 94 write_ser=((Long )l_ser.get(1)).longValue(); 95 read_ser=((Long )l_ser.get(2)).longValue(); 96 write_stream=((Long )l_stream.get(1)).longValue(); 97 read_stream=((Long )l_stream.get(2)).longValue(); 98 System.out.println("\n\nserialized size=" + size_ser + ", streamable size=" + size_stream + 99 ", streamable is " + (100.0 / size_stream * size_ser -100) + " percent smaller"); 100 System.out.println("serialized write=" + write_ser + ", streamable write=" + write_stream + 101 ", streamable write is " + (100.0 / write_stream * write_ser -100) + " percent faster"); 102 System.out.println("serialized read=" + read_ser + ", streamable read=" + read_stream + 103 ", streamable read is " + (100.0 / read_stream * read_ser -100) + " percent faster"); 104 } 105 106 107 112 LinkedList serializeMessage() throws IOException { 113 LinkedList retval=new LinkedList (); 114 System.out.println("-- starting to serialize " + num + " msgs"); 115 start=System.currentTimeMillis(); 116 output=new ExposedByteArrayOutputStream(65000); 117 out=new ObjectOutputStream(output); 118 my_list.writeExternal(out); 119 out.close(); 120 stop=System.currentTimeMillis(); 121 buf=new Buffer(output.getRawBuffer(), 0, output.size()); 122 System.out.println("** serialized buffer size=" + buf.getLength() + " bytes"); 123 retval.add(new Integer (buf.getLength())); 124 125 total=stop-start; 126 retval.add(new Long (total)); 127 msgs_per_sec=num / (total/1000.0); 128 time_per_msg=total / (double)num; 129 System.out.println("\n-- total time for serializing " + num + 130 " msgs = " + total + "ms \n(" + msgs_per_sec + " msgs/sec, time_per_msg=" + time_per_msg + " ms)"); 131 132 System.out.println("-- starting to unserialize msgs"); 133 start=System.currentTimeMillis(); 134 ByteArrayInputStream input2=new ByteArrayInputStream(buf.getBuf(), buf.getOffset(), buf.getLength()); 135 ObjectInputStream in2=new ObjectInputStream(input2); 136 137 try { 138 l2.readExternal(in2); 139 } 140 catch(ClassNotFoundException e) { 141 e.printStackTrace(); 142 } 143 stop=System.currentTimeMillis(); 144 total=stop-start; 145 retval.add(new Long (total)); 146 msgs_read=l2.size(); 147 msgs_per_sec=msgs_read / (total/1000.0); 148 time_per_msg=total / (double)msgs_read; 149 System.out.println("\n-- total time for reading " + msgs_read + 150 " msgs = " + total + "ms \n(" + msgs_per_sec + " msgs/sec, time_per_msg=" + time_per_msg + ')'); 151 l2.removeAll(); 152 return retval; 153 } 154 155 LinkedList marshalMessages() throws IOException, IllegalAccessException , InstantiationException { 156 LinkedList retval=new LinkedList (); 157 System.out.println("\n\n-- starting to marshal " + num + " msgs (using Streamable)"); 158 start=System.currentTimeMillis(); 159 output=new ExposedByteArrayOutputStream(65000); 160 dos=new DataOutputStream(output); 161 dos.writeInt(my_list.size()); 162 for(Enumeration en=my_list.elements(); en.hasMoreElements();) { 163 Message tmp=(Message)en.nextElement(); 164 tmp.writeTo(dos); 165 } 166 167 dos.close(); 168 stop=System.currentTimeMillis(); 169 buf=new Buffer(output.getRawBuffer(), 0, output.size()); 170 System.out.println("** marshalled buffer size=" + buf.getLength() + " bytes"); 171 retval.add(new Integer (buf.getLength())); 172 173 total=stop-start; 174 retval.add(new Long (total)); 175 msgs_per_sec=num / (total/1000.0); 176 time_per_msg=total / (double)num; 177 System.out.println("\n-- total time for marshaling " + num + 178 " msgs = " + total + "ms \n(" + msgs_per_sec + " msgs/sec, time_per_msg=" + time_per_msg + " ms)"); 179 180 System.out.println("-- starting to unmarshal msgs (using Streamable)"); 181 start=System.currentTimeMillis(); 182 input=new ByteArrayInputStream(buf.getBuf(), buf.getOffset(), buf.getLength()); 183 dis=new DataInputStream(input); 184 msgs_read=0; 185 186 int b=dis.readInt(); 187 Message tmp; 188 for(int i=0; i < b; i++) { 189 tmp=new Message(); 190 tmp.readFrom(dis); 191 l2.add(tmp); 192 } 193 194 stop=System.currentTimeMillis(); 195 total=stop-start; 196 retval.add(new Long (total)); 197 msgs_read=l2.size(); 198 msgs_per_sec=msgs_read / (total/1000.0); 199 time_per_msg=total / (double)msgs_read; 200 System.out.println("\n-- total time for reading " + msgs_read + 201 " msgs = " + total + "ms \n(" + msgs_per_sec + " msgs/sec, time_per_msg=" + time_per_msg + ')'); 202 return retval; 203 } 204 205 public static void main(String [] args) { 206 int num=50000; 207 boolean use_serialization=true, use_streamable=true, use_additional_data=false, add_headers=true; 208 209 for(int i=0; i < args.length; i++) { 210 if(args[i].equals("-num")) { 211 num=Integer.parseInt(args[++i]); 212 continue; 213 } 214 if(args[i].equals("-use_serialization")) { 215 use_serialization=new Boolean (args[++i]).booleanValue(); 216 continue; 217 } 218 if(args[i].equals("-use_streamable")) { 219 use_streamable=new Boolean (args[++i]).booleanValue(); 220 continue; 221 } 222 if(args[i].equals("-use_additional_data")) { 223 use_additional_data=new Boolean (args[++i]).booleanValue(); 224 continue; 225 } 226 if(args[i].equals("-add_headers")) { 227 add_headers=new Boolean (args[++i]).booleanValue(); 228 continue; 229 } 230 help(); 231 return; 232 } 233 234 try { 235 new MessageSerializationTest2().start(num, use_serialization, use_streamable, use_additional_data, add_headers); 236 } 237 catch(Exception e) { 238 e.printStackTrace(); 239 } 240 } 241 242 static void help() { 243 System.out.println("MessageSerializationTest2 [-help] [-num <number>] " + 244 "[-use_serialization <true|false>] [-use_streamable <true|false>] " + 245 "[-use_additional_data <true|false>] [-add_headers <true|false>]"); 246 } 247 } 248 | Popular Tags |