1 /*------------------------------------------------------------------------------
  2 Name:      TestSubManyClients.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Demo code for a client using xmlBlaster
  6 Version:   $Id: TestSubManyClients.java 17606 2008-12-07 12:31:06Z ruff $
  7 ------------------------------------------------------------------------------*/
  8 package org.xmlBlaster.test.qos;
  9 
 10 import org.xmlBlaster.util.StopWatch;
 11 
 12 import java.util.logging.Logger;
 13 import java.util.logging.Level;
 14 import org.xmlBlaster.util.Global;
 15 import org.xmlBlaster.client.qos.ConnectQos;
 16 import org.xmlBlaster.util.XmlBlasterException;
 17 import org.xmlBlaster.client.I_XmlBlasterAccess;
 18 import org.xmlBlaster.client.I_Callback;
 19 import org.xmlBlaster.client.key.UpdateKey;
 20 import org.xmlBlaster.client.qos.UpdateQos;
 21 import org.xmlBlaster.client.qos.PublishReturnQos;
 22 import org.xmlBlaster.client.qos.EraseReturnQos;
 23 import org.xmlBlaster.client.key.SubscribeKey;
 24 import org.xmlBlaster.client.qos.SubscribeQos;
 25 import org.xmlBlaster.client.key.PublishKey;
 26 import org.xmlBlaster.client.qos.PublishQos;
 27 import org.xmlBlaster.util.MsgUnit;
 28 
 29 import org.xmlBlaster.test.Util;
 30 import junit.framework.*;
 31 
 32 
 33 /**
 * This client tests the method subscribe() with a later publish(), using EXACT (oid) queries
 * and many simultaneously logged in clients.
 * <br />
 * The subscribe() should be recognized for this later arriving publish()
 * <p>
 * This client may be invoked multiple times on the same xmlBlaster server,
 * as it cleans up everything after its tests are done.
 40  * <p>
 41  * Invoke examples:<br />
 42  * <pre>
 43  *  java  -Xms10m -Xmx220m org.xmlBlaster.Main -logging WARNING
 44  *
 45  *  java org.xmlBlaster.test.qos.TestSubManyClients -numClients 10000 -dispatch/connection/protocol RMI -warn false
 46  *
 47  *  java junit.textui.TestRunner org.xmlBlaster.test.qos.TestSubManyClients
 48  *  java junit.swingui.TestRunner org.xmlBlaster.test.qos.TestSubManyClients
 49  * </pre>
 50  */
 51 public class TestSubManyClients extends TestCase implements I_Callback
 52 {
 53    private static String ME = "TestSubManyClients";
 54    private final Global glob;
 55    private static Logger log = Logger.getLogger(TestSubManyClients.class.getName());
 56 
 57    private final String publishOid1 = "dummy1";
 58    private final String publishOid2 = "dummy2";
 59    private I_XmlBlasterAccess oneConnection;
 60    private String oneName;
 61 
 62    private int numReceived = 0;         // error checking
 63    private final String contentMime = "text/xml";
 64    private final String contentMimeExtended = "1.0";
 65 
 66    class Client {
 67       String loginName;
 68       I_XmlBlasterAccess connection;
 69       String subscribeOid;
 70    }
 71 
 72    private int numClients;
 73    private Client[] manyClients;
 74 
 75    private StopWatch stopWatch = new StopWatch();
 76 
 77    /**
 78     * Constructs the TestSubManyClients object.
 79     * <p />
 80     * @param testName  The name used in the test suite
 81     * @param loginName The name to login to the xmlBlaster
 82     */
 83    public TestSubManyClients(Global glob, String testName, String loginName)
 84    {
 85       super(testName);
 86       this.glob = glob;
 87 
 88       this.oneName = loginName;
 89       numClients = glob.getProperty().get("numClients", 10);
 90    }
 91 
 92 
 93    /**
 94     * Sets up the fixture.
 95     * <p />
 96     * Connect to xmlBlaster and login
 97     */
 98    protected void setUp()
 99    {
100       log.info("Setting up test ...");
101       numReceived = 0;
102       try {
103          Global globOne = glob.getClone(null);
104          oneConnection = globOne.getXmlBlasterAccess(); // Find orb
105          String passwd = "secret";
106          ConnectQos qos = new ConnectQos(globOne, oneName, passwd);
107          oneConnection.connect(qos, this); // Login to xmlBlaster
108       }
109       catch (Exception e) {
110           log.severe("Login failed: " + e.toString());
111           e.printStackTrace();
112           assertTrue("Login failed: " + e.toString(), false);
113       }
114    }
115 
116 
117    /**
118     * Tears down the fixture.
119     * <p />
120     * cleaning up .... erase() the previous message OID and logout
121     */
122    protected void tearDown()
123    {
124       if (numReceived != numClients) {
125          log.severe("numClients=" + numClients + " but numReceived=" + numReceived);
126          assertEquals("numClients=" + numClients + " but numReceived=" + numReceived, numClients, numReceived);
127       }
128 
129 
130       if (manyClients != null) {
131          for (int ii=0; ii<numClients; ii++) {
132             Client sub = manyClients[ii];
133             sub.connection.disconnect(null);
134          }
135       }
136 
137 
138       {
139          String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
140                          "<key oid='" + publishOid1 + "' queryType='EXACT'>\n" +
141                          "</key>";
142          String qos = "<qos></qos>";
143          try {
144             EraseReturnQos[] arr = oneConnection.erase(xmlKey, qos);
145             assertEquals("Erase", 1, arr.length);
146          } catch(XmlBlasterException e) { fail("Erase-XmlBlasterException: " + e.getMessage()); }
147       }
148 
149       {
150          String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
151                          "<key oid='" + publishOid2 + "' queryType='EXACT'>\n" +
152                          "</key>";
153          String qos = "<qos></qos>";
154          try {
155             EraseReturnQos[] arr = oneConnection.erase(xmlKey, qos);
156             assertEquals("Ersae", 1, arr.length);
157          } catch(XmlBlasterException e) { fail("Erase-XmlBlasterException: " + e.getMessage()); }
158       }
159 
160       oneConnection.disconnect(null);
161       log.info("Logout done");
162    }
163 
164 
165    /**
166     * Many clients subscribe to a message.
167     */
168    public void subcribeMany()
169    {
170       if (log.isLoggable(Level.FINE)) log.fine("Subscribing ...");
171 
172       String passwd = "secret";
173 
174       SubscribeKey subKeyW = new SubscribeKey(glob, publishOid1);
175       String subKey = subKeyW.toXml(); // "<key oid='" + publishOid1 + "' queryType='EXACT'></key>";
176 
177       SubscribeQos subQosW = new SubscribeQos(glob); // "<qos></qos>";
178       String subQos = subQosW.toXml();
179 
180       manyClients = new Client[numClients];
181 
182       long usedBefore = getUsedServerMemory();
183 
184       log.info("Setting up " + numClients + " subscriber clients ...");
185 
186       stopWatch = new StopWatch();
187       for (int ii=0; ii<numClients; ii++) {
188          Client sub = new Client();
189          sub.loginName = "Joe-" + ii;
190 
191          try {
192             Global globTmp = glob.getClone(null);
193             sub.connection = globTmp.getXmlBlasterAccess();
194             ConnectQos loginQosW = new ConnectQos(globTmp, sub.loginName, passwd); // "<qos></qos>"; During login this is manipulated (callback address added)
195             sub.connection.connect(loginQosW, this);
196          }
197          catch (Exception e) {
198              log.severe("Login failed: " + e.toString());
199              assertTrue("Login failed: " + e.toString(), false);
200          }
201 
202          try {
203             sub.subscribeOid = sub.connection.subscribe(subKey, subQos).getSubscriptionId();
204             log.info("Client " + sub.loginName + " subscribed to " + subKeyW.getOid());
205          } catch(XmlBlasterException e) {
206             log.warning("XmlBlasterException: " + e.getMessage());
207             assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
208          }
209 
210          manyClients[ii] = sub;
211       }
212       double timeForLogins = stopWatch.elapsed()/1000.; // msec -> sec
213 
214 
215       long usedAfter = getUsedServerMemory();
216       long memPerLogin = (usedAfter - usedBefore)/numClients;
217 
218       log.info(numClients + " subscriber clients are ready.");
219       log.info("Server memory per login consumed=" + memPerLogin);
220       log.info("Time " + (long)(numClients/timeForLogins) + " logins/sec");
221    }
222 
223 
224    /**
225     * Query xmlBlaster for its current memory consumption. 
226     */
227    long getUsedServerMemory() {
228       String xmlKey = "<key oid='__cmd:?usedMem' queryType='EXACT'></key>";
229       String qos = "<qos></qos>";
230       try {
231          MsgUnit[] msgArr = oneConnection.get(xmlKey, qos);
232          String mem = new String(msgArr[0].getContent());
233          return new Long(mem).longValue();
234       } catch (XmlBlasterException e) {
235          log.warning(e.toString());
236          return 0L;
237       }
238    }
239 
240 
241    /**
242     * TEST: Construct a message and publish it.
243     * <p />
244     * The returned publishOid1 is checked
245     */
246    public void publishOne()
247    {
248       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");
249 
250       numReceived = 0;
251       String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
252                       "<key oid='" + publishOid1 + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'>\n" +
253                       "</key>";
254       String senderContent = "Yeahh, i'm the new content";
255       try {
256          MsgUnit msgUnit = new MsgUnit(xmlKey, senderContent.getBytes(), "<qos></qos>");
257          stopWatch = new StopWatch();
258          String tmp = oneConnection.publish(msgUnit).getKeyOid();
259          assertEquals("Wrong publishOid1", publishOid1, tmp);
260          log.info("Success: Publishing done, returned oid=" + publishOid1);
261       } catch(XmlBlasterException e) {
262          log.warning("XmlBlasterException: " + e.getMessage());
263          assertTrue("publishOne - XmlBlasterException: " + e.getMessage(), false);
264       }
265    }
266 
267 
268    /**
269     * TEST: Construct a message and publish it,
270     * all clients should receive an update. 
271     */
272    public void testManyClients()
273    {
274       System.out.println("");
275       log.info("TEST 1, many publishers, one subscriber ...");
276 
277       subcribeMany();
278       try { Thread.sleep(1000L); } catch( InterruptedException i) {}                                            // Wait some time for callback to arrive ...
279       assertEquals("numReceived after subscribe", 0, numReceived);  // there should be no Callback
280 
281       publishOne();
282       log.info("Waiting long enough for updates ...");
283       Util.delay(2000L + 10 * numClients);                          // Wait some time for callback to arrive ...
284       assertEquals("Wrong number of updates", numClients, numReceived);
285 
286 
287       System.out.println("");
288       log.info("TEST 2, many publishers, one subscriber ...");
289 
290       subcribeOne();
291       try { Thread.sleep(100L); } catch( InterruptedException i) {}                                             // Wait some time ...
292 
293       numReceived = 0;
294       publishMany();
295       log.info("Waiting long enough for updates ...");
296       Util.delay(2000L + 10 * numClients);                          // Wait some time for callback to arrive ...
297       assertEquals("Wrong number of updates", numClients, numReceived);
298    }
299 
300 
301    /**
302     * One client subscribes to a message. 
303     */
304    public void subcribeOne()
305    {
306       if (log.isLoggable(Level.FINE)) log.fine("Subscribing ...");
307 
308       SubscribeKey subKeyW = new SubscribeKey(glob, publishOid2);
309       String subKey = subKeyW.toXml(); // "<key oid='" + publishOid2 + "' queryType='EXACT'></key>";
310 
311       SubscribeQos subQosW = new SubscribeQos(glob); // "<qos></qos>";
312       String subQos = subQosW.toXml();
313 
314       try {
315          oneConnection.subscribe(subKey, subQos);
316          log.info("Client " + oneName + " subscribed to " + subKeyW.getOid());
317       } catch(XmlBlasterException e) {
318          log.warning("XmlBlasterException: " + e.getMessage());
319          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
320       }
321    }
322 
323 
324    /**
325     * TEST: Construct a message and publish it.
326     * <p />
327     * The returned publishOid1 is checked
328     */
329    public void publishMany()
330    {
331       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");
332 
333       PublishKey pubKeyW = new PublishKey(glob, publishOid2, contentMime, contentMimeExtended);
334       String pubKey = pubKeyW.toXml(); // "<key oid='" + publishOid2 + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'></key>"
335 
336       PublishQos pubQosW = new PublishQos(glob);
337       String pubQos = pubQosW.toXml(); // "<qos></qos>"
338 
339       long usedBefore = getUsedServerMemory();
340 
341       log.info(numClients + " clients are publishing one message each ...");
342 
343       stopWatch = new StopWatch();
344 
345       for (int ii=0; ii<numClients; ii++) {
346          Client client = manyClients[ii];
347          // The content changes, equal contents would not be updated to the subscriber without <forceUpdate/>
348          String senderContent = "New content from publisher " + client.loginName;
349          try {
350             MsgUnit msgUnit = new MsgUnit(pubKey, senderContent.getBytes(), pubQos);
351             PublishReturnQos tmp = oneConnection.publish(msgUnit);
352             assertEquals("Wrong publishOid2", publishOid2, tmp.getKeyOid());
353          } catch(XmlBlasterException e) {
354             log.warning("XmlBlasterException: " + e.getMessage());
355             assertTrue("publishOne - XmlBlasterException: " + e.getMessage(), false);
356          }
357       }
358 
359       double timeToPublish = stopWatch.elapsed()/1000.; // msec -> sec
360 
361 
362       long usedAfter = getUsedServerMemory();
363       long memPerLogin = (usedAfter - usedBefore)/numClients;
364 
365       log.info(numClients + " have published their messages.");
366       log.info("Server memory consumed=" + memPerLogin + " bytes.");
367       log.info("Time " + (long)(numClients/timeToPublish) + " publish/sec");
368    }
369 
370    /**
371     * This is the callback method invoked from xmlBlaster
372     * delivering us a new asynchronous message. 
373     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
374     */
375    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
376    {
377       //log.info("Client " + loginName + " receiving update of message oid=" + updateKey.getOid() + "...");
378       numReceived++;
379 
380       if (numReceived == numClients) {
381          long avg = 0;
382          double elapsed = stopWatch.elapsed();
383          if (elapsed > 0.)
384             avg = (long)(1000.0 * numReceived / elapsed);
385          log.info(numReceived + " messages updated, average messages/second = " + avg + stopWatch.nice());
386       }
387       return "";
388    }
389 
390    /**
391     * Method is used by TestRunner to load these tests
392     */
393    public static Test suite()
394    {
395        TestSuite suite= new TestSuite();
396        String loginName = "Tim";
397        suite.addTest(new TestSubManyClients(new Global(), "testManyClients", loginName));
398        return suite;
399    }
400 
401 
402    /**
403     * Invoke: java org.xmlBlaster.test.qos.TestSubManyClients
404     * @deprecated Use the TestRunner from the testsuite to run it:<p />
405     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.qos.TestSubManyClients</pre>
406     */
407    public static void main(String args[])
408    {
409       Global glob = new Global();
410       if (glob.init(args) != 0) {
411          System.err.println(ME + ": Init failed");
412          System.exit(1);
413       }
414       TestSubManyClients testSub = new TestSubManyClients(glob, "TestSubManyClients", "Tim");
415       testSub.setUp();
416       testSub.testManyClients();
417       testSub.tearDown();
418    }
419 }


syntax highlighted by Code2HTML, v. 0.9.1