1 /*------------------------------------------------------------------------------
2 Name: BigMessage.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.stress;
7
8 import java.util.logging.Logger;
9 import java.util.logging.Level;
10 import org.xmlBlaster.util.StopWatch;
11 import org.xmlBlaster.util.Global;
12 import org.xmlBlaster.util.XmlBlasterException;
13 import org.xmlBlaster.client.qos.ConnectQos;
14 import org.xmlBlaster.client.qos.DisconnectQos;
15 import org.xmlBlaster.client.I_Callback;
16 import org.xmlBlaster.client.I_XmlBlasterAccess;
17 import org.xmlBlaster.client.qos.PublishQos;
18 import org.xmlBlaster.client.key.UpdateKey;
19 import org.xmlBlaster.client.qos.UpdateQos;
20 import org.xmlBlaster.client.qos.SubscribeReturnQos;
21 import org.xmlBlaster.client.key.GetKey;
22 import org.xmlBlaster.client.qos.EraseReturnQos;
23 import org.xmlBlaster.util.MsgUnit;
24 import org.xmlBlaster.util.EmbeddedXmlBlaster;
25 import org.xmlBlaster.test.Util;
26
27 import junit.framework.*;
28
29
30 /**
31 * This client tests a message of 2 Megabytes published and subscribed
32 * <p>
33 * We start our own xmlBlaster server in a thread.
34 * </p>
35 * Invoke examples:<br />
36 * <pre>
37 * java junit.textui.TestRunner org.xmlBlaster.test.stress.BigMessage
38 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.stress.BigMessage
39 * </pre>
40 */
41 public class BigMessage extends TestCase implements I_Callback
42 {
43 private static String ME = "BigMessage";
44 private final Global glob;
45 private static Logger log = Logger.getLogger(BigMessage.class.getName());
46
47 private I_XmlBlasterAccess con = null;
48 private String name;
49 private String passwd = "secret";
50 private EmbeddedXmlBlaster serverThread;
51 private boolean startEmbedded = true;
52 private int serverPort = 7615;
53 private String oid = "BigMessage";
54 private int contentSize = 3 * 1000 * 1000; // 3 MB
55
56 private boolean messageArrived = false;
57 private String assertInUpdate = null;
58
59 private StopWatch stopWatchRoundTrip = null;
60
61 /**
62 * Constructs the BigMessage object.
63 * <p />
64 * @param testName The name used in the test suite
65 */
66 public BigMessage(String testName) {
67 super(testName);
68 this.glob = Global.instance();
69
70 this.name = testName; // name to login to xmlBlaster
71 }
72
73 /**
74 * Sets up the fixture.
75 * <p />
76 * We start an own xmlBlaster server in a separate thread,
77 * it is configured to load the tinySQL BigMessage driver to test SQL access (with dBase files)
78 * <p />
79 * Then we connect as a client
80 */
81 protected void setUp() {
82 this.contentSize = glob.getProperty().get("contentSize", contentSize);
83 this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
84
85 if (this.startEmbedded) {
86 glob.init(Util.getOtherServerPorts(serverPort));
87 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
88 log.info("XmlBlaster is ready for testing a big message");
89 }
90 else
91 log.warning("You need to start an external xmlBlaster server for this test or use option -startEmbedded true");
92
93 try {
94 log.info("Connecting ...");
95 con = glob.getXmlBlasterAccess();
96 ConnectQos qos = new ConnectQos(glob, name, passwd);
97 con.connect(qos, this); // Login to xmlBlaster
98 }
99 catch (Exception e) {
100 Thread.currentThread().dumpStack();
101 log.severe("Can't connect to xmlBlaster: " + e.toString());
102 }
103 }
104
105 /**
106 * Tears down the fixture.
107 * <p />
108 * cleaning up .... erase() the previous message OID and logout
109 */
110 protected void tearDown() {
111 try {
112 log.info("Erasing message " + oid + " ...");
113 EraseReturnQos[] arr = con.erase("<key oid='" + oid + "'/>", "<qos/>");
114 log.info("Erasing of message " + oid + " done.");
115 assertEquals("Wrong number of message erased", 1, arr.length);
116 assertTrue(assertInUpdate, assertInUpdate == null);
117 } catch(XmlBlasterException e) { log.severe("XmlBlasterException: " + e.getMessage()); }
118
119 con.disconnect(null);
120 con=null;
121
122 if (this.startEmbedded) {
123 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
124 this.serverThread = null;
125 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
126 Util.resetPorts();
127 }
128 }
129
130 /**
131 * Create a RDBMS table, fill some data and destroy it again.
132 * We use the tinySQL dBase BigMessage driver for testing.
133 */
134 public void testBigMessage() {
135 log.info("######## Start testBigMessage()");
136 StopWatch stopWatch = null;
137
138 stopWatch = new StopWatch();
139 byte[] content = new byte[contentSize];
140 for (int i=0; i<content.length; i++) {
141 content[i] = (byte)(i % 255);
142 }
143 log.info("Allocated message content with size=" + content.length/1000000 + " MB");
144 try {
145 PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
146 MsgUnit msgUnit = new MsgUnit("<key oid='" + oid + "'/>", content, "<qos/>");
147 stopWatch = new StopWatch();
148 stopWatchRoundTrip = new StopWatch();
149
150 con.publish(msgUnit);
151
152 long avg = 0;
153 long elapsed = stopWatch.elapsed();
154 if (elapsed > 0L)
155 avg = ((long)(contentSize)) / elapsed; // byte/milli == kbyte/sec
156
157 log.info("Success: Publishing of " + oid + " with size=" + contentSize/1000000 + " MB done, avg=" + avg + " KB/sec " + stopWatch.nice());
158 } catch(XmlBlasterException e) {
159 log.severe("XmlBlasterException: " + e.getMessage());
160 fail("Can't publish huge message: " + e.getMessage());
161 }
162
163 messageArrived = false;
164
165 try {
166 SubscribeReturnQos subscriptionId = con.subscribe("<key oid='" + oid + "'/>", "<qos/>");
167 log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
168 } catch(XmlBlasterException e) {
169 log.severe("XmlBlasterException: " + e.getMessage());
170 fail("subscribe - XmlBlasterException: " + e.getMessage());
171 }
172
173 waitOnUpdate(20000L);
174 assertTrue(assertInUpdate, assertInUpdate == null);
175 assertEquals("Message not arrived", true, messageArrived);
176
177 // Allow the update to return to xmlBlaster ...
178 try { Thread.sleep(3000L); } catch( InterruptedException i) {}
179 log.info("######## End testBigMessage()");
180 }
181
182 /**
183 * This is the callback method invoked from xmlBlaster
184 * delivering us a new asynchronous message.
185 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
186 */
187 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
188 {
189 log.info("Receiving update of a message state=" + updateQos.getState());
190
191 if (updateQos.isErased()) {
192 log.info("Ignore erase event");
193 return ""; // We ignore the erase event on tearDown
194 }
195
196 long elapsed = stopWatchRoundTrip.elapsed();
197 long avg = 0;
198 if (elapsed > 0L)
199 avg = ((long)(contentSize)) / elapsed; // byte/milli == kbyte/sec
200
201
202 log.info("Receiving update of message oid=" + updateKey.getOid() +
203 " size=" + content.length + " ...");
204
205 log.info("Success: Publish+Update of " + oid + " with size=" + contentSize/1000000 + " MB done, roundtrip avg=" +
206 avg + " KB/sec " + stopWatchRoundTrip.nice());
207
208 assertInUpdate = "Wrong sender, expected:" + name + " but was:" + updateQos.getSender().getLoginName();
209 assertEquals("Wrong sender", name, updateQos.getSender().getLoginName());
210
211 assertInUpdate = "Wrong oid of message returned expected:" + oid + " but was:" + updateKey.getOid();
212 assertEquals("Message oid is wrong", oid, updateKey.getOid());
213
214 assertInUpdate = "Wrong message size arrived in update, expected:" + contentSize + " but was:" + content.length;
215 assertEquals("Wrong message size arrived", contentSize, content.length);
216
217 assertInUpdate = null;
218 messageArrived = true;
219 return "";
220 }
221
222 /**
223 * Little helper, waits until the variable 'messageArrive' is set
224 * to true, or fails when the given timeout occurs.
225 * @param timeout in milliseconds
226 */
227 private void waitOnUpdate(final long timeout)
228 {
229 long pollingInterval = 50L; // check every 0.05 seconds
230 if (timeout < 50) pollingInterval = timeout / 10L;
231 long sum = 0L;
232 while (!messageArrived) {
233 try {
234 Thread.sleep(pollingInterval);
235 }
236 catch( InterruptedException i)
237 {}
238 sum += pollingInterval;
239 if (sum > timeout) {
240 log.info("Timeout of " + timeout + " occurred");
241 fail("Timeout of " + timeout + " occurred");
242 }
243 }
244 }
245
246 /**
247 * Invoke:
248 * <pre>
249 * java org.xmlBlaster.test.stress.BigMessage -contentSize 2000000 -startEmbedded false
250 * <pre>
251 */
252 public static void main(String args[]) {
253 Global glob = new Global();
254 if (glob.init(args) != 0) {
255 System.exit(0);
256 }
257 BigMessage big = new BigMessage("BigMessage");
258 big.setUp();
259 big.testBigMessage();
260 big.tearDown();
261 }
262 }
syntax highlighted by Code2HTML, v. 0.9.1