1 /*------------------------------------------------------------------------------
  2 Name:      TestTopicHistory.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Testing some topic state transitions
  6 ------------------------------------------------------------------------------*/
  7 package org.xmlBlaster.test.topic;
  8 
  9 import java.util.logging.Logger;
 10 import org.xmlBlaster.util.Global;
 11 
 12 import org.xmlBlaster.client.qos.ConnectQos;
 13 import org.xmlBlaster.util.XmlBlasterException;
 14 import org.xmlBlaster.util.MsgUnit;
 15 import org.xmlBlaster.util.qos.TopicProperty;
 16 import org.xmlBlaster.client.I_Callback;
 17 import org.xmlBlaster.client.key.UpdateKey;
 18 import org.xmlBlaster.client.key.PublishKey;
 19 import org.xmlBlaster.client.key.SubscribeKey;
 20 import org.xmlBlaster.client.key.UnSubscribeKey;
 21 import org.xmlBlaster.client.key.EraseKey;
 22 import org.xmlBlaster.client.qos.PublishQos;
 23 import org.xmlBlaster.client.qos.PublishReturnQos;
 24 import org.xmlBlaster.client.qos.UpdateQos;
 25 import org.xmlBlaster.client.qos.SubscribeQos;
 26 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 27 import org.xmlBlaster.client.qos.EraseQos;
 28 import org.xmlBlaster.client.qos.EraseReturnQos;
 29 import org.xmlBlaster.client.qos.UnSubscribeQos;
 30 import org.xmlBlaster.client.I_XmlBlasterAccess;
 31 
 32 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 33 import org.xmlBlaster.test.Util;
 34 
 35 import junit.framework.*;
 36 
 37 
 38 /**
 39  * Here we test access to history messages of a topic. 
 40  * <p>
 41  * </p>
 42  * <p>
 43  * Invoke examples:
 44  * </p>
 45  * <pre>
 46  *    java junit.textui.TestRunner org.xmlBlaster.test.topic.TestTopicHistory
 47  *
 48  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.topic.TestTopicHistory
 49  * </pre>
 50  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/engine.message.lifecycle.html">The engine.message.lifecycle requirement</a>
 51  * @see org.xmlBlaster.engine.TopicHandler
 52  */
 53 public class TestTopicHistory extends TestCase implements I_Callback {
 54    private final Global glob;
 55    private static Logger log = Logger.getLogger(TestTopicHistory.class.getName());
 56 
 57    private I_XmlBlasterAccess con = null;
 58    private String senderContent = "Some message content";
 59    private String publishOid = "TestTopicHistoryMsg";
 60    private SubscribeReturnQos subscribeReturnQos;
 61    private long blockUpdateTime = 0L;
 62 
 63    private EmbeddedXmlBlaster serverThread;
 64    private int serverPort = 9566;
 65    private boolean startEmbedded = false;
 66 
 67    private int numReceived = 0;
 68 
 69    /**
 70     * Constructs the TestTopicHistory object.
 71     * <p />
 72     * @param testName  The name used in the test suite
 73     * @param loginName The name to login to the xmlBlaster
 74     */
 75    public TestTopicHistory(Global glob, String testName) {
 76       super(testName);
 77       this.glob = glob;
 78 
 79    }
 80 
 81    /**
 82     * Sets up the fixture.
 83     * <p />
 84     * Creates a CORBA connection and does a login.<br />
 85     * - One connection for the sender client<br />
 86     */
 87    protected void setUp() {
 88       this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
 89       if (this.startEmbedded) {
 90          glob.init(Util.getOtherServerPorts(serverPort));
 91          String[] args = { };
 92          glob.init(args);
 93          serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
 94          log.info("XmlBlaster is ready for testing the priority dispatch plugin");
 95       }
 96 
 97       try {
 98          con = glob.getXmlBlasterAccess();
 99          ConnectQos qos = new ConnectQos(glob); // == "<qos></qos>";
100          con.connect(qos, this);
101       }
102       catch (Exception e) {
103           log.severe(e.toString());
104           e.printStackTrace();
105       }
106    }
107 
108    /**
109     * Tears down the fixture.
110     * <p />
111     * cleaning up .... logout
112     */
113    protected void tearDown() {
114       try { Thread.sleep(200L); } catch( InterruptedException i) {}   // Wait 200 milli seconds, until all updates are processed ...
115 
116       String xmlKey = "<key oid='" + publishOid + "' queryType='EXACT'>\n</key>";
117       String qos = "<qos></qos>";
118       try {
119          EraseReturnQos[] arr = con.erase(xmlKey, qos);
120          if (arr.length != 0) {
121             log.severe("Erased " + arr.length + " messages instead of 0");
122          }
123          assertEquals("Erase", 0, arr.length);   // The volatile message schould not exist !!
124       } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
125 
126       con.disconnect(null);
127 
128       if (this.startEmbedded) {
129          try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
130          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
131          this.serverThread = null;
132       }
133 
134       // reset to default server port (necessary if other tests follow in the same JVM).
135       Util.resetPorts();
136    }
137 
138    public EraseReturnQos[] sendErase(boolean forceDestroy) {
139       log.info("Erasing a topic forceDestroy=" + forceDestroy);
140       try {
141          EraseQos eq = new EraseQos(glob);
142          eq.setForceDestroy(forceDestroy);
143          EraseKey ek = new EraseKey(glob, publishOid);
144          EraseReturnQos[] er = con.erase(ek.toXml(), eq.toXml());
145          return er;
146       } catch(XmlBlasterException e) {
147          fail("Erase XmlBlasterException: " + e.getMessage());
148       }
149       return null;
150    }
151 
152    /**
153     * Create a topic. 
154     */
155    public void createTopic(String keyOid, TopicProperty topicProperty) {
156       log.info("Creating topic " + keyOid);
157       try {
158          PublishKey pk = new PublishKey(glob, publishOid, "text/xml", "1.0");
159          PublishQos pq = new PublishQos(glob);
160          pq.setTopicProperty(topicProperty);
161          MsgUnit msgUnit = new MsgUnit(pk, senderContent, pq);
162          PublishReturnQos publishReturnQos = con.publish(msgUnit);
163          assertEquals("Retunred oid is invalid", publishOid, publishReturnQos.getKeyOid());
164          log.info("Topic oid=" + publishOid + " created: " + msgUnit.toXml());
165       } catch(XmlBlasterException e) {
166          log.severe("publish() XmlBlasterException: " + e.getMessage());
167          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
168       }
169    }
170 
171    /**
172     * Publish an almost volatile message.
173     */
174    public void sendExpiringMsg(boolean initializeTopic, long topicDestroyDelay, long msgLifeTime) {
175       log.info("Sending a message initializeTopic=" + initializeTopic + " topicDestroyDelay=" + topicDestroyDelay + " msgLifeTime=" + msgLifeTime);
176       try {
177          // Publish a volatile message
178          PublishKey pk = new PublishKey(glob, publishOid, "text/xml", "1.0");
179          PublishQos pq = new PublishQos(glob);
180          pq.setLifeTime(msgLifeTime);
181          pq.setForceDestroy(false);
182          if (initializeTopic) {
183             // Configure the topic to our needs
184             TopicProperty topicProperty = new TopicProperty(glob);
185             topicProperty.setDestroyDelay(topicDestroyDelay);
186             topicProperty.setCreateDomEntry(false);
187             pq.setTopicProperty(topicProperty);
188          }
189          MsgUnit msgUnit = new MsgUnit(pk, senderContent, pq);
190          PublishReturnQos publishReturnQos = con.publish(msgUnit);
191          assertEquals("Retunred oid is invalid", publishOid, publishReturnQos.getKeyOid());
192          log.info("Sending of '" + senderContent + "' done, returned oid=" + publishOid + " " + msgUnit.toXml());
193       } catch(XmlBlasterException e) {
194          log.severe("publish() XmlBlasterException: " + e.getMessage());
195          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
196       }
197    }
198 
199    /**
200     * Subscribe a volatile message.
201     */
202    public void subscribeMsg() {
203       log.info("Subscribing message '" + publishOid + "'...");
204       try {
205          // Subscribe for the volatile message
206          SubscribeKey sk = new SubscribeKey(glob, publishOid);
207          SubscribeQos sq = new SubscribeQos(glob);
208          subscribeReturnQos = con.subscribe(sk.toXml(), sq.toXml());
209          log.info("Subscribing of '" + publishOid + "' done");
210       } catch(XmlBlasterException e) {
211          log.severe("subscribe() XmlBlasterException: " + e.getMessage());
212          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
213       }
214    }
215 
216    /**
217     * unSubscribe a message.
218     */
219    public void unSubscribeMsg() {
220       log.info("unSubscribing a volatile message ...");
221       try {
222          // Subscribe for the volatile message
223          UnSubscribeKey sk = new UnSubscribeKey(glob, subscribeReturnQos.getSubscriptionId());
224          UnSubscribeQos sq = new UnSubscribeQos(glob);
225          con.unSubscribe(sk.toXml(), sq.toXml());
226          log.info("UnSubscribing of '" + publishOid + "' done");
227       } catch(XmlBlasterException e) {
228          log.severe("unSubscribe() XmlBlasterException: " + e.getMessage());
229          assertTrue("unSubscribe - XmlBlasterException: " + e.getMessage(), false);
230       }
231    }
232 
233    /**
234     */
235    public void testHistory() {
236       log.info("Entering testHistory ...");
237       numReceived = 0;
238 
239       String keyOid = "smallTopic";
240       TopicProperty topicProperty = new TopicProperty(glob);
241       long topicDestroyDelay = 6000L;
242       topicProperty.setDestroyDelay(topicDestroyDelay);
243       topicProperty.setCreateDomEntry(false);
244       createTopic(keyOid, topicProperty);
245 
246       /*
247       {  // topic transition from START -> [2] -> ALIVE (3 sec)
248          long msgLifeTime = 3000L;
249          sendExpiringMsg(true, topicDestroyDelay, msgLifeTime); 
250          waitOnUpdate(1000L, 0);
251          assertEquals("numReceived after sending", 0, numReceived); // no message arrived?
252          String dump = getDump();
253          log.fine(dump);
254          // Expecting something like:
255          // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicHistoryMsg' state='ALIVE'>
256          //  <uniqueKey>TestTopicHistoryMsg</uniqueKey>
257          assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
258          assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicHistoryMsg' state='ALIVE'") != -1);
259       }
260 
261 
262       {  // topic transition from ALIVE -> [6] -> UNREFERENCED (3 sec)
263          try { Thread.sleep(3500L); } catch( InterruptedException i) {}
264          String dump = getDump();
265          // Expecting something like:
266          // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicHistoryMsg' state='UNREFERENCED'>
267          //  <uniqueKey>TestTopicHistoryMsg</uniqueKey>
268          assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
269          assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicHistoryMsg' state='UNREFERENCED'") != -1);
270       }
271 
272       {  // topic transition from UNREFERENCED -> [11] -> DEAD
273          log.info("Sleeping for another 5 sec, the topic (with destroyDelay=6sec) should be dead then");
274          try { Thread.sleep(6000); } catch( InterruptedException i) {}
275          // Topic should be destroyed now
276 
277          String dump = getDump();
278          log.fine("IS DEAD?"+dump);
279          assertTrue("Not expected a dead topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
280       }
281       */
282       log.info("SUCCESS testHistory");
283    }
284 
285    /**
286     * This is the callback method invoked from xmlBlaster
287     * delivering us a new asynchronous message. 
288     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
289     */
290    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
291       log.info("Receiving update of a message " + updateKey.getOid() + " " + updateQos.getState());
292 
293       numReceived += 1;
294 
295       if (updateQos.isOk()) {
296          assertEquals("Wrong oid of message returned", publishOid, updateKey.getOid());
297          assertEquals("Message content is corrupted", new String(senderContent), new String(content));
298       }
299 
300       if (this.blockUpdateTime > 0L) {
301          log.info("Blocking the update callback for " + this.blockUpdateTime + " millis");
302          try { Thread.sleep(this.blockUpdateTime); } catch( InterruptedException i) {}
303          this.blockUpdateTime = 0L;
304          log.info("Block released, reset blockTimer");
305       }
306       return "";
307    }
308 
309    /**
310     * Method is used by TestRunner to load these tests
311     */
312    public static Test suite() {
313        TestSuite suite= new TestSuite();
314        suite.addTest(new TestTopicHistory(new Global(), "testHistory"));
315        return suite;
316    }
317 
318    /**
319     * Invoke: java org.xmlBlaster.test.topic.TestTopicHistory -startEmbedded false
320     */
321    public static void main(String args[]) {
322       TestTopicHistory testSub = new TestTopicHistory(new Global(args), "TestTopicHistory");
323       testSub.setUp();
324       testSub.testHistory();
325       testSub.tearDown();
326    }
327 }


syntax highlighted by Code2HTML, v. 0.9.1