1 /*-----t-------------------------------------------------------------------------
2 Name: TestStreamMessages.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.client;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.ByteArrayOutputStream;
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.util.Random;
13
14 import java.util.logging.Logger;
15
16 import javax.jms.DeliveryMode;
17 import javax.jms.JMSException;
18
19 import org.xmlBlaster.client.I_StreamingCallback;
20 import org.xmlBlaster.client.I_XmlBlasterAccess;
21 import org.xmlBlaster.client.XmlBlasterAccess;
22 import org.xmlBlaster.client.key.PublishKey;
23 import org.xmlBlaster.client.key.SubscribeKey;
24 import org.xmlBlaster.client.key.UnSubscribeKey;
25 import org.xmlBlaster.client.key.UpdateKey;
26 import org.xmlBlaster.client.qos.ConnectQos;
27 import org.xmlBlaster.client.qos.ConnectReturnQos;
28 import org.xmlBlaster.client.qos.DisconnectQos;
29 import org.xmlBlaster.client.qos.PublishQos;
30 import org.xmlBlaster.client.qos.SubscribeQos;
31 import org.xmlBlaster.client.qos.UnSubscribeQos;
32 import org.xmlBlaster.client.qos.UpdateQos;
33 import org.xmlBlaster.jms.XBConnectionMetaData;
34 import org.xmlBlaster.jms.XBDestination;
35 import org.xmlBlaster.jms.XBMessageProducer;
36 import org.xmlBlaster.jms.XBSession;
37 import org.xmlBlaster.jms.XBStreamingMessage;
38 import org.xmlBlaster.test.Msg;
39 import org.xmlBlaster.test.MsgInterceptor;
40 import org.xmlBlaster.util.Global;
41 import org.xmlBlaster.util.MsgUnit;
42 import org.xmlBlaster.util.XmlBlasterException;
43 import org.xmlBlaster.util.def.Constants;
44 import org.xmlBlaster.util.def.ErrorCode;
45 import org.xmlBlaster.util.qos.ClientProperty;
46 import org.xmlBlaster.util.qos.address.CallbackAddress;
47
48 import junit.framework.TestCase;
49
50
51 /**
52 * <p>
53 * This is an interesting example, since it creates a XmlBlaster server instance
54 * in the same JVM , but in a separate thread, talking over CORBA with it.
55 * <p>
56 * Invoke examples:<br />
57 * <pre>
58 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages
59 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages
60 * </pre>
61 * @see org.xmlBlaster.client.I_XmlBlasterAccess
62 */
63 public class TestStreamMessages extends TestCase implements I_StreamingCallback {
64 private static String ME = "TestStreamMessages";
65 private Global global;
66 private static Logger log = Logger.getLogger(TestStreamMessages.class.getName());
67 private Global connGlobal;
68 //private Global publisherGlobal;
69 private String oid = "testStreamMessages";
70 private MsgInterceptor updateInterceptor;
71 private byte[] msgContent;
72 private long delay = 5000000L;
73 private boolean ignoreException;
74
75 public TestStreamMessages() {
76 this(null);
77 }
78
79 public TestStreamMessages(Global global) {
80 super("TestStreamMessages");
81 this.global = global;
82 if (this.global == null) {
83 this.global = new Global();
84 this.global.init((String[])null);
85 }
86 }
87
88 /**
89 * Sets up the fixture.
90 * <p />
91 * Connect to xmlBlaster and login
92 */
93 protected void setUp() {
94 try {
95 this.connGlobal = this.global.getClone(null);
96 // this.publisherGlobal = this.global.getClone(null);
97 // this.publisherGlobal.getXmlBlasterAccess().connect(new ConnectQos(this.publisherGlobal, "one/2", "secret"), null);
98
99 this.updateInterceptor = new MsgInterceptor(this.connGlobal, log, null, this);
100 boolean withQueue = true;
101 // we need failsafe behaviour to enable holdback messages on client update exceptions
102 ConnectQos connectQos = new ConnectQos(this.connGlobal, "streamingMsgTester/1", "secret");
103 connectQos.getAddress().setDelay(5000L);
104 connectQos.getAddress().setPingInterval(5000L);
105 connectQos.getAddress().setRetries(-1);
106 CallbackAddress cbAddr = new CallbackAddress(this.global);
107 cbAddr.setDelay(5000L);
108 cbAddr.setPingInterval(5000L);
109 cbAddr.setRetries(-1);
110 connectQos.addCallbackAddress(cbAddr);
111 XmlBlasterAccess access = (XmlBlasterAccess)this.connGlobal.getXmlBlasterAccess();
112 ConnectReturnQos retQos = access.connect(connectQos, this.updateInterceptor, withQueue);
113 log.info("connect return qos: " + retQos.toXml());
114
115 SubscribeQos subQos = new SubscribeQos(this.connGlobal);
116 subQos.setWantInitialUpdate(false);
117 subQos.setMultiSubscribe(false);
118 this.connGlobal.getXmlBlasterAccess().subscribe(new SubscribeKey(this.connGlobal, this.oid), subQos);
119 }
120 catch (XmlBlasterException ex) {
121 ex.printStackTrace();
122 fail("aborting since exception ex: " + ex.getMessage());
123 }
124 }
125
126
127 /**
128 * Tears down the fixture.
129 * <p />
130 * cleaning up .... erase() the previous message OID and logout
131 */
132 protected void tearDown() {
133 log.info("Entering tearDown(), test is finished");
134 try {
135 Thread.sleep(1000L); // since the cb could be too fast
136 this.connGlobal.getXmlBlasterAccess().unSubscribe(new UnSubscribeKey(this.connGlobal, this.oid), new UnSubscribeQos(this.connGlobal));
137 this.connGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.connGlobal));
138 this.connGlobal.shutdown();
139 this.connGlobal = null;
140 // this.publisherGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.publisherGlobal));
141 // this.publisherGlobal.shutdown();
142 // this.publisherGlobal = null;
143 }
144 catch (InterruptedException ex) {
145 ex.printStackTrace();
146 }
147 catch (XmlBlasterException ex) {
148 ex.printStackTrace();
149 fail("aborting since exception ex: " + ex.getMessage());
150 }
151 }
152
153 private final String getMemInfo() {
154 StringBuffer buf = new StringBuffer(256);
155 final int MEGA = 1024 * 1024;
156 buf.append("MEMORY: total='").append(Runtime.getRuntime().totalMemory()/MEGA).append("' ");
157 buf.append("max='").append(Runtime.getRuntime().maxMemory()/MEGA).append("' ");
158 buf.append("free='").append(Runtime.getRuntime().freeMemory()/MEGA).append("' MB");
159 return buf.toString();
160 }
161
162 public String update(String cbSessionId, UpdateKey updateKey, InputStream is, UpdateQos updateQos) throws XmlBlasterException, IOException {
163
164 ClientProperty prop = updateQos.getClientProperty(Constants.addJmsPrefix("interrupted", log));
165 boolean doInterrupt = false;
166 if (prop != null)
167 doInterrupt = prop.getBooleanValue();
168 ByteArrayOutputStream baos = new ByteArrayOutputStream();
169 byte[] buf = new byte[300];
170 int count = 0;
171 String name = updateQos.getClientProperty("nameOfTest", "");
172 boolean isException = "testException".equals(name);
173 log.info("test '" + name + "' before reading: " + getMemInfo());
174 while(true) {
175 int ret = is.read(buf);
176 if (ret == -1 || doInterrupt)
177 break;
178 baos.write(buf, 0, ret);
179 count += ret;
180 if (isException && count > 600 && !ignoreException) { // it must pass the second time
181 this.ignoreException = true;
182 throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "fake exception to be hold back (dispatcher must go to false)", "fake");
183 }
184 }
185 log.info("test '" + name + "' before closing input stream: " + getMemInfo());
186 is.close();
187 log.info("test '" + name + "' after closing: " + getMemInfo());
188 this.msgContent = baos.toByteArray();
189 byte[] content = this.msgContent;
190 log.info("Receiving update of a message oid=" + updateKey.getOid() +
191 " priority=" + updateQos.getPriority() +
192 " state=" + updateQos.getState() +
193 " contentSize=" + content.length);
194 this.updateInterceptor.setMsgContent(content);
195 return "OK";
196 }
197
198
199 private void doPublish(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws XmlBlasterException {
200 log.info("Publishing for '" + name + "'");
201 // Global glob = this.global.getClone(null);
202 Global glob = this.connGlobal;
203 I_XmlBlasterAccess conn = glob.getXmlBlasterAccess();
204 PublishKey key = new PublishKey(glob, this.oid);
205 PublishQos qos = new PublishQos(glob);
206 qos.setPersistent(true);
207 if (doInterrupt)
208 qos.addClientProperty("interrupted", true);
209 qos.addClientProperty("nameOfTest", name);
210 qos.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, log), maxChunkSize);
211 ByteArrayInputStream bais = new ByteArrayInputStream(content);
212 conn.publishStream(bais, key.getData(), qos.getData(), maxChunkSize, null);
213 }
214
215 private void doPublishJMS(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws JMSException {
216 // Global glob = this.global.getClone(null);
217 // XBSession session = new XBSession(this.publisherGlobal, XBSession.AUTO_ACKNOWLEDGE, false);
218 log.info("Publishing for '" + name + "'");
219 XBSession session = new XBSession(this.connGlobal, XBSession.AUTO_ACKNOWLEDGE, false);
220 XBMessageProducer producer = new XBMessageProducer(session, new XBDestination(this.oid, null));
221 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
222 XBStreamingMessage msg = session.createStreamingMessage(null);
223 if (doInterrupt)
224 msg.setBooleanProperty("interrupted", true);
225 msg.setIntProperty(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, maxChunkSize);
226 msg.setStringProperty("nameOfTest", name); // to recognize it in '__sys__deadMessage'
227 ByteArrayInputStream bais = new ByteArrayInputStream(content);
228 msg.setInputStream(bais);
229 producer.send(msg);
230 }
231
232 private byte[] createRandomContent(int size) {
233 byte[] ret = new byte[size];
234 Random random = new Random();
235 random.nextBytes(ret);
236 return ret;
237 }
238
239 public void testManyChunks() {
240 int maxChunkSize = 128;
241 byte[] content = createRandomContent(maxChunkSize*5 - 1);
242 try {
243 this.updateInterceptor.clear();
244 doPublish(content, maxChunkSize, false, "testManyChunks");
245 int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
246 assertEquals("wrong number of updates when testing testManyChunks", 1, ret);
247 Msg[] msgs = this.updateInterceptor.getMsgs();
248 assertEquals("wrong number of msg entries when testing testManyChunks", 1, msgs.length);
249 assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
250 assertTrue("", compareContent(content, msgs[0].getContent()));
251 }
252 catch (XmlBlasterException ex) {
253 ex.printStackTrace();
254 fail();
255 }
256 }
257
258 /**
259 * This test is to check that we don't have a problem in the buffer of the Pipes due to
260 * large chunks of messages.
261 */
262 public void testManyBigChunks() {
263 String name = "testManyBigChunks";
264 int maxChunkSize = 1000 * 1000; // since JMS implementation does not allow more
265 byte[] content = createRandomContent(maxChunkSize*3 -1);
266 try {
267 this.updateInterceptor.clear();
268 doPublish(content, maxChunkSize, false, name);
269 int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
270 assertEquals("wrong number of updates when testing " + name, 1, ret);
271 Msg[] msgs = this.updateInterceptor.getMsgs();
272 assertEquals("wrong number of msg entries when testing " + name, 1, msgs.length);
273 assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
274 assertTrue("", compareContent(content, msgs[0].getContent()));
275 }
276 catch (XmlBlasterException ex) {
277 ex.printStackTrace();
278 fail();
279 }
280 }
281
282 public void testManyChunksTwoMessages() {
283 int maxChunkSize = 64;
284 byte[] content = createRandomContent(maxChunkSize*5 - 1);
285 try {
286 this.updateInterceptor.clear();
287 doPublish(content, maxChunkSize, false, "testManyChunksTwoMessages1");
288 content = createRandomContent(maxChunkSize*5 - 1);
289 doPublish(content, maxChunkSize, false, "testManyChunksTwoMessages2");
290 int ret = this.updateInterceptor.waitOnUpdate(this.delay, 2);
291 assertEquals("wrong number of updates when testing testManyChunksTwoMessages", 2, ret);
292 Msg[] msgs = this.updateInterceptor.getMsgs();
293 assertEquals("wrong number of msg entries when testing testManyChunksTwoMessages", 2, msgs.length);
294 assertEquals("Wrong size of returned buffer", content.length, msgs[1].getContent().length);
295 assertTrue("", compareContent(content, msgs[1].getContent()));
296 }
297 catch (XmlBlasterException ex) {
298 ex.printStackTrace();
299 fail();
300 }
301 }
302
303 public void testSingleChunk() {
304 int maxChunkSize = 200;
305 byte[] content = createRandomContent(maxChunkSize-10);
306 try {
307 this.updateInterceptor.clear();
308 doPublish(content, maxChunkSize, false, "testSingleChunk");
309 int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
310 assertEquals("wrong number of updates when testing testSingleChunk", 1, ret);
311 Msg[] msgs = this.updateInterceptor.getMsgs();
312 assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
313 assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
314 assertTrue("", compareContent(content, msgs[0].getContent()));
315 }
316 catch (XmlBlasterException ex) {
317 ex.printStackTrace();
318 fail();
319 }
320 }
321
322 public void testException() {
323 int maxChunkSize = 200;
324 byte[] content = createRandomContent(900);
325 try {
326 this.updateInterceptor.clear();
327 doPublish(content, maxChunkSize, false, "testException");
328 int ret = this.updateInterceptor.waitOnUpdate(2000L, 1);
329 assertEquals("wrong number of updates when testing testSingleChunk", 0, ret);
330 this.updateInterceptor.clear();
331 ((XmlBlasterAccess)this.connGlobal.getXmlBlasterAccess()).setCallbackDispatcherActive(true);
332 ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
333 Msg[] msgs = this.updateInterceptor.getMsgs();
334 assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
335 assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
336 assertTrue("", compareContent(content, msgs[0].getContent()));
337 }
338 catch (XmlBlasterException ex) {
339 ex.printStackTrace();
340 fail();
341 }
342 }
343
344 public void testInterruptedRead() {
345 int maxChunkSize = 256;
346 byte[] content = createRandomContent(maxChunkSize*5-10);
347 try {
348 this.updateInterceptor.clear();
349 doPublish(content, maxChunkSize, true /* interruped */, "testSingleChunk");
350 int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
351 assertEquals("wrong number of updates when testing testSingleChunk", 1, ret);
352 Msg[] msgs = this.updateInterceptor.getMsgs();
353 assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
354 assertTrue("", content.length > msgs[0].getContent().length);
355 }
356 catch (XmlBlasterException ex) {
357 ex.printStackTrace();
358 fail();
359 }
360 }
361
362 public void testNormalMessage() {
363 int maxChunkSize = 500;
364 byte[] content = createRandomContent(maxChunkSize);
365 try {
366 this.updateInterceptor.clear();
367 this.connGlobal.getXmlBlasterAccess().publish(new MsgUnit(new PublishKey(this.connGlobal, this.oid), content, new PublishQos(this.connGlobal)));
368 int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
369 assertEquals("wrong number of updates when testing testSingleChunk", 1, ret);
370 Msg[] msgs = this.updateInterceptor.getMsgs();
371 assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
372 assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
373 assertTrue("", compareContent(content, msgs[0].getContent()));
374 }
375 catch (XmlBlasterException ex) {
376 ex.printStackTrace();
377 fail();
378 }
379 }
380
381 private boolean compareContent(byte[] buf1, byte[] buf2) {
382 if (buf1 == null && buf2 == null)
383 return true;
384
385 if (buf1 == null || buf2 == null)
386 return false;
387
388 if (buf1.length != buf2.length)
389 return false;
390 for (int i=0; i < buf1.length; i++) {
391 if (buf1[i] != buf2[i])
392 return false;
393 }
394 return true;
395 }
396
397 /**
398 * Invoke: java org.xmlBlaster.test.client.TestStreamMessages
399 * <p />
400 * @deprecated Use the TestRunner from the testsuite to run it:<p />
401 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestStreamMessages</pre>
402 */
403 public static void main(String args[]) {
404 Global global = new Global();
405 if (global.init(args) != 0) {
406 System.out.println(ME + ": Init failed");
407 System.exit(1);
408 }
409
410 TestStreamMessages test = new TestStreamMessages(global);
411
412 test.setUp();
413 test.testManyBigChunks();
414 test.tearDown();
415
416 test.setUp();
417 test.testManyChunks();
418 test.tearDown();
419
420 test.setUp();
421 test.testException();
422 test.tearDown();
423
424 test.setUp();
425 test.testSingleChunk();
426 test.tearDown();
427
428 test.setUp();
429 test.testInterruptedRead();
430 test.tearDown();
431
432 test.setUp();
433 test.testNormalMessage();
434 test.tearDown();
435
436 test.setUp();
437 test.testManyChunksTwoMessages();
438 test.tearDown();
439 }
440 }
syntax highlighted by Code2HTML, v. 0.9.1