1 /*------------------------------------------------------------------------------
  2 Name:      BigMessage.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.stress;
  7 
  8 import java.util.logging.Logger;
  9 import java.util.logging.Level;
 10 import org.xmlBlaster.util.StopWatch;
 11 import org.xmlBlaster.util.Global;
 12 import org.xmlBlaster.util.XmlBlasterException;
 13 import org.xmlBlaster.client.qos.ConnectQos;
 14 import org.xmlBlaster.client.qos.DisconnectQos;
 15 import org.xmlBlaster.client.I_Callback;
 16 import org.xmlBlaster.client.I_XmlBlasterAccess;
 17 import org.xmlBlaster.client.qos.PublishQos;
 18 import org.xmlBlaster.client.key.UpdateKey;
 19 import org.xmlBlaster.client.qos.UpdateQos;
 20 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 21 import org.xmlBlaster.client.key.GetKey;
 22 import org.xmlBlaster.client.qos.EraseReturnQos;
 23 import org.xmlBlaster.util.MsgUnit;
 24 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 25 import org.xmlBlaster.test.Util;
 26 
 27 import junit.framework.*;
 28 
 29 
 30 /**
 31  * This client tests a message of 2 Megabytes published and subscribed
 32  * <p>
 33  * We start our own xmlBlaster server in a thread.
 34  * </p>
 35  * Invoke examples:<br />
 36  * <pre>
 37  *    java junit.textui.TestRunner org.xmlBlaster.test.stress.BigMessage
 38  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.stress.BigMessage
 39  * </pre>
 40  */
 41 public class BigMessage extends TestCase implements I_Callback
 42 {
 43    private static String ME = "BigMessage";
 44    private final Global glob;
 45    private static Logger log = Logger.getLogger(BigMessage.class.getName());
 46 
 47    private I_XmlBlasterAccess con = null;
 48    private String name;
 49    private String passwd = "secret";
 50    private EmbeddedXmlBlaster serverThread;
 51    private boolean startEmbedded = true;
 52    private int serverPort = 7615;
 53    private String oid = "BigMessage";
 54    private int contentSize = 3 * 1000 * 1000; // 3 MB
 55 
 56    private boolean messageArrived = false;
 57    private String assertInUpdate = null;
 58 
 59    private StopWatch stopWatchRoundTrip = null;
 60 
 61    /**
 62     * Constructs the BigMessage object. 
 63     * <p />
 64     * @param testName   The name used in the test suite
 65     */
 66    public BigMessage(String testName) {
 67       super(testName);
 68       this.glob = Global.instance();
 69 
 70       this.name = testName; // name to login to xmlBlaster
 71    }
 72 
 73    /**
 74     * Sets up the fixture.
 75     * <p />
 76     * We start an own xmlBlaster server in a separate thread,
 77     * it is configured to load the tinySQL BigMessage driver to test SQL access (with dBase files)
 78     * <p />
 79     * Then we connect as a client
 80     */
 81    protected void setUp() {
 82       this.contentSize = glob.getProperty().get("contentSize", contentSize);
 83       this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
 84 
 85       if (this.startEmbedded) {
 86          glob.init(Util.getOtherServerPorts(serverPort));
 87          serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
 88          log.info("XmlBlaster is ready for testing a big message");
 89       }
 90       else
 91          log.warning("You need to start an external xmlBlaster server for this test or use option -startEmbedded true");
 92 
 93       try {
 94          log.info("Connecting ...");
 95          con = glob.getXmlBlasterAccess();
 96          ConnectQos qos = new ConnectQos(glob, name, passwd);
 97          con.connect(qos, this); // Login to xmlBlaster
 98       }
 99       catch (Exception e) {
100          Thread.currentThread().dumpStack();
101          log.severe("Can't connect to xmlBlaster: " + e.toString());
102       }
103    }
104 
105    /**
106     * Tears down the fixture.
107     * <p />
108     * cleaning up .... erase() the previous message OID and logout
109     */
110    protected void tearDown() {
111       try {
112          log.info("Erasing message " + oid + " ...");
113          EraseReturnQos[] arr = con.erase("<key oid='" + oid + "'/>", "<qos/>");
114          log.info("Erasing of message " + oid + " done.");
115          assertEquals("Wrong number of message erased", 1, arr.length);
116          assertTrue(assertInUpdate, assertInUpdate == null);
117       } catch(XmlBlasterException e) { log.severe("XmlBlasterException: " + e.getMessage()); }
118 
119       con.disconnect(null);
120       con=null;
121 
122       if (this.startEmbedded) {
123          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
124          this.serverThread = null;
125          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
126          Util.resetPorts();
127       }
128    }
129 
130    /**
131     * Create a RDBMS table, fill some data and destroy it again. 
132     * We use the tinySQL dBase BigMessage driver for testing.
133     */
134    public void testBigMessage() {
135       log.info("######## Start testBigMessage()");
136       StopWatch stopWatch = null;
137 
138       stopWatch = new StopWatch();
139       byte[] content = new byte[contentSize];
140       for (int i=0; i<content.length; i++) {
141          content[i] = (byte)(i % 255);
142       }
143       log.info("Allocated message content with size=" + content.length/1000000 + " MB");
144       try {
145          PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
146          MsgUnit msgUnit = new MsgUnit("<key oid='" + oid + "'/>", content, "<qos/>");
147          stopWatch = new StopWatch();
148          stopWatchRoundTrip = new StopWatch();
149 
150          con.publish(msgUnit);
151 
152          long avg = 0;
153          long elapsed = stopWatch.elapsed();
154          if (elapsed > 0L)
155             avg = ((long)(contentSize)) / elapsed; // byte/milli == kbyte/sec
156 
157          log.info("Success: Publishing of " + oid + " with size=" + contentSize/1000000 + " MB done, avg=" + avg + " KB/sec " + stopWatch.nice());
158       } catch(XmlBlasterException e) {
159          log.severe("XmlBlasterException: " + e.getMessage());
160          fail("Can't publish huge message: " + e.getMessage()); 
161       }
162 
163       messageArrived = false;
164 
165       try {
166          SubscribeReturnQos subscriptionId = con.subscribe("<key oid='" + oid + "'/>", "<qos/>");
167          log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
168       } catch(XmlBlasterException e) {
169          log.severe("XmlBlasterException: " + e.getMessage());
170          fail("subscribe - XmlBlasterException: " + e.getMessage());
171       }
172 
173       waitOnUpdate(20000L);
174       assertTrue(assertInUpdate, assertInUpdate == null);
175       assertEquals("Message not arrived", true, messageArrived);
176 
177       // Allow the update to return to xmlBlaster ...
178       try { Thread.sleep(3000L); } catch( InterruptedException i) {}
179       log.info("######## End testBigMessage()");
180    }
181 
182    /**
183     * This is the callback method invoked from xmlBlaster
184     * delivering us a new asynchronous message. 
185     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
186     */
187    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
188    {
189       log.info("Receiving update of a message state=" + updateQos.getState());
190 
191       if (updateQos.isErased()) {
192          log.info("Ignore erase event");
193          return ""; // We ignore the erase event on tearDown
194       }
195 
196       long elapsed = stopWatchRoundTrip.elapsed();
197       long avg = 0;
198       if (elapsed > 0L)
199             avg = ((long)(contentSize)) / elapsed; // byte/milli == kbyte/sec
200 
201 
202       log.info("Receiving update of message oid=" + updateKey.getOid() + 
203                    " size=" + content.length + " ...");
204 
205       log.info("Success: Publish+Update of " + oid + " with size=" + contentSize/1000000 + " MB done, roundtrip avg=" +
206                avg + " KB/sec " + stopWatchRoundTrip.nice());
207 
208       assertInUpdate = "Wrong sender, expected:" + name + " but was:" + updateQos.getSender().getLoginName();
209       assertEquals("Wrong sender", name, updateQos.getSender().getLoginName());
210 
211       assertInUpdate = "Wrong oid of message returned expected:" + oid + " but was:" + updateKey.getOid();
212       assertEquals("Message oid is wrong", oid, updateKey.getOid());
213 
214       assertInUpdate = "Wrong message size arrived in update, expected:" + contentSize + " but was:" + content.length;
215       assertEquals("Wrong message size arrived", contentSize, content.length);
216 
217       assertInUpdate = null;
218       messageArrived = true;
219       return "";
220    }
221 
222    /**
223     * Little helper, waits until the variable 'messageArrive' is set
224     * to true, or fails when the given timeout occurs.
225     * @param timeout in milliseconds
226     */
227    private void waitOnUpdate(final long timeout)
228    {
229       long pollingInterval = 50L;  // check every 0.05 seconds
230       if (timeout < 50)  pollingInterval = timeout / 10L;
231       long sum = 0L;
232       while (!messageArrived) {
233          try {
234             Thread.sleep(pollingInterval);
235          }
236          catch( InterruptedException i)
237          {}
238          sum += pollingInterval;
239          if (sum > timeout) {
240             log.info("Timeout of " + timeout + " occurred");
241             fail("Timeout of " + timeout + " occurred");
242          }
243       }
244    }
245 
246    /**
247     * Invoke: 
248     * <pre>
249     *  java org.xmlBlaster.test.stress.BigMessage -contentSize 2000000 -startEmbedded false
250     * <pre>
251     */
252    public static void main(String args[]) {
253       Global glob = new Global();
254       if (glob.init(args) != 0) {
255          System.exit(0);
256       }
257       BigMessage big = new BigMessage("BigMessage");
258       big.setUp();
259       big.testBigMessage();
260       big.tearDown();
261    }
262 }


syntax highlighted by Code2HTML, v. 0.9.1