1 /*------------------------------------------------------------------------------
2 Name: MassiveSubTest.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Load test for xmlBlaster
6 ------------------------------------------------------------------------------*/
7 package org.xmlBlaster.test.stress;
8
9 import java.util.logging.Logger;
10 import java.util.logging.Level;
11
12 import org.xmlBlaster.util.Global;
13 import org.xmlBlaster.util.ThreadLister;
14 import org.xmlBlaster.util.def.Constants;
15 import org.xmlBlaster.util.StopWatch;
16 import org.xmlBlaster.client.qos.ConnectQos;
17 import org.xmlBlaster.client.qos.ConnectReturnQos;
18 import org.xmlBlaster.util.XmlBlasterException;
19 import org.xmlBlaster.client.I_XmlBlasterAccess;
20 import org.xmlBlaster.client.I_Callback;
21 import org.xmlBlaster.client.key.UpdateKey;
22 import org.xmlBlaster.client.qos.UpdateQos;
23 import org.xmlBlaster.client.qos.EraseReturnQos;
24 import org.xmlBlaster.client.key.SubscribeKey;
25 import org.xmlBlaster.client.qos.SubscribeQos;
26 import org.xmlBlaster.util.MsgUnit;
27 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
28 import org.xmlBlaster.util.EmbeddedXmlBlaster;
29 import org.xmlBlaster.j2ee.util.GlobalUtil;
30
31 import org.xmlBlaster.test.Util;
32 import org.xmlBlaster.test.MsgInterceptor;
33 import junit.framework.*;
34 /**
35 * Test different scenarios for a massive amount of subscribers.
36 *
37 * <p>Test 5000 subscribers (or numSubscribers) on one connection.</p>
38 * <p>Test 5000 subscribers (or numSubscribers) with maxSubPerCon per connection</p>
39 * <p>Test 5000 subscribers (or numSubscribers) on one connection each.</p>
40 * <p>Do it for IOR, RMI</p>
41 *
42 * <p>If withEmbedded is set to false will run without an embedded server.</p>
43 * <pre>
44 * java -Xms18M -Xmx256M org.xmlBlaster.test.stress.MassiveSubTest
45 * java -Xms18M -Xmx256M junit.swingui.TestRunner -noloading org.xmlBlaster.test.stress.MassiveSubTest
46 * </pre>
47 * @author Peter Antman
48 */
49
50 public class MassiveSubTest extends TestCase implements I_Callback {
51 private int numSubscribers = 5000;
52 private int maxSubPerCon = 0;
53 private boolean useOneConnection = false;
54 private boolean withEmbedded = true;
55 private int noToPub = 1;
56 private int numToRec = numSubscribers * noToPub;
57 private String ME = "MassiveSubTest";
58 private Global glob;
59 private static Logger log = Logger.getLogger(MassiveSubTest.class.getName());
60 private int serverPort = 7615;
61 private EmbeddedXmlBlaster serverThread;
62 private boolean messageArrived = false;
63 private MsgInterceptor updateInterceptor;
64
65 private final String publishOid1 = "dummy1";
66 private I_XmlBlasterAccess oneConnection;
67 private String oneName;
68
69 private int numReceived = 0; // error checking
70 private final String contentMime = "text/xml";
71 private final String contentMimeExtended = "1.0";
72 private GlobalUtil globalUtil;
73
74 class Client {
75 String loginName;
76 I_XmlBlasterAccess connection;
77 String subscribeOid;
78 boolean oneConnection;
79 }
80 private Client[] manyClients;
81 private I_XmlBlasterAccess[] manyConnections;
82 private StopWatch stopWatch = new StopWatch();
83
84
85 public MassiveSubTest(String testName) {
86 super(testName);
87 Global glob_ = Global.instance();
88 setProtoMax(glob_, "IOR", "500");
89 init(glob_, testName, "testManyClients", true);
90 }
91
/**
 * Creates the test with an explicit configuration.
 * @param glob Global holding the test properties
 * @param testName name of the test method to run
 * @param loginName login name of the main connection
 * @param useOneConnection true to let subscriptions share connections,
 *        false to log in every subscriber separately
 */
public MassiveSubTest(Global glob, String testName, String loginName, boolean useOneConnection) {
   super(testName);
   this.init(glob, testName, loginName, useOneConnection);
}
96
97 public void init(Global glob, String testName, String loginName, boolean useOneConnection) {
98 this.glob = glob;
99
100 this.oneName = loginName;
101 numSubscribers = glob.getProperty().get("numSubscribers", numSubscribers);
102 maxSubPerCon = glob.getProperty().get("maxSubPerCon", maxSubPerCon);
103 withEmbedded = glob.getProperty().get("withEmbedded", withEmbedded);
104 noToPub = glob.getProperty().get("noToPub", noToPub);
105 this.useOneConnection = useOneConnection;
106 String clientProtocol = glob.getProperty().get("client.protocol", "IOR");
107 try {
108 glob.getProperty().set("client.protocol",clientProtocol);
109
110
111
112 }catch(XmlBlasterException ex) {
113 assertTrue("Could not setup test: " + ex, false);
114 }
115 ME = ME+":"+clientProtocol+(useOneConnection ? ":oneCon":":manyCon")+":"+numSubscribers + (maxSubPerCon>0?"/"+maxSubPerCon:"");
116
117 numToRec = numSubscribers * noToPub;
118
119 }
120
121 /**
122 * Sets up the fixture.
123 * <p />
124 * Connect to xmlBlaster and login
125 */
126 protected void setUp()
127 {
128 String[] args = {
129 "-ClientProtocolPlugin[LOCAL][1.0]",
130 "org.xmlBlaster.client.protocol.local.LocalConnection",
131 "-ClientCbServerProtocolPlugin[LOCAL][1.0]",
132 "org.xmlBlaster.client.protocol.local.LocalCallbackImpl",
133 "-CbProtocolPlugin[LOCAL][1.0]",
134 "org.xmlBlaster.protocol.local.CallbackLocalDriver"
135 };
136 glob.init(args);
137
138 log.info("Setting up test ...");
139 if (withEmbedded) {
140 glob.init(Util.getOtherServerPorts(serverPort));
141 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
142 log.info("XmlBlaster is ready for testing a lots of subscribers");
143 globalUtil = new GlobalUtil( serverThread.getMain().getGlobal() );
144 } else {
145 globalUtil = new GlobalUtil( );
146 } // end of else
147 glob = globalUtil.getClone(glob);
148
149
150 numReceived = 0;
151 try {
152 oneConnection = glob.getXmlBlasterAccess(); // Find orb
153 ConnectQos connectQos = new ConnectQos(glob, oneName, "secret"); // "<qos></qos>"; During login this is manipulated (callback address added)
154 // If we have many subs on one con, we must raise the max size of the callback queue!
155 CbQueueProperty cbProp =connectQos.getSessionCbQueueProperty();
156 cbProp.setMaxEntries(numSubscribers+1000);
157 cbProp.setMaxEntriesCache(numSubscribers+1000);
158 this.updateInterceptor = new MsgInterceptor(this.glob, log, this); // Collect received msgs
159 ConnectReturnQos connectReturnQos = oneConnection.connect(connectQos, this.updateInterceptor);
160 log.info("Connected: " + connectReturnQos.toXml());
161 }
162 catch (Exception e) {
163 log.severe("Login failed: " + e.toString());
164 e.printStackTrace();
165 assertTrue("Login failed: " + e.toString(), false);
166 }
167 }
168
169 /**
170 * Tears down the fixture.
171 * <p />
172 * cleaning up .... erase() the previous message OID and logout
173 */
174 protected void tearDown()
175 {
176 log.info("Tearing down");
177 if (numReceived != numToRec) {
178 log.severe("numToRec=" + numToRec + " but numReceived=" + numReceived);
179 assertEquals("numToRec=" + numToRec + " but numReceived=" + numReceived, numSubscribers, numReceived);
180 }
181
182
183 if (manyClients != null) {
184 for (int ii=0; ii<numSubscribers; ii++) {
185 Client sub = manyClients[ii];
186 if (sub.oneConnection) {
187 try {
188 if ( sub.connection != null) {
189 sub.connection.unSubscribe( "<key oid='"+sub.subscribeOid+"'/>",
190 "<qos/>");
191 } else {
192 oneConnection.unSubscribe( "<key oid='"+sub.subscribeOid+"'/>",
193 "<qos/>");
194 } // end of else
195
196 }catch(XmlBlasterException ex) {
197 log.severe("Could not unsubscribe: " +sub.subscribeOid+": " + ex);
198 }
199 }else {
200 sub.connection.disconnect(null);
201 }
202 }
203 }
204 if ( manyConnections != null) {
205 for ( int ii = 0;ii<manyConnections.length;ii++) {
206 manyConnections[ii].disconnect(null);
207 } // end of for ()
208
209 } // end of if ()
210
211
212
213
214 {
215 String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
216 "<key oid='" + publishOid1 + "' queryType='EXACT'>\n" +
217 "</key>";
218 String qos = "<qos></qos>";
219 try {
220 EraseReturnQos[] arr = oneConnection.erase(xmlKey, qos);
221 assertEquals("Erase", 1, arr.length);
222 } catch(XmlBlasterException e) { fail("Erase-XmlBlasterException: " + e.getMessage()); }
223 }
224
225 oneConnection.disconnect(null);
226 oneConnection = null;
227 log.info("Logout done");
228 if (withEmbedded) {
229 try { Thread.sleep(100L); } catch( InterruptedException i) {}
230 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
231 this.serverThread = null;
232
233 // reset to default server port (necessary if other tests follow in the same JVM).
234 Util.resetPorts();
235 }
236
237 this.glob = null;
238
239 this.updateInterceptor = null;
240 this.oneConnection = null;
241 this.manyClients = null;
242 this.manyConnections = null;
243 this.stopWatch = null;
244 }
245
246 /**
247 * helper
248 */
249 public void subcribeMany()
250 {
251 int ci=-1;
252 try {
253
254 if (log.isLoggable(Level.FINE)) log.fine("Subscribing ...");
255
256 String passwd = "secret";
257
258 SubscribeKey subKeyW = new SubscribeKey(glob, publishOid1);
259 String subKey = subKeyW.toXml(); // "<key oid='" + publishOid1 + "' queryType='EXACT'></key>";
260
261 SubscribeQos subQosW = new SubscribeQos(glob); // "<qos></qos>";
262 String subQos = subQosW.toXml();
263
264 manyClients = new Client[numSubscribers];
265 if (maxSubPerCon >0 ) {
266 // Check if reasonably
267 if ( numSubscribers % maxSubPerCon!= 0) {
268 assertTrue("numSubscribers not divadable by breakpoint", false);
269 }
270
271 manyConnections = new I_XmlBlasterAccess[numSubscribers/maxSubPerCon];
272 } // end of if ()
273
274
275 long usedBefore = getUsedServerMemory();
276
277 log.info("Setting up " + numSubscribers + " subscriber clients ...");
278
279 int startNoThreads = ThreadLister.countThreads();
280 //ThreadLister.listAllThreads(System.out);
281 stopWatch = new StopWatch();
282 for (int ii=0; ii<numSubscribers; ii++) {
283 Client sub = new Client();
284 sub.loginName = "Joe-" + ii;
285 sub.oneConnection = useOneConnection;
286 if (useOneConnection) {
287 // Should we distribute among a few connections
288 if (maxSubPerCon >0) {
289 if ( ii % maxSubPerCon == 0) {
290 ci++;
291 try {
292 log.fine("Creating connection no: " +ci);
293 Global gg = globalUtil.getClone(glob);
294 // Try to reuse the same ORB to avoid too many threads:
295 if ("IOR".equals(gg.getProperty().get("protocol","IOR")) && ci > 0) {
296 gg.addObjectEntry(Constants.RELATING_CLIENT+":org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper",
297 (org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper)manyConnections[ci-1].getGlobal().getObjectEntry(Constants.RELATING_CLIENT+":org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper"));
298 }
299 manyConnections[ci] = gg.getXmlBlasterAccess();
300 ConnectQos connectQos = new ConnectQos(gg, sub.loginName, passwd); // "<qos></qos>"; During login this is manipulated (callback address added)
301 // If we have many subs on one con, we must raise the max size of the callback queue!
302 CbQueueProperty cbProp =connectQos.getSessionCbQueueProperty();
303 // algo is maxSubPerCon*4
304 cbProp.setMaxEntries(maxSubPerCon*1000);//This means we have a backlog of 1000 messages per subscriber as i normal when each con only have one subscriber!
305 //cbProp.setMaxBytes(4000);
306 //cbProp.setOnOverflow(Constants.ONOVERFLOW_BLOCK);
307 //connectQos.setSubjectQueueProperty(cbProp);
308 log.fine("Login qos: " + connectQos.toXml());
309 ConnectReturnQos connectReturnQos = manyConnections[ci].connect(connectQos, this);
310 log.info("Connected maxSubPerCon=" + maxSubPerCon + " : " + connectReturnQos.toXml());
311 }
312 catch (Exception e) {
313 log.severe("Login failed: " + e.toString());
314 assertTrue("Login failed: " + e.toString(), false);
315 }
316
317 } // end of if ()
318 sub.connection = manyConnections[ci];
319 } else {
320 sub.connection = oneConnection;
321 }
322 }else {
323 try {
324 Global gg = globalUtil.getClone(glob);
325 sub.connection = gg.getXmlBlasterAccess();
326 ConnectQos connectQos = new ConnectQos(gg, sub.loginName, passwd); // "<qos></qos>"; During login this is manipulated (callback address added)
327 ConnectReturnQos connectReturnQos = sub.connection.connect(connectQos, this);
328 log.info("Connected: " + connectReturnQos.toXml());
329 }
330 catch (Exception e) {
331 log.severe("Login failed: " + e.toString());
332 assertTrue("Login failed: " + e.toString(), false);
333 }
334 }
335 try {
336 sub.subscribeOid = sub.connection.subscribe(subKey, subQos).getSubscriptionId();
337 log.fine("Client " + sub.loginName + " subscribed to " + subKeyW.getOid());
338 } catch(XmlBlasterException e) {
339 log.warning("XmlBlasterException: " + e.getMessage());
340 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
341 }
342
343 manyClients[ii] = sub;
344 }
345 double timeForLogins = (double)stopWatch.elapsed()/1000.; // msec -> sec
346
347
348 long usedAfter = getUsedServerMemory();
349 long memPerLogin = (usedAfter - usedBefore)/numSubscribers;
350 int noThreads = ThreadLister.countThreads();
351 int tDiff = noThreads - startNoThreads;
352 int tPerConn = ((ci == 0|| ci == -1) ? tDiff :tDiff/(ci+1));
353 int subPerT = tDiff != 0 ? numSubscribers/tDiff:0;
354
355 log.info(numSubscribers + " subscriber clients are ready.");
356 log.info("Server memory per login consumed=" + memPerLogin);
357 log.info("Time " + (long)(numSubscribers/timeForLogins) + " logins/sec");
358 log.info("Threads created " + tDiff + ", threads per connection " + tPerConn + ", sub per thread " + subPerT);
359 //ThreadLister.listAllThreads(System.out);
360 //try { Thread.sleep(5000000L); } catch( InterruptedException i) {}
361
362 } catch (Error e) {
363 e.printStackTrace();
364 log.severe("Could not set up subscribers: " +e);
365 log.severe("No of threads " + ThreadLister.countThreads() + " for connection no " + ci);
366 throw e;
367 } // end of try-catch
368
369 }
370
371 /**
372 * Query xmlBlaster for its current memory consumption.
373 */
374 long getUsedServerMemory() {
375 String xmlKey = "<key oid='__cmd:?usedMem' queryType='EXACT'></key>";
376 String qos = "<qos></qos>";
377 try {
378 MsgUnit[] msgArr = oneConnection.get(xmlKey, qos);
379 String mem = new String(msgArr[0].getContent());
380 return new Long(mem).longValue();
381 } catch (XmlBlasterException e) {
382 log.warning(e.toString());
383 return 0L;
384 }
385 }
386 /**
387 * TEST: Publish numToPub messages..
388 * <p />
389 * The returned publishOid1 is checked
390 */
391 public void publish()
392 {
393 if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");
394
395 numReceived = 0;
396 String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
397 "<key oid='" + publishOid1 + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'>\n" +
398 "</key>";
399 String senderContent = "Yeahh, i'm the new content";
400
401 try {
402 stopWatch = new StopWatch();
403 for (int i = 0; i < noToPub;i++) {
404 senderContent = senderContent+"-"+i;
405 MsgUnit msgUnit = new MsgUnit(xmlKey, senderContent.getBytes(), "<qos></qos>");
406 String tmp = oneConnection.publish(msgUnit).getKeyOid();
407 assertEquals("Wrong publishOid1", publishOid1, tmp);
408 log.info("Success: Publishing done for " + i +", returned oid=" + publishOid1);
409 }
410 } catch(XmlBlasterException e) {
411 log.warning("XmlBlasterException: " + e.getMessage());
412 assertTrue("publishOne - XmlBlasterException: " + e.getMessage(), false);
413 }
414 }
415 /**
416 * This is the callback method invoked from xmlBlaster
417 * delivering us a new asynchronous message.
418 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
419 */
420 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
421 {
422 //log.info("Client " + loginName + " receiving update of message oid=" + updateKey.getOid() + "...");
423 numReceived++;
424
425 if (numReceived == numToRec) {
426 long avg = 0;
427 double elapsed = stopWatch.elapsed();
428 if (elapsed > 0.)
429 avg = (long)(1000.0 * numReceived / elapsed);
430 log.info(numReceived + " messages updated, average messages/second = " + avg + stopWatch.nice());
431 }
432 return "";
433 }
434
435 /**
436 * TEST: Construct a message and publish it,
437 * all clients should receive an update.
438 */
439 public void testManyClients()
440 {
441 System.out.println("");
442 log.info("TEST 1, many clients, useOneConnection="+useOneConnection);
443
444 subcribeMany();
445 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // Wait some time for callback to arrive ...
446 assertEquals("numReceived after subscribe", 0, numReceived); // there should be no Callback
447
448 publish();
449 long delay = 2000L + 10 * numToRec;
450 log.info("Waiting long enough for updates ..."+delay);
451 Util.delay(delay); // Wait some time for callback to arrive ...
452 // !!!! this.updateInterceptor.
453
454 if ( numReceived != numToRec ){
455 // Warn and wait some more
456 log.warning("Have not yet received more than " +numReceived+"/"+numToRec+" waiting some more");
457 int midRec=numReceived;
458 long avg = 0;
459 double elapsed = stopWatch.elapsed();
460 if (elapsed > 0.)
461 avg = (long)(1000.0 * numReceived / elapsed);
462 log.info(numReceived + " messages updated, average firts round messages/second = " + avg + stopWatch.nice(false));//Don't reset
463 Util.delay(2L*delay);
464 //Lastt delay
465 if ( numReceived != numToRec ){
466 // Warn and wait some more
467 log.warning("Have NOT yet received more than " +numReceived+"/"+numToRec+" waiting last round");
468 avg = 0;
469 elapsed = stopWatch.elapsed()-elapsed;
470 if (elapsed > 0.)
471 avg = (long)(1000.0 *( numReceived -midRec)/ elapsed);
472 log.info(numReceived-midRec + " messages updated this round, average second round messages/second = " + avg + stopWatch.nice(false));//Don't reset
473 Util.delay(4L*delay);
474 }
475
476 }
477
478 log.info("Got messages:" +numReceived+"/"+numToRec);
479 assertEquals("Wrong number of updates", numToRec, numReceived);
480
481 }
482
483 /**
484 * Method is used by TestRunner to load these tests.
485 *
486 * <p>Warning! The default uses the embedded server, to give each round a equal chance. But it is MUCH slower than using a server in another VM.</p>
487 */
488 public static Test suite()
489 {
490 TestSuite suite= new TestSuite();
491 String loginName = "Tim";
492 Global glob = Global.instance();
493 // Test IOR many on one
494 setProtoMax(glob,"IOR","0");
495 suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
496 // Test IOR many on few
497 setProtoMax(glob,"IOR","500");
498 suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
499 // Test RMI many on one
500 setProtoMax(glob,"RMI","0");
501 suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
502 // Test RMI many on few
503 setProtoMax(glob,"RMI","500");
504 suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
505 // Test IOR many on many
506 setProtoMax(glob,"IOR","0");
507 suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,false));
508 // Test RMI many on many
509 setProtoMax(glob,"RMI","0");
510 suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,false));
511
512 return suite;
513 }
514
515 private static void setProtoMax(Global glob, String proto, String max) {
516 try {
517 glob.getProperty().set("client.protocol",proto);
518 glob.getProperty().set("maxSubPerCon",max);
519 }catch(XmlBlasterException ex) {
520 assertTrue("Could not setup test: " + ex, false);
521 }
522 }
523
524 public static void main(String[] args) {
525 Global glob = new Global(args);
526 setProtoMax(glob, "IOR", "500");
527 MassiveSubTest m = new MassiveSubTest(glob, "testManyClients", "testManyClients", false);
528 m.setUp();
529 m.testManyClients();
530 m.tearDown();
531 }
532
533 } // MassiveSubTest
syntax highlighted by Code2HTML, v. 0.9.1