1 /*------------------------------------------------------------------------------
  2 Name:      MassiveSubTest.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Load test for xmlBlaster
  6 ------------------------------------------------------------------------------*/
  7 package org.xmlBlaster.test.stress;
  8 
  9 import java.util.logging.Logger;
 10 import java.util.logging.Level;
 11 
 12 import org.xmlBlaster.util.Global;
 13 import org.xmlBlaster.util.ThreadLister;
 14 import org.xmlBlaster.util.def.Constants;
 15 import org.xmlBlaster.util.StopWatch;
 16 import org.xmlBlaster.client.qos.ConnectQos;
 17 import org.xmlBlaster.client.qos.ConnectReturnQos;
 18 import org.xmlBlaster.util.XmlBlasterException;
 19 import org.xmlBlaster.client.I_XmlBlasterAccess;
 20 import org.xmlBlaster.client.I_Callback;
 21 import org.xmlBlaster.client.key.UpdateKey;
 22 import org.xmlBlaster.client.qos.UpdateQos;
 23 import org.xmlBlaster.client.qos.EraseReturnQos;
 24 import org.xmlBlaster.client.key.SubscribeKey;
 25 import org.xmlBlaster.client.qos.SubscribeQos;
 26 import org.xmlBlaster.util.MsgUnit;
 27 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
 28 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 29 import org.xmlBlaster.j2ee.util.GlobalUtil;
 30 
 31 import org.xmlBlaster.test.Util;
 32 import org.xmlBlaster.test.MsgInterceptor;
 33 import junit.framework.*;
 34 /**
 35  * Tests different scenarios for a massive amount of subscribers.
 36  *
 37  * <p>Test 5000 subscribers (or numSubscribers) on one connection.</p>
 38  * <p>Test 5000 subscribers (or numSubscribers) with maxSubPerCon per connection</p>
 39  * <p>Test 5000 subscribers (or numSubscribers) on one connection each.</p>
 40  * <p>Do it for IOR, RMI</p>
 41  *
 42  * <p>If withEmbedded is set to false will run without an embedded server.</p>
 43  * <pre>
 44  *  java -Xms18M -Xmx256M org.xmlBlaster.test.stress.MassiveSubTest
 45  *  java -Xms18M -Xmx256M junit.swingui.TestRunner -noloading org.xmlBlaster.test.stress.MassiveSubTest
 46  * </pre>
 47  * @author Peter Antman
 48  */
 49 
 50 public class MassiveSubTest extends TestCase implements I_Callback {
 51    private int numSubscribers = 5000;
 52    private int maxSubPerCon = 0;
 53    private boolean useOneConnection = false;
 54    private boolean withEmbedded = true;
 55    private int noToPub = 1;
 56    private int numToRec = numSubscribers * noToPub;
 57    private String ME = "MassiveSubTest";
 58    private Global glob;
 59    private static Logger log = Logger.getLogger(MassiveSubTest.class.getName());
 60    private int serverPort = 7615;
 61    private EmbeddedXmlBlaster serverThread;
 62    private boolean messageArrived = false;
 63    private MsgInterceptor updateInterceptor;
 64 
 65    private final String publishOid1 = "dummy1";
 66    private I_XmlBlasterAccess oneConnection;
 67    private String oneName;
 68 
 69    private int numReceived = 0;         // error checking
 70    private final String contentMime = "text/xml";
 71    private final String contentMimeExtended = "1.0";
 72    private GlobalUtil globalUtil;
 73 
 74    class Client {
 75       String loginName;
 76       I_XmlBlasterAccess connection;
 77       String subscribeOid;
 78       boolean oneConnection;
 79    }
 80    private Client[] manyClients;
 81    private I_XmlBlasterAccess[] manyConnections;
 82    private StopWatch stopWatch = new StopWatch();
 83 
 84 
 85    public MassiveSubTest(String testName) {
 86       super(testName);
 87       Global glob_ = Global.instance();
 88       setProtoMax(glob_, "IOR", "500");
 89       init(glob_, testName, "testManyClients", true);
 90    }
 91 
 92    public MassiveSubTest(Global glob, String testName, String loginName, boolean useOneConnection) {
 93       super(testName);
 94       init(glob, testName, loginName, useOneConnection);
 95    }
 96 
 97    public void init(Global glob, String testName, String loginName, boolean useOneConnection) {
 98       this.glob = glob;
 99 
100       this.oneName = loginName;
101       numSubscribers = glob.getProperty().get("numSubscribers", numSubscribers);
102       maxSubPerCon = glob.getProperty().get("maxSubPerCon", maxSubPerCon);
103       withEmbedded = glob.getProperty().get("withEmbedded", withEmbedded);
104       noToPub = glob.getProperty().get("noToPub", noToPub);
105       this.useOneConnection = useOneConnection;
106       String clientProtocol = glob.getProperty().get("client.protocol", "IOR");
107       try {
108          glob.getProperty().set("client.protocol",clientProtocol);
109 
110 
111 
112       }catch(XmlBlasterException ex) {
113          assertTrue("Could not setup test: " + ex, false);
114       }
115       ME = ME+":"+clientProtocol+(useOneConnection ? ":oneCon":":manyCon")+":"+numSubscribers + (maxSubPerCon>0?"/"+maxSubPerCon:"");
116 
117       numToRec = numSubscribers * noToPub;
118 
119    }
120    
121    /**
122     * Sets up the fixture.
123     * <p />
124     * Connect to xmlBlaster and login
125     */
126    protected void setUp()
127    {
128       String[] args = {
129          "-ClientProtocolPlugin[LOCAL][1.0]",
130          "org.xmlBlaster.client.protocol.local.LocalConnection",
131          "-ClientCbServerProtocolPlugin[LOCAL][1.0]",
132          "org.xmlBlaster.client.protocol.local.LocalCallbackImpl",
133          "-CbProtocolPlugin[LOCAL][1.0]",
134          "org.xmlBlaster.protocol.local.CallbackLocalDriver"
135       };
136       glob.init(args);
137 
138       log.info("Setting up test ...");
139       if (withEmbedded) {
140          glob.init(Util.getOtherServerPorts(serverPort));
141          serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
142          log.info("XmlBlaster is ready for testing a lots of subscribers");
143          globalUtil = new GlobalUtil( serverThread.getMain().getGlobal() );
144       } else {
145          globalUtil = new GlobalUtil( );
146       } // end of else
147       glob = globalUtil.getClone(glob);
148 
149 
150       numReceived = 0;
151       try {
152          oneConnection = glob.getXmlBlasterAccess(); // Find orb
153          ConnectQos connectQos = new ConnectQos(glob, oneName, "secret"); // "<qos></qos>"; During login this is manipulated (callback address added)
154          // If we have many subs on one con, we must raise the max size of the callback queue!
155          CbQueueProperty cbProp =connectQos.getSessionCbQueueProperty();
156          cbProp.setMaxEntries(numSubscribers+1000);
157          cbProp.setMaxEntriesCache(numSubscribers+1000);
158          this.updateInterceptor = new MsgInterceptor(this.glob, log, this); // Collect received msgs
159          ConnectReturnQos connectReturnQos = oneConnection.connect(connectQos, this.updateInterceptor);
160          log.info("Connected: " + connectReturnQos.toXml());
161       }
162       catch (Exception e) {
163           log.severe("Login failed: " + e.toString());
164           e.printStackTrace();
165           assertTrue("Login failed: " + e.toString(), false);
166       }
167    }
168    
169    /**
170     * Tears down the fixture.
171     * <p />
172     * cleaning up .... erase() the previous message OID and logout
173     */
174    protected void tearDown()
175    {
176       log.info("Tearing down");
177       if (numReceived != numToRec) {
178          log.severe("numToRec=" + numToRec + " but numReceived=" + numReceived);
179          assertEquals("numToRec=" + numToRec + " but numReceived=" + numReceived, numSubscribers, numReceived);
180       }
181       
182 
183       if (manyClients != null) {
184          for (int ii=0; ii<numSubscribers; ii++) {
185             Client sub = manyClients[ii];
186             if (sub.oneConnection) {
187                try {
188                   if ( sub.connection != null) {
189                      sub.connection.unSubscribe( "<key oid='"+sub.subscribeOid+"'/>",
190                                                  "<qos/>");
191                   } else {
192                      oneConnection.unSubscribe( "<key oid='"+sub.subscribeOid+"'/>",
193                                                 "<qos/>");
194                   } // end of else
195                   
196                }catch(XmlBlasterException ex) {
197                   log.severe("Could not unsubscribe: " +sub.subscribeOid+": " + ex);
198                }
199             }else {
200                sub.connection.disconnect(null);
201             }
202          }
203       }
204       if ( manyConnections != null) {
205          for ( int ii = 0;ii<manyConnections.length;ii++) {
206             manyConnections[ii].disconnect(null);
207          } // end of for ()
208          
209       } // end of if ()
210       
211 
212 
213       
214       {
215          String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
216             "<key oid='" + publishOid1 + "' queryType='EXACT'>\n" +
217             "</key>";
218          String qos = "<qos></qos>";
219          try {
220             EraseReturnQos[] arr = oneConnection.erase(xmlKey, qos);
221             assertEquals("Erase", 1, arr.length);
222          } catch(XmlBlasterException e) { fail("Erase-XmlBlasterException: " + e.getMessage()); }
223       }
224       
225       oneConnection.disconnect(null);
226       oneConnection = null;
227       log.info("Logout done");
228       if (withEmbedded) {
229          try { Thread.sleep(100L); } catch( InterruptedException i) {}
230          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
231          this.serverThread = null;
232          
233          // reset to default server port (necessary if other tests follow in the same JVM).
234          Util.resetPorts();
235       }
236 
237       this.glob = null;
238      
239       this.updateInterceptor = null;
240       this.oneConnection = null;
241       this.manyClients = null;
242       this.manyConnections = null;
243       this.stopWatch = null;
244    }
245 
246    /**
247     * helper
248     */
249    public void subcribeMany()
250    {
251       int ci=-1;
252       try {
253 
254          if (log.isLoggable(Level.FINE)) log.fine("Subscribing ...");
255          
256          String passwd = "secret";
257          
258          SubscribeKey subKeyW = new SubscribeKey(glob, publishOid1);
259          String subKey = subKeyW.toXml(); // "<key oid='" + publishOid1 + "' queryType='EXACT'></key>";
260          
261          SubscribeQos subQosW = new SubscribeQos(glob); // "<qos></qos>";
262          String subQos = subQosW.toXml();
263          
264          manyClients = new Client[numSubscribers];
265          if (maxSubPerCon >0 ) {
266             // Check if reasonably
267             if (  numSubscribers %  maxSubPerCon!= 0) {
268              assertTrue("numSubscribers not divadable by breakpoint", false);
269             }
270             
271             manyConnections = new I_XmlBlasterAccess[numSubscribers/maxSubPerCon];
272          } // end of if ()
273          
274          
275          long usedBefore = getUsedServerMemory();
276          
277          log.info("Setting up " + numSubscribers + " subscriber clients ...");
278 
279          int startNoThreads = ThreadLister.countThreads();
280          //ThreadLister.listAllThreads(System.out);
281          stopWatch = new StopWatch();
282          for (int ii=0; ii<numSubscribers; ii++) {
283             Client sub = new Client();
284             sub.loginName = "Joe-" + ii;
285             sub.oneConnection = useOneConnection;
286             if (useOneConnection) {
287                // Should we distribute among a few connections
288                if (maxSubPerCon >0) {
289                   if (  ii % maxSubPerCon == 0) {
290                      ci++;
291                      try {
292                         log.fine("Creating connection no: " +ci);
293                         Global gg = globalUtil.getClone(glob);
294                         // Try to reuse the same ORB to avoid too many threads:
295                         if ("IOR".equals(gg.getProperty().get("protocol","IOR")) && ci > 0) {
296                            gg.addObjectEntry(Constants.RELATING_CLIENT+":org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper",
297                                              (org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper)manyConnections[ci-1].getGlobal().getObjectEntry(Constants.RELATING_CLIENT+":org.xmlBlaster.util.protocol.corba.OrbInstanceWrapper"));
298                         }
299                         manyConnections[ci] = gg.getXmlBlasterAccess();
300                         ConnectQos connectQos = new ConnectQos(gg, sub.loginName, passwd); // "<qos></qos>"; During login this is manipulated (callback address added)
301                         // If we have many subs on one con, we must raise the max size of the callback queue!
302                         CbQueueProperty cbProp =connectQos.getSessionCbQueueProperty();
303                         // algo is maxSubPerCon*4
304                         cbProp.setMaxEntries(maxSubPerCon*1000);//This means we have a backlog of 1000 messages per subscriber as i normal when each con only have one subscriber!
305                         //cbProp.setMaxBytes(4000);
306                         //cbProp.setOnOverflow(Constants.ONOVERFLOW_BLOCK);
307                         //connectQos.setSubjectQueueProperty(cbProp);
308                         log.fine("Login qos: " +  connectQos.toXml());
309                         ConnectReturnQos connectReturnQos = manyConnections[ci].connect(connectQos, this);
310                         log.info("Connected maxSubPerCon=" + maxSubPerCon + " : " + connectReturnQos.toXml());
311                      }
312                      catch (Exception e) {
313                         log.severe("Login failed: " + e.toString());
314                         assertTrue("Login failed: " + e.toString(), false);
315                      }
316                      
317                   } // end of if ()
318                   sub.connection = manyConnections[ci];
319                } else {
320                   sub.connection = oneConnection;
321                }
322             }else {
323                try {
324                   Global gg = globalUtil.getClone(glob);
325                   sub.connection = gg.getXmlBlasterAccess();
326                   ConnectQos connectQos = new ConnectQos(gg, sub.loginName, passwd); // "<qos></qos>"; During login this is manipulated (callback address added)
327                   ConnectReturnQos connectReturnQos = sub.connection.connect(connectQos, this);
328                   log.info("Connected: " + connectReturnQos.toXml());
329                }
330                catch (Exception e) {
331                   log.severe("Login failed: " + e.toString());
332                   assertTrue("Login failed: " + e.toString(), false);
333                }                                                        
334             }
335             try {
336             sub.subscribeOid = sub.connection.subscribe(subKey, subQos).getSubscriptionId();
337             log.fine("Client " + sub.loginName + " subscribed to " + subKeyW.getOid());
338             } catch(XmlBlasterException e) {
339                log.warning("XmlBlasterException: " + e.getMessage());
340                assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
341             }
342             
343             manyClients[ii] = sub;
344          }
345          double timeForLogins = (double)stopWatch.elapsed()/1000.; // msec -> sec
346 
347          
348          long usedAfter = getUsedServerMemory();
349          long memPerLogin = (usedAfter - usedBefore)/numSubscribers;
350          int noThreads = ThreadLister.countThreads();
351          int tDiff = noThreads - startNoThreads;
352          int tPerConn = ((ci == 0|| ci == -1) ? tDiff :tDiff/(ci+1));
353          int subPerT = tDiff != 0 ? numSubscribers/tDiff:0;
354          
355          log.info(numSubscribers + " subscriber clients are ready.");
356          log.info("Server memory per login consumed=" + memPerLogin);
357          log.info("Time " + (long)(numSubscribers/timeForLogins) + " logins/sec");
358          log.info("Threads created " + tDiff + ", threads per connection " + tPerConn + ", sub  per thread " + subPerT);
359          //ThreadLister.listAllThreads(System.out);
360          //try { Thread.sleep(5000000L); } catch( InterruptedException i) {}
361          
362       } catch (Error e) {
363          e.printStackTrace();
364          log.severe("Could not set up subscribers: " +e);
365          log.severe("No of threads " + ThreadLister.countThreads() + " for connection no " + ci);
366          throw e;
367       } // end of try-catch
368       
369    }
370    
371    /**
372     * Query xmlBlaster for its current memory consumption. 
373     */
374    long getUsedServerMemory() {
375       String xmlKey = "<key oid='__cmd:?usedMem' queryType='EXACT'></key>";
376       String qos = "<qos></qos>";
377       try {
378          MsgUnit[] msgArr = oneConnection.get(xmlKey, qos);
379          String mem = new String(msgArr[0].getContent());
380          return new Long(mem).longValue();
381       } catch (XmlBlasterException e) {
382          log.warning(e.toString());
383          return 0L;
384       }
385    }
386    /**
387     * TEST: Publish numToPub messages..
388     * <p />
389     * The returned publishOid1 is checked
390     */
391    public void publish()
392    {
393       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");
394       
395       numReceived = 0;
396       String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
397                       "<key oid='" + publishOid1 + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'>\n" +
398          "</key>";
399       String senderContent = "Yeahh, i'm the new content";
400 
401       try {
402          stopWatch = new StopWatch();
403          for (int i = 0; i < noToPub;i++) {
404             senderContent = senderContent+"-"+i;
405             MsgUnit msgUnit = new MsgUnit(xmlKey, senderContent.getBytes(), "<qos></qos>");
406             String tmp = oneConnection.publish(msgUnit).getKeyOid();
407             assertEquals("Wrong publishOid1", publishOid1, tmp);
408             log.info("Success: Publishing done for " + i +", returned oid=" + publishOid1);
409          }
410       } catch(XmlBlasterException e) {
411          log.warning("XmlBlasterException: " + e.getMessage());
412          assertTrue("publishOne - XmlBlasterException: " + e.getMessage(), false);
413       }
414    }
415    /**
416     * This is the callback method invoked from xmlBlaster
417     * delivering us a new asynchronous message. 
418     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
419     */
420    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
421    {
422       //log.info("Client " + loginName + " receiving update of message oid=" + updateKey.getOid() + "...");
423       numReceived++;
424 
425       if (numReceived == numToRec) {
426          long avg = 0;
427          double elapsed = stopWatch.elapsed();
428          if (elapsed > 0.)
429             avg = (long)(1000.0 * numReceived / elapsed);
430          log.info(numReceived + " messages updated, average messages/second = " + avg + stopWatch.nice());
431       }
432       return "";
433    }
434    
435    /**
436     * TEST: Construct a message and publish it,
437     * all clients should receive an update. 
438     */
439    public void testManyClients()
440    {
441       System.out.println("");
442       log.info("TEST 1, many clients, useOneConnection="+useOneConnection);
443       
444       subcribeMany();
445       try { Thread.sleep(1000L); } catch( InterruptedException i) {}                                            // Wait some time for callback to arrive ...
446       assertEquals("numReceived after subscribe", 0, numReceived);  // there should be no Callback
447       
448       publish();
449       long delay = 2000L + 10 * numToRec;
450       log.info("Waiting long enough for updates ..."+delay);
451       Util.delay(delay);                          // Wait some time for callback to arrive ...
452       // !!!! this.updateInterceptor.
453 
454       if ( numReceived != numToRec ){
455          // Warn and wait some more
456          log.warning("Have not yet received more than " +numReceived+"/"+numToRec+" waiting some more");
457          int midRec=numReceived;
458          long avg = 0;
459          double elapsed = stopWatch.elapsed();
460          if (elapsed > 0.)
461             avg = (long)(1000.0 * numReceived / elapsed);
462          log.info(numReceived + " messages updated, average firts round messages/second = " + avg + stopWatch.nice(false));//Don't reset
463          Util.delay(2L*delay); 
464          //Lastt delay
465          if ( numReceived != numToRec ){
466          // Warn and wait some more
467          log.warning("Have NOT yet received more than " +numReceived+"/"+numToRec+" waiting last round");
468          avg = 0;
469          elapsed = stopWatch.elapsed()-elapsed;
470          if (elapsed > 0.)
471             avg = (long)(1000.0 *( numReceived -midRec)/ elapsed);
472          log.info(numReceived-midRec + " messages updated this round, average second round messages/second = " + avg + stopWatch.nice(false));//Don't reset
473          Util.delay(4L*delay); 
474       }
475          
476       }
477 
478       log.info("Got messages:" +numReceived+"/"+numToRec);
479       assertEquals("Wrong number of updates", numToRec, numReceived);
480       
481    }
482    
483    /**
484     * Method is used by TestRunner to load these tests.
485     *
486     * <p>Warning! The default uses the embedded server, to give each round a equal chance. But it is MUCH slower than using a server in another VM.</p>
487     */
488    public static Test suite()
489    {
490       TestSuite suite= new TestSuite();
491       String loginName = "Tim";
492       Global glob = Global.instance();
493       // Test IOR many on one
494       setProtoMax(glob,"IOR","0");
495       suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
496       // Test IOR many on few
497       setProtoMax(glob,"IOR","500");
498       suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
499       // Test RMI many on one
500       setProtoMax(glob,"RMI","0");
501       suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
502       // Test RMI many on few
503       setProtoMax(glob,"RMI","500");
504       suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,true));
505       // Test IOR many on many
506       setProtoMax(glob,"IOR","0");
507       suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,false));
508       // Test RMI many on many
509       setProtoMax(glob,"RMI","0");
510       suite.addTest(new MassiveSubTest(glob, "testManyClients", loginName,false));
511       
512       return suite;
513    }
514 
515    private static void setProtoMax(Global glob, String proto, String max) {
516       try {
517          glob.getProperty().set("client.protocol",proto);
518          glob.getProperty().set("maxSubPerCon",max);
519       }catch(XmlBlasterException ex) {
520          assertTrue("Could not setup test: " + ex, false);
521       }
522    }
523 
524    public static void main(String[] args) {
525       Global glob = new Global(args);
526       setProtoMax(glob, "IOR", "500");
527       MassiveSubTest m = new MassiveSubTest(glob, "testManyClients", "testManyClients", false);
528       m.setUp();
529       m.testManyClients();
530       m.tearDown();
531    }
532    
533 } // MassiveSubTest


// syntax highlighted by Code2HTML, v. 0.9.1