1 /*------------------------------------------------------------------------------
2 Name: TestTopicHistory.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Testing some topic state transitions
6 ------------------------------------------------------------------------------*/
7 package org.xmlBlaster.test.topic;
8
9 import java.util.logging.Logger;
10 import org.xmlBlaster.util.Global;
11
12 import org.xmlBlaster.client.qos.ConnectQos;
13 import org.xmlBlaster.util.XmlBlasterException;
14 import org.xmlBlaster.util.MsgUnit;
15 import org.xmlBlaster.util.qos.TopicProperty;
16 import org.xmlBlaster.client.I_Callback;
17 import org.xmlBlaster.client.key.UpdateKey;
18 import org.xmlBlaster.client.key.PublishKey;
19 import org.xmlBlaster.client.key.SubscribeKey;
20 import org.xmlBlaster.client.key.UnSubscribeKey;
21 import org.xmlBlaster.client.key.EraseKey;
22 import org.xmlBlaster.client.qos.PublishQos;
23 import org.xmlBlaster.client.qos.PublishReturnQos;
24 import org.xmlBlaster.client.qos.UpdateQos;
25 import org.xmlBlaster.client.qos.SubscribeQos;
26 import org.xmlBlaster.client.qos.SubscribeReturnQos;
27 import org.xmlBlaster.client.qos.EraseQos;
28 import org.xmlBlaster.client.qos.EraseReturnQos;
29 import org.xmlBlaster.client.qos.UnSubscribeQos;
30 import org.xmlBlaster.client.I_XmlBlasterAccess;
31
32 import org.xmlBlaster.util.EmbeddedXmlBlaster;
33 import org.xmlBlaster.test.Util;
34
35 import junit.framework.*;
36
37
38 /**
39 * Here we test access to history messages of a topic.
40 * <p>
41 * </p>
42 * <p>
43 * Invoke examples:
44 * </p>
45 * <pre>
46 * java junit.textui.TestRunner org.xmlBlaster.test.topic.TestTopicHistory
47 *
48 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.topic.TestTopicHistory
49 * </pre>
50 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/engine.message.lifecycle.html">The engine.message.lifecycle requirement</a>
51 * @see org.xmlBlaster.engine.TopicHandler
52 */
53 public class TestTopicHistory extends TestCase implements I_Callback {
54 private final Global glob;
55 private static Logger log = Logger.getLogger(TestTopicHistory.class.getName());
56
57 private I_XmlBlasterAccess con = null;
58 private String senderContent = "Some message content";
59 private String publishOid = "TestTopicHistoryMsg";
60 private SubscribeReturnQos subscribeReturnQos;
61 private long blockUpdateTime = 0L;
62
63 private EmbeddedXmlBlaster serverThread;
64 private int serverPort = 9566;
65 private boolean startEmbedded = false;
66
67 private int numReceived = 0;
68
69 /**
70 * Constructs the TestTopicHistory object.
71 * <p />
72 * @param testName The name used in the test suite
73 * @param loginName The name to login to the xmlBlaster
74 */
75 public TestTopicHistory(Global glob, String testName) {
76 super(testName);
77 this.glob = glob;
78
79 }
80
81 /**
82 * Sets up the fixture.
83 * <p />
84 * Creates a CORBA connection and does a login.<br />
85 * - One connection for the sender client<br />
86 */
87 protected void setUp() {
88 this.startEmbedded = glob.getProperty().get("startEmbedded", this.startEmbedded);
89 if (this.startEmbedded) {
90 glob.init(Util.getOtherServerPorts(serverPort));
91 String[] args = { };
92 glob.init(args);
93 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
94 log.info("XmlBlaster is ready for testing the priority dispatch plugin");
95 }
96
97 try {
98 con = glob.getXmlBlasterAccess();
99 ConnectQos qos = new ConnectQos(glob); // == "<qos></qos>";
100 con.connect(qos, this);
101 }
102 catch (Exception e) {
103 log.severe(e.toString());
104 e.printStackTrace();
105 }
106 }
107
108 /**
109 * Tears down the fixture.
110 * <p />
111 * cleaning up .... logout
112 */
113 protected void tearDown() {
114 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
115
116 String xmlKey = "<key oid='" + publishOid + "' queryType='EXACT'>\n</key>";
117 String qos = "<qos></qos>";
118 try {
119 EraseReturnQos[] arr = con.erase(xmlKey, qos);
120 if (arr.length != 0) {
121 log.severe("Erased " + arr.length + " messages instead of 0");
122 }
123 assertEquals("Erase", 0, arr.length); // The volatile message schould not exist !!
124 } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
125
126 con.disconnect(null);
127
128 if (this.startEmbedded) {
129 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
130 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
131 this.serverThread = null;
132 }
133
134 // reset to default server port (necessary if other tests follow in the same JVM).
135 Util.resetPorts();
136 }
137
138 public EraseReturnQos[] sendErase(boolean forceDestroy) {
139 log.info("Erasing a topic forceDestroy=" + forceDestroy);
140 try {
141 EraseQos eq = new EraseQos(glob);
142 eq.setForceDestroy(forceDestroy);
143 EraseKey ek = new EraseKey(glob, publishOid);
144 EraseReturnQos[] er = con.erase(ek.toXml(), eq.toXml());
145 return er;
146 } catch(XmlBlasterException e) {
147 fail("Erase XmlBlasterException: " + e.getMessage());
148 }
149 return null;
150 }
151
152 /**
153 * Create a topic.
154 */
155 public void createTopic(String keyOid, TopicProperty topicProperty) {
156 log.info("Creating topic " + keyOid);
157 try {
158 PublishKey pk = new PublishKey(glob, publishOid, "text/xml", "1.0");
159 PublishQos pq = new PublishQos(glob);
160 pq.setTopicProperty(topicProperty);
161 MsgUnit msgUnit = new MsgUnit(pk, senderContent, pq);
162 PublishReturnQos publishReturnQos = con.publish(msgUnit);
163 assertEquals("Retunred oid is invalid", publishOid, publishReturnQos.getKeyOid());
164 log.info("Topic oid=" + publishOid + " created: " + msgUnit.toXml());
165 } catch(XmlBlasterException e) {
166 log.severe("publish() XmlBlasterException: " + e.getMessage());
167 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
168 }
169 }
170
171 /**
172 * Publish an almost volatile message.
173 */
174 public void sendExpiringMsg(boolean initializeTopic, long topicDestroyDelay, long msgLifeTime) {
175 log.info("Sending a message initializeTopic=" + initializeTopic + " topicDestroyDelay=" + topicDestroyDelay + " msgLifeTime=" + msgLifeTime);
176 try {
177 // Publish a volatile message
178 PublishKey pk = new PublishKey(glob, publishOid, "text/xml", "1.0");
179 PublishQos pq = new PublishQos(glob);
180 pq.setLifeTime(msgLifeTime);
181 pq.setForceDestroy(false);
182 if (initializeTopic) {
183 // Configure the topic to our needs
184 TopicProperty topicProperty = new TopicProperty(glob);
185 topicProperty.setDestroyDelay(topicDestroyDelay);
186 topicProperty.setCreateDomEntry(false);
187 pq.setTopicProperty(topicProperty);
188 }
189 MsgUnit msgUnit = new MsgUnit(pk, senderContent, pq);
190 PublishReturnQos publishReturnQos = con.publish(msgUnit);
191 assertEquals("Retunred oid is invalid", publishOid, publishReturnQos.getKeyOid());
192 log.info("Sending of '" + senderContent + "' done, returned oid=" + publishOid + " " + msgUnit.toXml());
193 } catch(XmlBlasterException e) {
194 log.severe("publish() XmlBlasterException: " + e.getMessage());
195 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
196 }
197 }
198
199 /**
200 * Subscribe a volatile message.
201 */
202 public void subscribeMsg() {
203 log.info("Subscribing message '" + publishOid + "'...");
204 try {
205 // Subscribe for the volatile message
206 SubscribeKey sk = new SubscribeKey(glob, publishOid);
207 SubscribeQos sq = new SubscribeQos(glob);
208 subscribeReturnQos = con.subscribe(sk.toXml(), sq.toXml());
209 log.info("Subscribing of '" + publishOid + "' done");
210 } catch(XmlBlasterException e) {
211 log.severe("subscribe() XmlBlasterException: " + e.getMessage());
212 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
213 }
214 }
215
216 /**
217 * unSubscribe a message.
218 */
219 public void unSubscribeMsg() {
220 log.info("unSubscribing a volatile message ...");
221 try {
222 // Subscribe for the volatile message
223 UnSubscribeKey sk = new UnSubscribeKey(glob, subscribeReturnQos.getSubscriptionId());
224 UnSubscribeQos sq = new UnSubscribeQos(glob);
225 con.unSubscribe(sk.toXml(), sq.toXml());
226 log.info("UnSubscribing of '" + publishOid + "' done");
227 } catch(XmlBlasterException e) {
228 log.severe("unSubscribe() XmlBlasterException: " + e.getMessage());
229 assertTrue("unSubscribe - XmlBlasterException: " + e.getMessage(), false);
230 }
231 }
232
233 /**
234 */
235 public void testHistory() {
236 log.info("Entering testHistory ...");
237 numReceived = 0;
238
239 String keyOid = "smallTopic";
240 TopicProperty topicProperty = new TopicProperty(glob);
241 long topicDestroyDelay = 6000L;
242 topicProperty.setDestroyDelay(topicDestroyDelay);
243 topicProperty.setCreateDomEntry(false);
244 createTopic(keyOid, topicProperty);
245
246 /*
247 { // topic transition from START -> [2] -> ALIVE (3 sec)
248 long msgLifeTime = 3000L;
249 sendExpiringMsg(true, topicDestroyDelay, msgLifeTime);
250 waitOnUpdate(1000L, 0);
251 assertEquals("numReceived after sending", 0, numReceived); // no message arrived?
252 String dump = getDump();
253 log.fine(dump);
254 // Expecting something like:
255 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicHistoryMsg' state='ALIVE'>
256 // <uniqueKey>TestTopicHistoryMsg</uniqueKey>
257 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
258 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicHistoryMsg' state='ALIVE'") != -1);
259 }
260
261
262 { // topic transition from ALIVE -> [6] -> UNREFERENCED (3 sec)
263 try { Thread.sleep(3500L); } catch( InterruptedException i) {}
264 String dump = getDump();
265 // Expecting something like:
266 // <TopicHandler id='http_192_168_1_4_3412/topic/TestTopicHistoryMsg' state='UNREFERENCED'>
267 // <uniqueKey>TestTopicHistoryMsg</uniqueKey>
268 assertTrue("Missing topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") != -1);
269 assertTrue("Topic in wrong state:" + dump, dump.indexOf("TestTopicHistoryMsg' state='UNREFERENCED'") != -1);
270 }
271
272 { // topic transition from UNREFERENCED -> [11] -> DEAD
273 log.info("Sleeping for another 5 sec, the topic (with destroyDelay=6sec) should be dead then");
274 try { Thread.sleep(6000); } catch( InterruptedException i) {}
275 // Topic should be destroyed now
276
277 String dump = getDump();
278 log.fine("IS DEAD?"+dump);
279 assertTrue("Not expected a dead topic", dump.indexOf("<uniqueKey>"+publishOid+"</uniqueKey>") == -1);
280 }
281 */
282 log.info("SUCCESS testHistory");
283 }
284
285 /**
286 * This is the callback method invoked from xmlBlaster
287 * delivering us a new asynchronous message.
288 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
289 */
290 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
291 log.info("Receiving update of a message " + updateKey.getOid() + " " + updateQos.getState());
292
293 numReceived += 1;
294
295 if (updateQos.isOk()) {
296 assertEquals("Wrong oid of message returned", publishOid, updateKey.getOid());
297 assertEquals("Message content is corrupted", new String(senderContent), new String(content));
298 }
299
300 if (this.blockUpdateTime > 0L) {
301 log.info("Blocking the update callback for " + this.blockUpdateTime + " millis");
302 try { Thread.sleep(this.blockUpdateTime); } catch( InterruptedException i) {}
303 this.blockUpdateTime = 0L;
304 log.info("Block released, reset blockTimer");
305 }
306 return "";
307 }
308
309 /**
310 * Method is used by TestRunner to load these tests
311 */
312 public static Test suite() {
313 TestSuite suite= new TestSuite();
314 suite.addTest(new TestTopicHistory(new Global(), "testHistory"));
315 return suite;
316 }
317
318 /**
319 * Invoke: java org.xmlBlaster.test.topic.TestTopicHistory -startEmbedded false
320 */
321 public static void main(String args[]) {
322 TestTopicHistory testSub = new TestTopicHistory(new Global(args), "TestTopicHistory");
323 testSub.setUp();
324 testSub.testHistory();
325 testSub.tearDown();
326 }
327 }
syntax highlighted by Code2HTML, v. 0.9.1