1 /*------------------------------------------------------------------------------
  2 Name:      TestStreamMessages.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.io.ByteArrayInputStream;
  9 import java.io.ByteArrayOutputStream;
 10 import java.io.IOException;
 11 import java.io.InputStream;
 12 import java.util.Random;
 13 
 14 import java.util.logging.Logger;
 15 
 16 import javax.jms.DeliveryMode;
 17 import javax.jms.JMSException;
 18 
 19 import org.xmlBlaster.client.I_StreamingCallback;
 20 import org.xmlBlaster.client.I_XmlBlasterAccess;
 21 import org.xmlBlaster.client.XmlBlasterAccess;
 22 import org.xmlBlaster.client.key.PublishKey;
 23 import org.xmlBlaster.client.key.SubscribeKey;
 24 import org.xmlBlaster.client.key.UnSubscribeKey;
 25 import org.xmlBlaster.client.key.UpdateKey;
 26 import org.xmlBlaster.client.qos.ConnectQos;
 27 import org.xmlBlaster.client.qos.ConnectReturnQos;
 28 import org.xmlBlaster.client.qos.DisconnectQos;
 29 import org.xmlBlaster.client.qos.PublishQos;
 30 import org.xmlBlaster.client.qos.SubscribeQos;
 31 import org.xmlBlaster.client.qos.UnSubscribeQos;
 32 import org.xmlBlaster.client.qos.UpdateQos;
 33 import org.xmlBlaster.jms.XBConnectionMetaData;
 34 import org.xmlBlaster.jms.XBDestination;
 35 import org.xmlBlaster.jms.XBMessageProducer;
 36 import org.xmlBlaster.jms.XBSession;
 37 import org.xmlBlaster.jms.XBStreamingMessage;
 38 import org.xmlBlaster.test.Msg;
 39 import org.xmlBlaster.test.MsgInterceptor;
 40 import org.xmlBlaster.util.Global;
 41 import org.xmlBlaster.util.MsgUnit;
 42 import org.xmlBlaster.util.XmlBlasterException;
 43 import org.xmlBlaster.util.def.Constants;
 44 import org.xmlBlaster.util.def.ErrorCode;
 45 import org.xmlBlaster.util.qos.ClientProperty;
 46 import org.xmlBlaster.util.qos.address.CallbackAddress;
 47 
 48 import junit.framework.TestCase;
 49 
 50 
 51 /**
 52  * <p>
 53  * This is an interesting example, since it creates a XmlBlaster server instance
 54  * in the same JVM , but in a separate thread, talking over CORBA with it.
 55  * <p>
 56  * Invoke examples:<br />
 57  * <pre>
 58  *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages
 59  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestStreamMessages
 60  * </pre>
 61  * @see org.xmlBlaster.client.I_XmlBlasterAccess
 62  */
 63 public class TestStreamMessages extends TestCase implements I_StreamingCallback {
 64    private static String ME = "TestStreamMessages";
 65    private Global global;
 66    private static Logger log = Logger.getLogger(TestStreamMessages.class.getName());
 67    private Global connGlobal;
 68    //private Global publisherGlobal;
 69    private String oid = "testStreamMessages";
 70    private MsgInterceptor updateInterceptor;
 71    private byte[] msgContent;
 72    private long delay = 5000000L;
 73    private boolean ignoreException;
 74    
 75    public TestStreamMessages() {
 76       this(null);
 77    }
 78 
 79    public TestStreamMessages(Global global) {
 80       super("TestStreamMessages");
 81       this.global = global;
 82       if (this.global == null) {
 83          this.global = new Global();
 84          this.global.init((String[])null);
 85       }
 86    }
 87 
 88    /**
 89     * Sets up the fixture.
 90     * <p />
 91     * Connect to xmlBlaster and login
 92     */
 93    protected void setUp() {
 94       try {
 95          this.connGlobal = this.global.getClone(null);
 96          // this.publisherGlobal = this.global.getClone(null);
 97          // this.publisherGlobal.getXmlBlasterAccess().connect(new ConnectQos(this.publisherGlobal, "one/2", "secret"), null);
 98          
 99          this.updateInterceptor = new MsgInterceptor(this.connGlobal, log, null, this);
100          boolean withQueue = true;
101          // we need failsafe behaviour to enable holdback messages on client update exceptions
102          ConnectQos connectQos = new ConnectQos(this.connGlobal, "streamingMsgTester/1", "secret");
103          connectQos.getAddress().setDelay(5000L);
104          connectQos.getAddress().setPingInterval(5000L);
105          connectQos.getAddress().setRetries(-1);
106          CallbackAddress cbAddr = new CallbackAddress(this.global);
107          cbAddr.setDelay(5000L);
108          cbAddr.setPingInterval(5000L);
109          cbAddr.setRetries(-1);
110          connectQos.addCallbackAddress(cbAddr);
111          XmlBlasterAccess access = (XmlBlasterAccess)this.connGlobal.getXmlBlasterAccess();
112          ConnectReturnQos retQos = access.connect(connectQos, this.updateInterceptor, withQueue);
113          log.info("connect return qos: " + retQos.toXml());
114          
115          SubscribeQos subQos = new SubscribeQos(this.connGlobal);
116          subQos.setWantInitialUpdate(false);
117          subQos.setMultiSubscribe(false);
118          this.connGlobal.getXmlBlasterAccess().subscribe(new SubscribeKey(this.connGlobal, this.oid), subQos);
119       }
120       catch (XmlBlasterException ex) {
121          ex.printStackTrace();
122          fail("aborting since exception ex: " + ex.getMessage());
123       }
124    }
125    
126    
127    /**
128     * Tears down the fixture.
129     * <p />
130     * cleaning up .... erase() the previous message OID and logout
131     */
132    protected void tearDown() {
133       log.info("Entering tearDown(), test is finished");
134       try {
135          Thread.sleep(1000L); // since the cb could be too fast
136          this.connGlobal.getXmlBlasterAccess().unSubscribe(new UnSubscribeKey(this.connGlobal, this.oid), new UnSubscribeQos(this.connGlobal));
137          this.connGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.connGlobal));
138          this.connGlobal.shutdown();
139          this.connGlobal = null;
140          // this.publisherGlobal.getXmlBlasterAccess().disconnect(new DisconnectQos(this.publisherGlobal));
141          // this.publisherGlobal.shutdown();
142          // this.publisherGlobal = null;
143       }
144       catch (InterruptedException ex) {
145          ex.printStackTrace();
146       }
147       catch (XmlBlasterException ex) {
148          ex.printStackTrace();
149          fail("aborting since exception ex: " + ex.getMessage());
150       }
151    }
152 
153    private final String getMemInfo() {
154       StringBuffer buf = new StringBuffer(256);
155       final int MEGA = 1024 * 1024;
156       buf.append("MEMORY: total='").append(Runtime.getRuntime().totalMemory()/MEGA).append("' ");
157       buf.append("max='").append(Runtime.getRuntime().maxMemory()/MEGA).append("' ");
158       buf.append("free='").append(Runtime.getRuntime().freeMemory()/MEGA).append("' MB");
159       return buf.toString();
160    }
161    
162    public String update(String cbSessionId, UpdateKey updateKey, InputStream is, UpdateQos updateQos) throws XmlBlasterException, IOException {
163       
164       ClientProperty prop = updateQos.getClientProperty(Constants.addJmsPrefix("interrupted", log));
165       boolean doInterrupt = false;
166       if (prop != null)
167          doInterrupt = prop.getBooleanValue();
168       ByteArrayOutputStream baos = new ByteArrayOutputStream();
169       byte[] buf = new byte[300];
170       int count = 0;
171       String name = updateQos.getClientProperty("nameOfTest", "");
172       boolean isException = "testException".equals(name);
173       log.info("test '" + name + "' before reading: " + getMemInfo());
174       while(true) {
175          int ret = is.read(buf);
176          if (ret == -1 || doInterrupt)
177             break;
178          baos.write(buf, 0, ret);
179          count += ret;
180          if (isException && count > 600 && !ignoreException) { // it must pass the second time
181             this.ignoreException = true;
182             throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "fake exception to be hold back (dispatcher must go to false)", "fake");
183          }
184       }
185       log.info("test '" + name + "' before closing input stream: " + getMemInfo());
186       is.close();
187       log.info("test '" + name + "' after closing: " + getMemInfo());
188       this.msgContent = baos.toByteArray();
189       byte[] content = this.msgContent;
190       log.info("Receiving update of a message oid=" + updateKey.getOid() +
191                         " priority=" + updateQos.getPriority() +
192                         " state=" + updateQos.getState() +
193                         " contentSize=" + content.length);
194       this.updateInterceptor.setMsgContent(content);
195       return "OK";
196    }
197 
198 
199    private void doPublish(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws XmlBlasterException {
200       log.info("Publishing for '" + name + "'");
201       // Global glob = this.global.getClone(null);
202       Global glob = this.connGlobal;
203       I_XmlBlasterAccess conn = glob.getXmlBlasterAccess();
204       PublishKey key = new PublishKey(glob, this.oid);
205       PublishQos qos = new PublishQos(glob);
206       qos.setPersistent(true);
207       if (doInterrupt)
208          qos.addClientProperty("interrupted", true);
209       qos.addClientProperty("nameOfTest", name);
210       qos.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, log), maxChunkSize);
211       ByteArrayInputStream bais = new ByteArrayInputStream(content);
212       conn.publishStream(bais, key.getData(), qos.getData(), maxChunkSize, null);
213    }
214    
215    private void doPublishJMS(byte[] content, int maxChunkSize, boolean doInterrupt, String name) throws JMSException {
216       // Global glob = this.global.getClone(null);
217       // XBSession session = new XBSession(this.publisherGlobal, XBSession.AUTO_ACKNOWLEDGE, false);
218       log.info("Publishing for '" + name + "'");
219       XBSession session = new XBSession(this.connGlobal, XBSession.AUTO_ACKNOWLEDGE, false);
220       XBMessageProducer producer = new XBMessageProducer(session, new XBDestination(this.oid, null));
221       producer.setDeliveryMode(DeliveryMode.PERSISTENT);
222       XBStreamingMessage msg = session.createStreamingMessage(null);
223       if (doInterrupt)
224          msg.setBooleanProperty("interrupted", true);
225       msg.setIntProperty(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, maxChunkSize);
226       msg.setStringProperty("nameOfTest", name); // to recognize it in '__sys__deadMessage'
227       ByteArrayInputStream bais = new ByteArrayInputStream(content);
228       msg.setInputStream(bais);
229       producer.send(msg);
230    }
231    
232    private byte[] createRandomContent(int size) {
233       byte[] ret = new byte[size];
234       Random random = new Random();
235       random.nextBytes(ret);
236       return ret;
237    }
238    
239    public void testManyChunks() {
240       int maxChunkSize = 128;
241       byte[] content = createRandomContent(maxChunkSize*5 - 1);
242       try {
243          this.updateInterceptor.clear();
244          doPublish(content, maxChunkSize, false, "testManyChunks");
245          int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
246          assertEquals("wrong number of updates when testing testManyChunks", 1, ret);
247          Msg[] msgs = this.updateInterceptor.getMsgs();
248          assertEquals("wrong number of msg entries when testing testManyChunks", 1, msgs.length);
249          assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
250          assertTrue("", compareContent(content, msgs[0].getContent()));
251       }
252       catch (XmlBlasterException ex) {
253          ex.printStackTrace();
254          fail();
255       }
256    }
257 
258    /**
259     * This test is to check that we don't have a problem in the buffer of the Pipes due to 
260     * large chunks of messages.
261     */
262    public void testManyBigChunks() {
263       String name = "testManyBigChunks";
264       int maxChunkSize = 1000 * 1000; // since JMS implementation does not allow more
265       byte[] content = createRandomContent(maxChunkSize*3 -1);
266       try {
267          this.updateInterceptor.clear();
268          doPublish(content, maxChunkSize, false, name);
269          int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
270          assertEquals("wrong number of updates when testing " + name, 1, ret);
271          Msg[] msgs = this.updateInterceptor.getMsgs();
272          assertEquals("wrong number of msg entries when testing " + name, 1, msgs.length);
273          assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
274          assertTrue("", compareContent(content, msgs[0].getContent()));
275       }
276       catch (XmlBlasterException ex) {
277          ex.printStackTrace();
278          fail();
279       }
280    }
281 
282    public void testManyChunksTwoMessages() {
283       int maxChunkSize = 64;
284       byte[] content = createRandomContent(maxChunkSize*5 - 1);
285       try {
286          this.updateInterceptor.clear();
287          doPublish(content, maxChunkSize, false, "testManyChunksTwoMessages1");
288          content = createRandomContent(maxChunkSize*5 - 1);
289          doPublish(content, maxChunkSize, false, "testManyChunksTwoMessages2");
290          int ret = this.updateInterceptor.waitOnUpdate(this.delay, 2);
291          assertEquals("wrong number of updates when testing testManyChunksTwoMessages", 2, ret);
292          Msg[] msgs = this.updateInterceptor.getMsgs();
293          assertEquals("wrong number of msg entries when testing testManyChunksTwoMessages", 2, msgs.length);
294          assertEquals("Wrong size of returned buffer", content.length, msgs[1].getContent().length);
295          assertTrue("", compareContent(content, msgs[1].getContent()));
296       }
297       catch (XmlBlasterException ex) {
298          ex.printStackTrace();
299          fail();
300       }
301    }
302 
303    public void testSingleChunk() {
304       int maxChunkSize = 200;
305       byte[] content = createRandomContent(maxChunkSize-10);
306       try {
307          this.updateInterceptor.clear();
308          doPublish(content, maxChunkSize, false, "testSingleChunk");
309          int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
310          assertEquals("wrong number of updates when testing testSingleChunk", 1, ret);
311          Msg[] msgs = this.updateInterceptor.getMsgs();
312          assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
313          assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
314          assertTrue("", compareContent(content, msgs[0].getContent()));
315       }
316       catch (XmlBlasterException ex) {
317          ex.printStackTrace();
318          fail();
319       }
320    }
321 
322    public void testException() {
323       int maxChunkSize = 200;
324       byte[] content = createRandomContent(900);
325       try {
326          this.updateInterceptor.clear();
327          doPublish(content, maxChunkSize, false, "testException");
328          int ret = this.updateInterceptor.waitOnUpdate(2000L, 1);
329          assertEquals("wrong number of updates when testing testSingleChunk", 0, ret);
330          this.updateInterceptor.clear();
331          ((XmlBlasterAccess)this.connGlobal.getXmlBlasterAccess()).setCallbackDispatcherActive(true);
332          ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
333          Msg[] msgs = this.updateInterceptor.getMsgs();
334          assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
335          assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
336          assertTrue("", compareContent(content, msgs[0].getContent()));
337       }
338       catch (XmlBlasterException ex) {
339          ex.printStackTrace();
340          fail();
341       }
342    }
343 
344    public void testInterruptedRead() {
345       int maxChunkSize = 256;
346       byte[] content = createRandomContent(maxChunkSize*5-10);
347       try {
348          this.updateInterceptor.clear();
349          doPublish(content, maxChunkSize, true /* interruped */, "testSingleChunk");
350          int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
351          assertEquals("wrong number of updates when testing testSingleChunk", 1, ret);
352          Msg[] msgs = this.updateInterceptor.getMsgs();
353          assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
354          assertTrue("", content.length > msgs[0].getContent().length);
355       }
356       catch (XmlBlasterException ex) {
357          ex.printStackTrace();
358          fail();
359       }
360    }
361 
362    public void testNormalMessage() {
363       int maxChunkSize = 500;
364       byte[] content = createRandomContent(maxChunkSize);
365       try {
366          this.updateInterceptor.clear();
367          this.connGlobal.getXmlBlasterAccess().publish(new MsgUnit(new PublishKey(this.connGlobal, this.oid), content, new PublishQos(this.connGlobal)));
368          int ret = this.updateInterceptor.waitOnUpdate(this.delay, 1);
369          assertEquals("wrong number of updates when testing testSingleChunk", 1, ret);
370          Msg[] msgs = this.updateInterceptor.getMsgs();
371          assertEquals("wrong number of msg entries when testing testSingleChunk", 1, msgs.length);
372          assertEquals("Wrong size of returned buffer", content.length, msgs[0].getContent().length);
373          assertTrue("", compareContent(content, msgs[0].getContent()));
374       }
375       catch (XmlBlasterException ex) {
376          ex.printStackTrace();
377          fail();
378       }
379    }
380 
381    private boolean compareContent(byte[] buf1, byte[] buf2) {
382       if (buf1 == null && buf2 == null)
383          return true;
384 
385       if (buf1 == null || buf2 == null)
386          return false;
387       
388       if (buf1.length != buf2.length)
389          return false;
390       for (int i=0; i < buf1.length; i++) {
391          if (buf1[i] != buf2[i])
392             return false;
393       }
394       return true;
395    }
396    
397    /**
398     * Invoke: java org.xmlBlaster.test.client.TestStreamMessages
399     * <p />
400     * @deprecated Use the TestRunner from the testsuite to run it:<p />
401     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestStreamMessages</pre>
402     */
403    public static void main(String args[]) {
404       Global global = new Global();
405       if (global.init(args) != 0) {
406          System.out.println(ME + ": Init failed");
407          System.exit(1);
408       }
409 
410       TestStreamMessages test = new TestStreamMessages(global);
411 
412       test.setUp();
413       test.testManyBigChunks();
414       test.tearDown();
415 
416       test.setUp();
417       test.testManyChunks();
418       test.tearDown();
419 
420       test.setUp();
421       test.testException();
422       test.tearDown();
423       
424       test.setUp();
425       test.testSingleChunk();
426       test.tearDown();
427 
428       test.setUp();
429       test.testInterruptedRead();
430       test.tearDown();
431 
432       test.setUp();
433       test.testNormalMessage();
434       test.tearDown();
435 
436       test.setUp();
437       test.testManyChunksTwoMessages();
438       test.tearDown();
439    }
440 }


// syntax highlighted by Code2HTML, v. 0.9.1