1 /*------------------------------------------------------------------------------
  2 Name:      LoadTestSub.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Load test for xmlBlaster
  6 Version:   $Id: LoadTestSub.java 14846 2006-03-07 17:14:22Z ruff $
  7 ------------------------------------------------------------------------------*/
  8 package org.xmlBlaster.test.stress;
  9 
 10 import org.xmlBlaster.util.StopWatch;
 11 
 12 import java.util.logging.Logger;
 13 import java.util.logging.Level;
 14 import org.xmlBlaster.util.Global;
 15 import org.xmlBlaster.util.XmlBlasterException;
 16 import org.xmlBlaster.util.MsgUnit;
 17 import org.xmlBlaster.client.I_XmlBlasterAccess;
 18 import org.xmlBlaster.client.key.UpdateKey;
 19 import org.xmlBlaster.client.qos.ConnectQos;
 20 import org.xmlBlaster.client.qos.UpdateQos;
 21 import org.xmlBlaster.client.qos.PublishReturnQos;
 22 import org.xmlBlaster.client.qos.EraseReturnQos;
 23 import org.xmlBlaster.client.I_Callback;
 24 
 25 import junit.framework.*;
 26 
 27 
 28 /**
 29  * This client does a subscribe() with many publish() calls.<br />
 30  * The same message is published 1000 times, to measure messages/second performance.
 31  * <p />
 32  * This client may be invoked multiple time on the same xmlBlaster server,
 33  * as it cleans up everything after his tests are done.
 34  * <p>
 35  * Invoke examples:<br />
 36  * <pre>
 37  *    java junit.textui.TestRunner org.xmlBlaster.test.stress.LoadTestSub
 38  *    java junit.swingui.TestRunner org.xmlBlaster.test.stress.LoadTestSub
 39  * </pre>
 40  */
 41 public class LoadTestSub extends TestCase implements I_Callback
 42 {
 43    private static String ME = "LoadTestSub";
 44    private boolean messageArrived = false;
 45    private StopWatch stopWatch = null;
 46    private Global glob;
 47    private static Logger log = Logger.getLogger(LoadTestSub.class.getName());
 48 
 49    private String subscribeOid;
 50    private String publishOid = "LoadTestSub";
 51    private I_XmlBlasterAccess senderConnection;
 52    private String senderName;
 53    private String receiverName;         // sender/receiver is here the same client
 54    private String passwd;
 55 
 56    private final int numPublish;        // 200;
 57    private int numReceived = 0;         // error checking
 58    private int burstModePublish = 1;
 59    private boolean publishOneway = false;
 60    private boolean persistent = false;
 61    private final String contentMime = "text/plain";
 62    private final String contentMimeExtended = "1.0";
 63    private int lastContentNumber = -1;
 64    private final String someContent = "Yeahh, i'm the new content, my total length is big an bigger but still i want to be longer and longer until i have reached some 180 bytes, here is remaining blahh to fill the last";
 65 
 66    public LoadTestSub() { // JUNIT
 67       this(new Global(), "LoadTestSub", "LoadTestSub", "secret", 1000, 1, false, false);
 68    }
 69    
 70    /**
 71     * Constructs the LoadTestSub object.
 72     * <p />
 73     * @param testName  The name used in the test suite
 74     * @param loginName The name to login to the xmlBlaster
 75     * @param numPublish The number of messages to send
 76     * @param burstModePublish send given number of publish messages in one bulk
 77     */
 78    public LoadTestSub(Global glob, String testName, String loginName, String passwd, int numPublish, int burstModePublish, boolean publishOneway, boolean persistent)
 79    {
 80        super(testName);
 81        this.glob = glob;
 82 
 83        this.senderName = loginName;
 84        this.receiverName = loginName;
 85        this.passwd = passwd;
 86        this.numPublish = numPublish;
 87        this.burstModePublish = burstModePublish;
 88        this.publishOneway = publishOneway;
 89        this.persistent = persistent;
 90    }
 91 
 92 
 93    /**
 94     * Sets up the fixture.
 95     * <p />
 96     * Connect to xmlBlaster and login
 97     */
 98    protected void setUp()
 99    {
100       try {
101          senderConnection = glob.getXmlBlasterAccess();
102          ConnectQos connectQos = new ConnectQos(glob, senderName, passwd);
103          senderConnection.connect(connectQos, this); // Login to xmlBlaster
104          if (burstModePublish > numPublish)
105             burstModePublish = numPublish;
106          if ((numPublish % burstModePublish) != 0)
107             log.severe("numPublish should by dividable by publish.burstMode");
108          log.info("Connected to xmlBlaster, numPublish=" + numPublish + " burstModePublish=" + burstModePublish + " dispatch/callback/burstMode/collectTime=" 
109               + glob.getProperty().get("dispatch/callback/burstMode/collectTime", 0L));
110       }
111       catch (Exception e) {
112           log.severe(e.toString());
113           e.printStackTrace();
114           assertTrue(e.toString(), false);
115       }
116 
117    }
118 
119 
120    /**
121     * Tears down the fixture.
122     * <p />
123     * cleaning up .... erase() the previous message OID and logout
124     */
125    protected void tearDown()
126    {
127       long avg = 0;
128       double elapsed = stopWatch.elapsed();
129       if (elapsed > 0.)
130          avg = (long)(1000.0 * numPublish / elapsed);
131       log.info(numPublish + " messages updated, average messages/second = " + avg + stopWatch.nice());
132 
133       String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
134                       "<key oid='" + publishOid + "' queryType='EXACT'>\n" +
135                       "</key>";
136       String qos = "<qos></qos>";
137       try {
138          EraseReturnQos[] arr = senderConnection.erase(xmlKey, qos);
139          if (arr.length != 1) log.severe("Erased " + arr.length + " messages:");
140       } catch(XmlBlasterException e) { log.severe("XmlBlasterException: " + e.getMessage()); }
141 
142       senderConnection.disconnect(null);
143    }
144 
145 
146    /**
147     * TEST: Subscribe to messages with XPATH.
148     * <p />
149     * The returned subscribeOid is checked
150     */
151    public void doSubscribeXPath()
152    {
153       if (log.isLoggable(Level.FINE)) log.fine("Subscribing using XPath syntax ...");
154 
155       String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
156                       "<key oid='' queryType='XPATH'>\n" +
157                       "   //LoadTestSub-AGENT" +
158                       "</key>";
159       String qos = "<qos></qos>";
160       numReceived = 0;
161       subscribeOid = null;
162       try {
163          subscribeOid = senderConnection.subscribe(xmlKey, qos).getSubscriptionId();
164          log.info("Success: Subscribe on " + subscribeOid + " done");
165       } catch(XmlBlasterException e) {
166          log.warning("XmlBlasterException: " + e.getMessage());
167          // assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
168       }
169    }
170 
171 
172    /**
173     * TEST: Construct a message and publish it.
174     * <p />
175     * The returned publishOid is checked
176     */
177    public void doPublish()
178    {
179       log.info("Publishing " + numPublish + " messages ...");
180 
181       numReceived = 0;
182       String xmlKey = "<key oid='" + publishOid + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'>\n" +
183                       "   <LoadTestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" +
184                       "      <LoadTestSub-DRIVER id='FileProof' pollingFreq='10'>" +
185                       "      </LoadTestSub-DRIVER>"+
186                       "   </LoadTestSub-AGENT>" +
187                       "</key>";
188       String qos = "";
189       if (this.persistent) qos = "<qos><persistent>true</persistent></qos>";
190       //String qos = "<qos><persistent>true</persistent></qos>";
191 
192       MsgUnit[] arr = new MsgUnit[burstModePublish];
193       PublishReturnQos[] publishOids;
194       try {
195          for (int kk=0; kk<burstModePublish; kk++)
196             arr[kk] = new MsgUnit(xmlKey, someContent.getBytes(), qos);
197       }
198       catch (XmlBlasterException e) {
199          fail(e.getMessage());
200       }
201       stopWatch = new StopWatch();
202       try {
203          for (int ii=0; ii<numPublish; ) {
204             for (int jj=0; jj<burstModePublish; jj++) {
205                arr[jj] = new MsgUnit(arr[jj], null, new String(someContent + (ii+1)).getBytes(), null);
206             }
207             ii+=burstModePublish;
208             if (publishOneway)
209                senderConnection.publishOneway(arr);
210             else
211                publishOids = senderConnection.publishArr(arr);
212             /*
213             if (((ii+1) % 1) == 0)
214                log.info("Success: Publishing done: '" + someContent + "'");
215             */
216          }
217          long avg = 0;
218          double elapsed = stopWatch.elapsed();
219          if (elapsed > 0.)
220             avg = (long)(1000.0 * numPublish / elapsed);
221          log.info("Success: Publishing done, " + numPublish + " messages sent, average messages/second = " + avg);
222          //assertEquals("oid is different", oid, publishOid);
223       } catch(XmlBlasterException e) {
224          log.warning("XmlBlasterException: " + e.getMessage());
225          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
226       }
227 
228       // assertTrue("returned publishOid == null", publishOid != null);
229       // assertNotEquals("returned publishOid", 0, publishOid.length());
230    }
231 
232 
233    /**
234     * TEST: Construct a message and publish it,<br />
235     * the previous XPath subscription should match and send an update.
236     */
237    public void testManyPublish()
238    {
239       doSubscribeXPath();
240       doPublish();
241       waitOnUpdate(60*1000L, numPublish);
242       assertEquals("numReceived after publishing", numPublish, numReceived); // message arrived?
243    }
244 
245    /**
246     * This is the callback method invoked from xmlBlaster
247     * delivering us a new asynchronous message. 
248     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
249     */
250    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
251    {
252       if (log.isLoggable(Level.FINER)) log.finer("Receiving update of a message ...");
253 
254       numReceived++;
255       if ((numReceived % 1000) == 0) {
256          long avg = (long)((double)numReceived / (double)(stopWatch.elapsed()/1000.));
257          String contentStr = new String(content);
258          String tok = "... " + contentStr.substring(contentStr.length() - 10);
259          log.info("Success: Update #" + numReceived + " received: '" + tok + "', average messages/second = " + avg);
260       }
261       messageArrived = true;
262       String currentContent = new String(content);
263       int val = -1;
264       if (lastContentNumber >= 0) {
265          String number = currentContent.substring(someContent.length());
266          try { val = new Integer(number).intValue(); } catch (NumberFormatException e) { log.severe(e.toString()); }
267          if (val <= lastContentNumber) {
268             log.severe("lastContent=" + lastContentNumber + " currentContent=" + currentContent);
269             //assertTrue("Sequence of received message is broken", false);
270          }
271       }
272       lastContentNumber = val;
273       return "";
274    }
275 
276 
277    /**
278     * Little helper, waits until the wanted number of messages are arrived
279     * or returns when the given timeout occurs.
280     * <p />
281     * @param timeout in milliseconds
282     * @param numWait how many messages to wait
283     */
284    private void waitOnUpdate(final long timeout, final int numWait)
285    {
286       long pollingInterval = 50L;  // check every 0.05 seconds
287       if (timeout < 50)  pollingInterval = timeout / 10L;
288       long sum = 0L;
289       while (numReceived < numWait) {
290          try {
291             Thread.sleep(pollingInterval);
292          }
293          catch( InterruptedException i)
294          {}
295          sum += pollingInterval;
296          if (sum > timeout) {
297             log.warning("Timeout of " + timeout + " occurred");
298             break;
299          }
300       }
301    }
302 
303 
304    /**
305     * Method is used by TestRunner to load these tests
306     */
307    public static Test suite()
308    {
309        TestSuite suite= new TestSuite();
310        String loginName = "Tim";
311        int numMsg = 200;
312        suite.addTest(new LoadTestSub(new Global(), "testManyPublish", loginName, "secret", numMsg, 200, false, false));
313        return suite;
314    }
315 
316    static void usage()
317    {
318       System.out.println("\nAvailable options:");
319       System.out.println("   -numPublish         Number of messages to send [5000].");
320       System.out.println("   -publish.burstMode  Collect given number of messages when publishing [1].");
321       System.out.println("   -publish.oneway     Send messages oneway (publish does not receive return value) [false].");
322       System.out.println("   -publish.persistent Send persistent messages if set to true, otherwise transient. [false].");
323       System.out.println(Global.instance().usage());
324    }
325 
326    /**
327     * Invoke: java org.xmlBlaster.test.stress.LoadTestSub
328     * <br />
329     * You can use the command line option -numPublish 5000 to change the number of messages sent.
330     * <br />
331     * @deprecated Use the TestRunner from the testsuite to run it:<p />
332     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.stress.LoadTestSub</pre>
333     */
334    public static void main(String args[])
335    {
336       Global glob = new Global();
337       int ret = glob.init(args);
338       if (ret != 0) {
339          usage();
340          System.out.println("Oneway Example: java -Xms18M -Xmx32M org.xmlBlaster.test.stress.LoadTestSub -publish.oneway true -burstMode/collectTime 500 -dispatch/callback/oneway true -dispatch/callback/burstMode/collectTime 200 -numPublish 5000 -protocol IOR");
341          System.out.println("Syn    Example: java -Xms18M -Xmx32M org.xmlBlaster.test.stress.LoadTestSub -publish.oneway false -dispatch/callback/oneway false -publish.burstMode 200 -dispatch/callback/burstMode/collectTime 200 -numPublish 5000 -protocol IOR");
342          System.exit(1);
343       }
344 
345       int numPublish = glob.getProperty().get("numPublish", 5000);
346       int burstModePublish = glob.getProperty().get("publish.burstMode", 1);
347       boolean publishOneway = glob.getProperty().get("publish.oneway", false);
348       boolean persistent = glob.getProperty().get("publish.persistent", false);
349 
350       LoadTestSub testSub = new LoadTestSub(glob, "LoadTestSub", glob.getProperty().get("name", "Tim"),
351                                             glob.getProperty().get("passwd", "secret"),
352                                             numPublish, burstModePublish, publishOneway, persistent);
353       testSub.setUp();
354       testSub.testManyPublish();
355       System.out.println("Success, hit a key to logout and exit");
356       try { System.in.read(); } catch(java.io.IOException e) {}
357       testSub.tearDown();
358    }
359 }


// syntax highlighted by Code2HTML, v. 0.9.1