1 /*------------------------------------------------------------------------------
2 Name: LoadTestSub.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Load test for xmlBlaster
6 Version: $Id: LoadTestSub.java 14846 2006-03-07 17:14:22Z ruff $
7 ------------------------------------------------------------------------------*/
8 package org.xmlBlaster.test.stress;
9
10 import org.xmlBlaster.util.StopWatch;
11
12 import java.util.logging.Logger;
13 import java.util.logging.Level;
14 import org.xmlBlaster.util.Global;
15 import org.xmlBlaster.util.XmlBlasterException;
16 import org.xmlBlaster.util.MsgUnit;
17 import org.xmlBlaster.client.I_XmlBlasterAccess;
18 import org.xmlBlaster.client.key.UpdateKey;
19 import org.xmlBlaster.client.qos.ConnectQos;
20 import org.xmlBlaster.client.qos.UpdateQos;
21 import org.xmlBlaster.client.qos.PublishReturnQos;
22 import org.xmlBlaster.client.qos.EraseReturnQos;
23 import org.xmlBlaster.client.I_Callback;
24
25 import junit.framework.*;
26
27
28 /**
29 * This client does a subscribe() with many publish() calls.<br />
30 * The same message is published 1000 times, to measure messages/second performance.
31 * <p />
32 * This client may be invoked multiple time on the same xmlBlaster server,
33 * as it cleans up everything after his tests are done.
34 * <p>
35 * Invoke examples:<br />
36 * <pre>
37 * java junit.textui.TestRunner org.xmlBlaster.test.stress.LoadTestSub
38 * java junit.swingui.TestRunner org.xmlBlaster.test.stress.LoadTestSub
39 * </pre>
40 */
41 public class LoadTestSub extends TestCase implements I_Callback
42 {
43 private static String ME = "LoadTestSub";
44 private boolean messageArrived = false;
45 private StopWatch stopWatch = null;
46 private Global glob;
47 private static Logger log = Logger.getLogger(LoadTestSub.class.getName());
48
49 private String subscribeOid;
50 private String publishOid = "LoadTestSub";
51 private I_XmlBlasterAccess senderConnection;
52 private String senderName;
53 private String receiverName; // sender/receiver is here the same client
54 private String passwd;
55
56 private final int numPublish; // 200;
57 private int numReceived = 0; // error checking
58 private int burstModePublish = 1;
59 private boolean publishOneway = false;
60 private boolean persistent = false;
61 private final String contentMime = "text/plain";
62 private final String contentMimeExtended = "1.0";
63 private int lastContentNumber = -1;
64 private final String someContent = "Yeahh, i'm the new content, my total length is big an bigger but still i want to be longer and longer until i have reached some 180 bytes, here is remaining blahh to fill the last";
65
66 public LoadTestSub() { // JUNIT
67 this(new Global(), "LoadTestSub", "LoadTestSub", "secret", 1000, 1, false, false);
68 }
69
70 /**
71 * Constructs the LoadTestSub object.
72 * <p />
73 * @param testName The name used in the test suite
74 * @param loginName The name to login to the xmlBlaster
75 * @param numPublish The number of messages to send
76 * @param burstModePublish send given number of publish messages in one bulk
77 */
78 public LoadTestSub(Global glob, String testName, String loginName, String passwd, int numPublish, int burstModePublish, boolean publishOneway, boolean persistent)
79 {
80 super(testName);
81 this.glob = glob;
82
83 this.senderName = loginName;
84 this.receiverName = loginName;
85 this.passwd = passwd;
86 this.numPublish = numPublish;
87 this.burstModePublish = burstModePublish;
88 this.publishOneway = publishOneway;
89 this.persistent = persistent;
90 }
91
92
93 /**
94 * Sets up the fixture.
95 * <p />
96 * Connect to xmlBlaster and login
97 */
98 protected void setUp()
99 {
100 try {
101 senderConnection = glob.getXmlBlasterAccess();
102 ConnectQos connectQos = new ConnectQos(glob, senderName, passwd);
103 senderConnection.connect(connectQos, this); // Login to xmlBlaster
104 if (burstModePublish > numPublish)
105 burstModePublish = numPublish;
106 if ((numPublish % burstModePublish) != 0)
107 log.severe("numPublish should by dividable by publish.burstMode");
108 log.info("Connected to xmlBlaster, numPublish=" + numPublish + " burstModePublish=" + burstModePublish + " dispatch/callback/burstMode/collectTime="
109 + glob.getProperty().get("dispatch/callback/burstMode/collectTime", 0L));
110 }
111 catch (Exception e) {
112 log.severe(e.toString());
113 e.printStackTrace();
114 assertTrue(e.toString(), false);
115 }
116
117 }
118
119
120 /**
121 * Tears down the fixture.
122 * <p />
123 * cleaning up .... erase() the previous message OID and logout
124 */
125 protected void tearDown()
126 {
127 long avg = 0;
128 double elapsed = stopWatch.elapsed();
129 if (elapsed > 0.)
130 avg = (long)(1000.0 * numPublish / elapsed);
131 log.info(numPublish + " messages updated, average messages/second = " + avg + stopWatch.nice());
132
133 String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
134 "<key oid='" + publishOid + "' queryType='EXACT'>\n" +
135 "</key>";
136 String qos = "<qos></qos>";
137 try {
138 EraseReturnQos[] arr = senderConnection.erase(xmlKey, qos);
139 if (arr.length != 1) log.severe("Erased " + arr.length + " messages:");
140 } catch(XmlBlasterException e) { log.severe("XmlBlasterException: " + e.getMessage()); }
141
142 senderConnection.disconnect(null);
143 }
144
145
146 /**
147 * TEST: Subscribe to messages with XPATH.
148 * <p />
149 * The returned subscribeOid is checked
150 */
151 public void doSubscribeXPath()
152 {
153 if (log.isLoggable(Level.FINE)) log.fine("Subscribing using XPath syntax ...");
154
155 String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
156 "<key oid='' queryType='XPATH'>\n" +
157 " //LoadTestSub-AGENT" +
158 "</key>";
159 String qos = "<qos></qos>";
160 numReceived = 0;
161 subscribeOid = null;
162 try {
163 subscribeOid = senderConnection.subscribe(xmlKey, qos).getSubscriptionId();
164 log.info("Success: Subscribe on " + subscribeOid + " done");
165 } catch(XmlBlasterException e) {
166 log.warning("XmlBlasterException: " + e.getMessage());
167 // assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
168 }
169 }
170
171
172 /**
173 * TEST: Construct a message and publish it.
174 * <p />
175 * The returned publishOid is checked
176 */
177 public void doPublish()
178 {
179 log.info("Publishing " + numPublish + " messages ...");
180
181 numReceived = 0;
182 String xmlKey = "<key oid='" + publishOid + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'>\n" +
183 " <LoadTestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" +
184 " <LoadTestSub-DRIVER id='FileProof' pollingFreq='10'>" +
185 " </LoadTestSub-DRIVER>"+
186 " </LoadTestSub-AGENT>" +
187 "</key>";
188 String qos = "";
189 if (this.persistent) qos = "<qos><persistent>true</persistent></qos>";
190 //String qos = "<qos><persistent>true</persistent></qos>";
191
192 MsgUnit[] arr = new MsgUnit[burstModePublish];
193 PublishReturnQos[] publishOids;
194 try {
195 for (int kk=0; kk<burstModePublish; kk++)
196 arr[kk] = new MsgUnit(xmlKey, someContent.getBytes(), qos);
197 }
198 catch (XmlBlasterException e) {
199 fail(e.getMessage());
200 }
201 stopWatch = new StopWatch();
202 try {
203 for (int ii=0; ii<numPublish; ) {
204 for (int jj=0; jj<burstModePublish; jj++) {
205 arr[jj] = new MsgUnit(arr[jj], null, new String(someContent + (ii+1)).getBytes(), null);
206 }
207 ii+=burstModePublish;
208 if (publishOneway)
209 senderConnection.publishOneway(arr);
210 else
211 publishOids = senderConnection.publishArr(arr);
212 /*
213 if (((ii+1) % 1) == 0)
214 log.info("Success: Publishing done: '" + someContent + "'");
215 */
216 }
217 long avg = 0;
218 double elapsed = stopWatch.elapsed();
219 if (elapsed > 0.)
220 avg = (long)(1000.0 * numPublish / elapsed);
221 log.info("Success: Publishing done, " + numPublish + " messages sent, average messages/second = " + avg);
222 //assertEquals("oid is different", oid, publishOid);
223 } catch(XmlBlasterException e) {
224 log.warning("XmlBlasterException: " + e.getMessage());
225 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
226 }
227
228 // assertTrue("returned publishOid == null", publishOid != null);
229 // assertNotEquals("returned publishOid", 0, publishOid.length());
230 }
231
232
233 /**
234 * TEST: Construct a message and publish it,<br />
235 * the previous XPath subscription should match and send an update.
236 */
237 public void testManyPublish()
238 {
239 doSubscribeXPath();
240 doPublish();
241 waitOnUpdate(60*1000L, numPublish);
242 assertEquals("numReceived after publishing", numPublish, numReceived); // message arrived?
243 }
244
245 /**
246 * This is the callback method invoked from xmlBlaster
247 * delivering us a new asynchronous message.
248 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
249 */
250 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
251 {
252 if (log.isLoggable(Level.FINER)) log.finer("Receiving update of a message ...");
253
254 numReceived++;
255 if ((numReceived % 1000) == 0) {
256 long avg = (long)((double)numReceived / (double)(stopWatch.elapsed()/1000.));
257 String contentStr = new String(content);
258 String tok = "... " + contentStr.substring(contentStr.length() - 10);
259 log.info("Success: Update #" + numReceived + " received: '" + tok + "', average messages/second = " + avg);
260 }
261 messageArrived = true;
262 String currentContent = new String(content);
263 int val = -1;
264 if (lastContentNumber >= 0) {
265 String number = currentContent.substring(someContent.length());
266 try { val = new Integer(number).intValue(); } catch (NumberFormatException e) { log.severe(e.toString()); }
267 if (val <= lastContentNumber) {
268 log.severe("lastContent=" + lastContentNumber + " currentContent=" + currentContent);
269 //assertTrue("Sequence of received message is broken", false);
270 }
271 }
272 lastContentNumber = val;
273 return "";
274 }
275
276
277 /**
278 * Little helper, waits until the wanted number of messages are arrived
279 * or returns when the given timeout occurs.
280 * <p />
281 * @param timeout in milliseconds
282 * @param numWait how many messages to wait
283 */
284 private void waitOnUpdate(final long timeout, final int numWait)
285 {
286 long pollingInterval = 50L; // check every 0.05 seconds
287 if (timeout < 50) pollingInterval = timeout / 10L;
288 long sum = 0L;
289 while (numReceived < numWait) {
290 try {
291 Thread.sleep(pollingInterval);
292 }
293 catch( InterruptedException i)
294 {}
295 sum += pollingInterval;
296 if (sum > timeout) {
297 log.warning("Timeout of " + timeout + " occurred");
298 break;
299 }
300 }
301 }
302
303
304 /**
305 * Method is used by TestRunner to load these tests
306 */
307 public static Test suite()
308 {
309 TestSuite suite= new TestSuite();
310 String loginName = "Tim";
311 int numMsg = 200;
312 suite.addTest(new LoadTestSub(new Global(), "testManyPublish", loginName, "secret", numMsg, 200, false, false));
313 return suite;
314 }
315
316 static void usage()
317 {
318 System.out.println("\nAvailable options:");
319 System.out.println(" -numPublish Number of messages to send [5000].");
320 System.out.println(" -publish.burstMode Collect given number of messages when publishing [1].");
321 System.out.println(" -publish.oneway Send messages oneway (publish does not receive return value) [false].");
322 System.out.println(" -publish.persistent Send persistent messages if set to true, otherwise transient. [false].");
323 System.out.println(Global.instance().usage());
324 }
325
326 /**
327 * Invoke: java org.xmlBlaster.test.stress.LoadTestSub
328 * <br />
329 * You can use the command line option -numPublish 5000 to change the number of messages sent.
330 * <br />
331 * @deprecated Use the TestRunner from the testsuite to run it:<p />
332 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.stress.LoadTestSub</pre>
333 */
334 public static void main(String args[])
335 {
336 Global glob = new Global();
337 int ret = glob.init(args);
338 if (ret != 0) {
339 usage();
340 System.out.println("Oneway Example: java -Xms18M -Xmx32M org.xmlBlaster.test.stress.LoadTestSub -publish.oneway true -burstMode/collectTime 500 -dispatch/callback/oneway true -dispatch/callback/burstMode/collectTime 200 -numPublish 5000 -protocol IOR");
341 System.out.println("Syn Example: java -Xms18M -Xmx32M org.xmlBlaster.test.stress.LoadTestSub -publish.oneway false -dispatch/callback/oneway false -publish.burstMode 200 -dispatch/callback/burstMode/collectTime 200 -numPublish 5000 -protocol IOR");
342 System.exit(1);
343 }
344
345 int numPublish = glob.getProperty().get("numPublish", 5000);
346 int burstModePublish = glob.getProperty().get("publish.burstMode", 1);
347 boolean publishOneway = glob.getProperty().get("publish.oneway", false);
348 boolean persistent = glob.getProperty().get("publish.persistent", false);
349
350 LoadTestSub testSub = new LoadTestSub(glob, "LoadTestSub", glob.getProperty().get("name", "Tim"),
351 glob.getProperty().get("passwd", "secret"),
352 numPublish, burstModePublish, publishOneway, persistent);
353 testSub.setUp();
354 testSub.testManyPublish();
355 System.out.println("Success, hit a key to logout and exit");
356 try { System.in.read(); } catch(java.io.IOException e) {}
357 testSub.tearDown();
358 }
359 }
syntax highlighted by Code2HTML, v. 0.9.1