1 /*------------------------------------------------------------------------------
2 Name: TestPersistentSession.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.client;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10 import org.xmlBlaster.util.SessionName;
11 import org.xmlBlaster.util.XmlBlasterException;
12 import org.xmlBlaster.util.def.ErrorCode;
13 import org.xmlBlaster.util.def.Constants;
14 import org.xmlBlaster.util.property.PropString;
15 import org.xmlBlaster.util.EmbeddedXmlBlaster;
16 import org.xmlBlaster.util.qos.address.Address;
17 import org.xmlBlaster.util.qos.address.CallbackAddress;
18 import org.xmlBlaster.util.MsgUnit;
19 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
20 import org.xmlBlaster.client.qos.PublishQos;
21 import org.xmlBlaster.client.I_Callback;
22 import org.xmlBlaster.client.I_ConnectionStateListener;
23 import org.xmlBlaster.client.I_XmlBlasterAccess;
24 import org.xmlBlaster.client.key.SubscribeKey;
25 import org.xmlBlaster.client.key.UpdateKey;
26 import org.xmlBlaster.client.qos.*;
27
28 import org.xmlBlaster.test.Util;
29 import org.xmlBlaster.test.MsgInterceptor;
30
31 import junit.framework.*;
32
33
34 /**
35 * Tests the persistent sessions .
36 * <br />For a description of what this persistent sessions and subscriptions are
37 * please read the requirement engine.persistence.session.
38 * <p>
39 * This is an interesting example, since it creates a XmlBlaster server instance
40 * in the same JVM , but in a separate thread, talking over CORBA with it.
41 * <p>
42 * Invoke examples:<br />
43 * <pre>
44 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
45 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
46 * </pre>
47 * @see org.xmlBlaster.client.I_XmlBlasterAccess
48 */
49 public class TestPersistentSession extends TestCase implements I_ConnectionStateListener, I_Callback
50 {
51 private static String ME = "TestPersistentSession";
52 private static final boolean TRANSIENT = false;
53 private static final boolean PERSISTENT = true;
54
55 private Global glob;
56 private Global origGlobal;
57 private Global serverGlobal;
58 private static Logger log = Logger.getLogger(TestPersistentSession.class.getName());
59
60 private int serverPort = 7604;
61 private EmbeddedXmlBlaster serverThread;
62
63 private MsgInterceptor[] updateInterceptors;
64 //private I_XmlBlasterAccess con;
65 private String senderName;
66
67 private int numPublish = 8;
68 private int numStop = 3;
69 private int numStart = 5;
70 private final String contentMime = "text/plain";
71
72 private final long reconnectDelay = 2000L;
73 private boolean failsafeCallback = true;
74 /** the session is persistent from the beginning */
75 private boolean persistent = true;
76 private boolean exactSubscription = false;
77 private boolean initialUpdates = true;
78 private int numSubscribers = 4;
79
80 public TestPersistentSession(String testName) {
81 this(null, testName);
82 }
83
84 public TestPersistentSession(Global glob, String testName) {
85 super(testName);
86 this.origGlobal = glob;
87 this.senderName = testName;
88 this.updateInterceptors = new MsgInterceptor[this.numSubscribers];
89 }
90
91 /**
92 * Sets up the fixture.
93 * <p />
94 * Connect to xmlBlaster and login
95 */
96 protected void setUp() {
97 setup(false);
98 }
99
100
101 private void setup(boolean restrictedEntries) {
102 this.origGlobal = (this.origGlobal == null) ? Global.instance() : this.origGlobal;
103
104
105 this.origGlobal.init(Util.getOtherServerPorts(serverPort));
106 this.glob = this.origGlobal.getClone(null);
107
108 String[] args = null;
109 if (restrictedEntries) {
110 args = new String[] {"-persistence/session/maxEntriesCache", "1",
111 "-persistence/session/maxEntries","2",
112 "-persistence/subscribe/maxEntriesCache", "2",
113 "-persistence/subscribe/maxEntries","3",
114 };
115 }
116 this.serverGlobal = this.origGlobal.getClone(args);
117 serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
118 log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
119
120 try { // we just connect and disconnect to make sure all resources are really cleaned up
121 Global tmpGlobal = this.origGlobal.getClone(null);
122 I_XmlBlasterAccess con = tmpGlobal.getXmlBlasterAccess(); // Find orb
123
124 String passwd = "secret";
125 ConnectQos connectQos = new ConnectQos(tmpGlobal, senderName, passwd); // == "<qos>...</qos>";
126 connectQos.setSessionName(new SessionName(tmpGlobal, "general/1"));
127 // set the persistent connection
128 connectQos.setPersistent(this.persistent);
129 // Setup fail save handling for connection ...
130 Address addressProp = new Address(tmpGlobal);
131 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
132 addressProp.setRetries(-1); // -1 == forever
133 addressProp.setPingInterval(-1L); // switched off
134 con.registerConnectionListener(this);
135 connectQos.setAddress(addressProp);
136
137 // setup failsafe handling for callback ...
138 if (this.failsafeCallback) {
139 CallbackAddress cbAddress = new CallbackAddress(tmpGlobal);
140 cbAddress.setRetries(-1);
141 cbAddress.setPingInterval(-1);
142 cbAddress.setDelay(1000L);
143 cbAddress.setSecretCbSessionId("someSecredSessionId");
144 connectQos.addCallbackAddress(cbAddress);
145 }
146 con.connect(connectQos, this);
147 DisconnectQos disconnectQos = new DisconnectQos(tmpGlobal);
148 con.disconnect(disconnectQos);
149 }
150 catch (XmlBlasterException e) {
151 log.warning("setUp() - login failed: " + e.getMessage());
152 fail("setUp() - login fail: " + e.getMessage());
153 }
154 catch (Exception e) {
155 log.severe("setUp() - login failed: " + e.toString());
156 e.printStackTrace();
157 fail("setUp() - login fail: " + e.toString());
158 }
159
160 try {
161 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
162
163 String passwd = "secret";
164 ConnectQos connectQos = new ConnectQos(this.glob, senderName, passwd); // == "<qos>...</qos>";
165 connectQos.setSessionName(new SessionName(this.glob, "general/1"));
166 // set the persistent connection
167 connectQos.setPersistent(this.persistent);
168 // Setup fail save handling for connection ...
169 Address addressProp = new Address(glob);
170 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
171 addressProp.setRetries(-1); // -1 == forever
172 addressProp.setPingInterval(-1L); // switched off
173 con.registerConnectionListener(this);
174 connectQos.setAddress(addressProp);
175
176 // setup failsafe handling for callback ...
177 if (this.failsafeCallback) {
178 CallbackAddress cbAddress = new CallbackAddress(this.glob);
179 cbAddress.setRetries(-1);
180 cbAddress.setPingInterval(-1);
181 cbAddress.setDelay(1000L);
182 cbAddress.setSecretCbSessionId("someSecredSessionId");
183 connectQos.addCallbackAddress(cbAddress);
184 }
185
186 con.connect(connectQos, this); // Login to xmlBlaster, register for updates
187 }
188 catch (XmlBlasterException e) {
189 log.warning("setUp() - login failed: " + e.getMessage());
190 fail("setUp() - login fail: " + e.getMessage());
191 }
192 catch (Exception e) {
193 log.severe("setUp() - login failed: " + e.toString());
194 e.printStackTrace();
195 fail("setUp() - login fail: " + e.toString());
196 }
197 }
198
199 /**
200 * Tears down the fixture.
201 * <p />
202 * cleaning up .... erase() the previous message OID and logout
203 */
204 protected void tearDown() {
205 log.info("Entering tearDown(), test is finished");
206 String xmlKey = "<key oid='' queryType='XPATH'>\n" +
207 " //TestPersistentSession-AGENT" +
208 "</key>";
209
210 String qos = "<qos><forceDestroy>true</forceDestroy></qos>";
211 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
212 try {
213 con.erase(xmlKey, qos);
214
215 PropString defaultPlugin = new PropString("CACHE,1.0");
216 String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
217 log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
218 }
219 catch(XmlBlasterException e) {
220 log.severe("XmlBlasterException: " + e.getMessage());
221 }
222 finally {
223 con.disconnect(null);
224 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
225 this.serverThread = null;
226 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
227 Util.resetPorts(this.serverGlobal);
228 Util.resetPorts(this.glob);
229 Util.resetPorts(this.origGlobal);
230 this.glob = null;
231 this.serverGlobal = null;
232 con = null;
233 Global.instance().shutdown();
234 }
235 }
236
237 /**
238 * TEST: Subscribe to messages with XPATH.
239 */
240 private void doSubscribe(int num, boolean isExact, boolean isPersistent) {
241 try {
242 SubscribeKey key = null;
243 if (isExact) key = new SubscribeKey(this.glob, "Message-1");
244 else key = new SubscribeKey(this.glob, "//TestPersistentSession-AGENT", "XPATH");
245
246 SubscribeQos qos = new SubscribeQos(this.glob); // "<qos><persistent>true</persistent></qos>";
247 qos.setPersistent(isPersistent);
248 qos.setWantInitialUpdate(this.initialUpdates);
249 qos.setWantNotify(false); // to avoig getting erased messages
250
251 this.updateInterceptors[num] = new MsgInterceptor(this.glob, log, null); // Collect received msgs
252 this.updateInterceptors[num].setLogPrefix("interceptor-" + num);
253 SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptors[num]);
254
255 log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
256 assertTrue("returned null subscriptionId", subscriptionId != null);
257 } catch(XmlBlasterException e) {
258 log.warning("XmlBlasterException: " + e.getMessage());
259 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
260 }
261 }
262
263 /**
264 * TEST: Construct a message and publish it.
265 * <p />
266 */
267 public void doPublish(int counter) throws XmlBlasterException {
268 String oid = "Message" + "-" + counter;
269 log.info("Publishing a message " + oid + " ...");
270 String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
271 " <TestPersistentSession-AGENT id='192.168.124.10' subId='1' type='generic'>" +
272 " </TestPersistentSession-AGENT>" +
273 "</key>";
274 String content = "" + counter;
275 PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
276 MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
277
278 this.glob.getXmlBlasterAccess().publish(msgUnit);
279 log.info("Success: Publishing of " + oid + " done");
280 }
281
282 /**
283 * TEST: <br />
284 */
285 public void persistentSession(boolean doStop) {
286 //doSubscribe(); -> see reachedAlive()
287 log.info("Going to publish " + numPublish + " messages, xmlBlaster will be down for message 3 and 4");
288 //
289 doSubscribe(0, this.exactSubscription, TRANSIENT);
290 doSubscribe(1, this.exactSubscription, PERSISTENT);
291
292 for (int i=0; i<numPublish; i++) {
293 try {
294 if (i == numStop) { // 3
295 if (doStop) {
296 log.info("Stopping xmlBlaster, but continue with publishing ...");
297 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
298 this.serverThread = null;
299 }
300 else {
301 log.info("changing run level but continue with publishing ...");
302 this.serverThread.changeRunlevel(0, true);
303 }
304 }
305 if (i == numStart) {
306 if (doStop) {
307 log.info("Starting xmlBlaster again, expecting the previous published two messages ...");
308 // serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
309 serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
310 log.info("xmlBlaster started, waiting on tail back messsages");
311 }
312 else {
313 log.info("changing runlevel again to runlevel 9. Expecting the previous published two messages ...");
314 this.serverThread.changeRunlevel(9, true);
315 log.info("xmlBlaster runlevel 9 reached, waiting on tail back messsages");
316 }
317
318 // Message-4 We need to wait until the client reconnected (reconnect interval)
319 // Message-5
320 assertEquals("", 2, this.updateInterceptors[1].waitOnUpdate(reconnectDelay*2L, 2));
321 assertEquals("", 2, this.updateInterceptors[3].waitOnUpdate(reconnectDelay*2L, 2));
322
323 for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
324 }
325 doPublish(i+1);
326 if (i == 0) {
327 doSubscribe(2, this.exactSubscription, TRANSIENT);
328 doSubscribe(3, this.exactSubscription, PERSISTENT);
329 }
330
331 if (i < numStop || i >= numStart ) {
332 int n = 1;
333 if (i == 0 && !this.initialUpdates) n = 0;
334 assertEquals("Message nr. " + (i+1), 1, this.updateInterceptors[1].waitOnUpdate(4000L, 1));
335 assertEquals("Message nr. " + (i+1), n, this.updateInterceptors[3].waitOnUpdate(4000L, n));
336 }
337 for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
338 }
339 catch(XmlBlasterException e) {
340 if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_POLLING)
341 log.warning("Lost connection, my connection layer is polling: " + e.getMessage());
342 else if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_DEAD)
343 assertTrue("Lost connection, my connection layer is NOT polling", false);
344 else
345 assertTrue("Publishing problems: " + e.getMessage(), false);
346 }
347 }
348 doSubscribe(0, this.exactSubscription, TRANSIENT);
349 doSubscribe(1, this.exactSubscription, PERSISTENT);
350 }
351
352 /**
353 * This is the callback method invoked from I_XmlBlasterAccess
354 * informing the client in an asynchronous mode if the connection was established.
355 * <p />
356 * This method is enforced through interface I_ConnectionStateListener
357 */
358 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
359 log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster");
360 // doSubscribe(); // initialize on startup and on reconnect
361 }
362
363 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
364 log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
365 }
366
367 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
368 log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
369 }
370
371 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
372 String contentStr = new String(content);
373 String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
374 log.info("Receiving update of a message oid=" + updateKey.getOid() +
375 " priority=" + updateQos.getPriority() +
376 " state=" + updateQos.getState() +
377 " content=" + cont);
378 log.info("further log for receiving update of a message cbSessionId=" + cbSessionId +
379 updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
380 log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions)");
381
382 return "OK";
383 }
384
385
386 public void testXPathInitialStop() {
387 this.exactSubscription = false;
388 this.initialUpdates = true;
389 persistentSession(true);
390 }
391
392 public void testXPathNoInitialStop() {
393 this.exactSubscription = false;
394 this.initialUpdates = false;
395 persistentSession(true);
396 }
397
398 public void testXPathInitialRunlevelChange() {
399 this.persistent = true;
400 this.exactSubscription = false;
401 this.initialUpdates = true;
402 persistentSession(false);
403 }
404
405 // -----------------------------------------------------------------
406 private Global createConnection(Global parentGlobal, String sessionName, boolean isPersistent, boolean expectEx) {
407 try {
408 Global ret = parentGlobal.getClone(null);
409 I_XmlBlasterAccess con = ret.getXmlBlasterAccess(); // Find orb
410 ConnectQos connectQos = new ConnectQos(glob); // == "<qos>...</qos>";
411 connectQos.setSessionName(new SessionName(ret, sessionName));
412 // set the persistent connection
413 connectQos.setPersistent(isPersistent);
414 // Setup fail save handling for connection ...
415 Address addressProp = new Address(glob);
416 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
417 addressProp.setRetries(-1); // -1 == forever
418 addressProp.setPingInterval(-1L); // switched off
419 connectQos.setAddress(addressProp);
420
421 // setup failsafe handling for callback ...
422 if (this.failsafeCallback) {
423 CallbackAddress cbAddress = new CallbackAddress(this.glob);
424 cbAddress.setRetries(-1);
425 cbAddress.setPingInterval(-1);
426 cbAddress.setDelay(1000L);
427 connectQos.addCallbackAddress(cbAddress);
428 }
429 con.connect(connectQos, this); // Login to xmlBlaster, register for updates
430 if (expectEx) assertTrue("an exception was expected here because of overflow: Configuration of session queue probably not working", false);
431 return ret;
432 }
433 catch (XmlBlasterException ex) {
434 if (expectEx) log.info("createConnection: exception was OK since overflow was expected");
435 else assertTrue("an exception should not occur here", false);
436 }
437 return null; //to make compiler happy
438 }
439
440
441 /**
442 * Tests the requirement:
443 * - If the storage for the sessions is overflown, it should throw an exception
444 *
445 */
446 public void testOverflow() {
447 // to change the configuration on server side (limit the queue sizes)
448 tearDown();
449 setup(true);
450 Global[] globals = new Global[5];
451 try {
452 globals[0] = createConnection(this.origGlobal, "bjoern/1", true , false);
453 globals[1] = createConnection(this.origGlobal, "fritz/2", false, false);
454 globals[3] = createConnection(this.origGlobal, "dimitri/3", true , true); // <-- exception (since main connection also persistent)
455 globals[2] = createConnection(this.origGlobal, "pandora/4", false , false); // OK since transient
456 globals[4] = createConnection(this.origGlobal, "jonny/5", true, true);
457 }
458 finally {
459 for (int i=0; i < globals.length; i++) {
460 if (globals[i] != null) globals[i].getXmlBlasterAccess().disconnect(new DisconnectQos(globals[i]));
461 }
462 }
463 }
464
465 /**
466 * Invoke: java org.xmlBlaster.test.client.TestPersistentSession
467 * <p />
468 * @deprecated Use the TestRunner from the testsuite to run it:<p />
469 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPersistentSession</pre>
470 */
471 public static void main(String args[])
472 {
473 Global glob = new Global();
474 if (glob.init(args) != 0) {
475 System.out.println(ME + ": Init failed");
476 System.exit(1);
477 }
478
479 TestPersistentSession testSub = new TestPersistentSession(glob, "TestPersistentSession/1");
480
481 testSub.setUp();
482 testSub.testXPathInitialStop();
483 testSub.tearDown();
484
485 testSub.setUp();
486 testSub.testXPathNoInitialStop();
487 testSub.tearDown();
488
489 testSub.setUp();
490 testSub.testXPathInitialRunlevelChange();
491 testSub.tearDown();
492
493 testSub.setUp();
494 testSub.testOverflow();
495 testSub.tearDown();
496 }
497 }
syntax highlighted by Code2HTML, v. 0.9.1