1 /*------------------------------------------------------------------------------
  2 Name:      TestPersistentSession.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.util.Global;
 10 import org.xmlBlaster.util.SessionName;
 11 import org.xmlBlaster.util.XmlBlasterException;
 12 import org.xmlBlaster.util.def.ErrorCode;
 13 import org.xmlBlaster.util.def.Constants;
 14 import org.xmlBlaster.util.property.PropString;
 15 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 16 import org.xmlBlaster.util.qos.address.Address;
 17 import org.xmlBlaster.util.qos.address.CallbackAddress;
 18 import org.xmlBlaster.util.MsgUnit;
 19 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 20 import org.xmlBlaster.client.qos.PublishQos;
 21 import org.xmlBlaster.client.I_Callback;
 22 import org.xmlBlaster.client.I_ConnectionStateListener;
 23 import org.xmlBlaster.client.I_XmlBlasterAccess;
 24 import org.xmlBlaster.client.key.SubscribeKey;
 25 import org.xmlBlaster.client.key.UpdateKey;
 26 import org.xmlBlaster.client.qos.*;
 27 
 28 import org.xmlBlaster.test.Util;
 29 import org.xmlBlaster.test.MsgInterceptor;
 30 
 31 import junit.framework.*;
 32 
 33 
 34 /**
 35  * Tests persistent sessions.
 36  * <br />For a description of what persistent sessions and subscriptions are,
 37  * please read the requirement engine.persistence.session.
 38  * <p>
 39  * This is an interesting example, since it creates an XmlBlaster server instance
 40  * in the same JVM, but in a separate thread, talking over CORBA with it.
 41  * <p>
 42  * Invoke examples:<br />
 43  * <pre>
 44  *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
 45  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPersistentSession
 46  * </pre>
 47  * @see org.xmlBlaster.client.I_XmlBlasterAccess
 48  */
 49 public class TestPersistentSession extends TestCase implements I_ConnectionStateListener, I_Callback
 50 {
 51    private static String ME = "TestPersistentSession";
 52    private static final boolean TRANSIENT = false;
 53    private static final boolean PERSISTENT = true;
 54    
 55    private Global glob;
 56    private Global origGlobal;
 57    private Global serverGlobal;
 58    private static Logger log = Logger.getLogger(TestPersistentSession.class.getName());
 59 
 60    private int serverPort = 7604;
 61    private EmbeddedXmlBlaster serverThread;
 62 
 63    private MsgInterceptor[] updateInterceptors;
 64    //private I_XmlBlasterAccess con;
 65    private String senderName;
 66 
 67    private int numPublish = 8;
 68    private int numStop = 3;
 69    private int numStart = 5;
 70    private final String contentMime = "text/plain";
 71 
 72    private final long reconnectDelay = 2000L;
 73    private boolean failsafeCallback = true;
 74    /** the session is persistent from the beginning */
 75    private boolean persistent = true;
 76    private boolean exactSubscription = false;
 77    private boolean initialUpdates = true;
 78    private int numSubscribers = 4;
 79 
 80    public TestPersistentSession(String testName) {
 81       this(null, testName);
 82    }
 83 
 84    public TestPersistentSession(Global glob, String testName) {
 85       super(testName);
 86       this.origGlobal = glob;
 87       this.senderName = testName;
 88       this.updateInterceptors = new MsgInterceptor[this.numSubscribers];
 89    }
 90 
 91    /**
 92     * Sets up the fixture.
 93     * <p />
 94     * Connect to xmlBlaster and login
 95     */
 96    protected void setUp() {
 97       setup(false);
 98    }
 99    
100    
101    private void setup(boolean restrictedEntries) {
102       this.origGlobal = (this.origGlobal == null) ? Global.instance() : this.origGlobal;
103 
104       
105       this.origGlobal.init(Util.getOtherServerPorts(serverPort));
106       this.glob = this.origGlobal.getClone(null);
107 
108       String[] args = null;
109       if (restrictedEntries) {
110          args = new String[] {"-persistence/session/maxEntriesCache", "1",
111                        "-persistence/session/maxEntries","2",
112                        "-persistence/subscribe/maxEntriesCache", "2",
113                        "-persistence/subscribe/maxEntries","3",
114                       };
115       }
116       this.serverGlobal = this.origGlobal.getClone(args);
117       serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
118       log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
119 
120       try { // we just connect and disconnect to make sure all resources are really cleaned up
121          Global tmpGlobal = this.origGlobal.getClone(null);
122          I_XmlBlasterAccess con = tmpGlobal.getXmlBlasterAccess(); // Find orb
123 
124          String passwd = "secret";
125          ConnectQos connectQos = new ConnectQos(tmpGlobal, senderName, passwd); // == "<qos>...</qos>";
126          connectQos.setSessionName(new SessionName(tmpGlobal, "general/1"));
127          // set the persistent connection 
128          connectQos.setPersistent(this.persistent);
129          // Setup fail save handling for connection ...
130          Address addressProp = new Address(tmpGlobal);
131          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
132          addressProp.setRetries(-1);       // -1 == forever
133          addressProp.setPingInterval(-1L); // switched off
134          con.registerConnectionListener(this);
135          connectQos.setAddress(addressProp);
136          
137          // setup failsafe handling for callback ...
138          if (this.failsafeCallback) {
139             CallbackAddress cbAddress = new CallbackAddress(tmpGlobal);
140             cbAddress.setRetries(-1);
141             cbAddress.setPingInterval(-1);
142             cbAddress.setDelay(1000L);
143             cbAddress.setSecretCbSessionId("someSecredSessionId");
144             connectQos.addCallbackAddress(cbAddress);
145          }
146          con.connect(connectQos, this);
147          DisconnectQos disconnectQos = new DisconnectQos(tmpGlobal);
148          con.disconnect(disconnectQos);
149       }
150       catch (XmlBlasterException e) {
151           log.warning("setUp() - login failed: " + e.getMessage());
152           fail("setUp() - login fail: " + e.getMessage());
153       }
154       catch (Exception e) {
155           log.severe("setUp() - login failed: " + e.toString());
156           e.printStackTrace();
157           fail("setUp() - login fail: " + e.toString());
158       }
159       
160       try {
161          I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
162 
163          String passwd = "secret";
164          ConnectQos connectQos = new ConnectQos(this.glob, senderName, passwd); // == "<qos>...</qos>";
165          connectQos.setSessionName(new SessionName(this.glob, "general/1"));
166          // set the persistent connection 
167          connectQos.setPersistent(this.persistent);
168          // Setup fail save handling for connection ...
169          Address addressProp = new Address(glob);
170          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
171          addressProp.setRetries(-1);       // -1 == forever
172          addressProp.setPingInterval(-1L); // switched off
173          con.registerConnectionListener(this);
174          connectQos.setAddress(addressProp);
175          
176          // setup failsafe handling for callback ...
177          if (this.failsafeCallback) {
178             CallbackAddress cbAddress = new CallbackAddress(this.glob);
179             cbAddress.setRetries(-1);
180             cbAddress.setPingInterval(-1);
181             cbAddress.setDelay(1000L);
182             cbAddress.setSecretCbSessionId("someSecredSessionId");
183             connectQos.addCallbackAddress(cbAddress);
184          }
185 
186          con.connect(connectQos, this);  // Login to xmlBlaster, register for updates
187       }
188       catch (XmlBlasterException e) {
189           log.warning("setUp() - login failed: " + e.getMessage());
190           fail("setUp() - login fail: " + e.getMessage());
191       }
192       catch (Exception e) {
193           log.severe("setUp() - login failed: " + e.toString());
194           e.printStackTrace();
195           fail("setUp() - login fail: " + e.toString());
196       }
197    }
198 
199    /**
200     * Tears down the fixture.
201     * <p />
202     * cleaning up .... erase() the previous message OID and logout
203     */
204    protected void tearDown() {
205       log.info("Entering tearDown(), test is finished");
206       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
207                       "   //TestPersistentSession-AGENT" +
208                       "</key>";
209 
210       String qos = "<qos><forceDestroy>true</forceDestroy></qos>";
211       I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
212       try {
213          con.erase(xmlKey, qos);
214 
215          PropString defaultPlugin = new PropString("CACHE,1.0");
216          String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
217          log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
218       }
219       catch(XmlBlasterException e) {
220          log.severe("XmlBlasterException: " + e.getMessage());
221       }
222       finally {
223          con.disconnect(null);
224          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
225          this.serverThread = null;
226          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
227          Util.resetPorts(this.serverGlobal);
228          Util.resetPorts(this.glob);
229          Util.resetPorts(this.origGlobal);
230          this.glob = null;
231          this.serverGlobal = null;
232          con = null;
233          Global.instance().shutdown();
234       }
235    }
236 
237    /**
238     * TEST: Subscribe to messages with XPATH.
239     */
240    private void doSubscribe(int num, boolean isExact, boolean isPersistent) {
241       try {
242          SubscribeKey key = null;
243          if (isExact)  key = new SubscribeKey(this.glob, "Message-1");
244          else key = new SubscribeKey(this.glob, "//TestPersistentSession-AGENT", "XPATH");
245 
246          SubscribeQos qos = new SubscribeQos(this.glob); // "<qos><persistent>true</persistent></qos>";
247          qos.setPersistent(isPersistent);
248          qos.setWantInitialUpdate(this.initialUpdates);
249          qos.setWantNotify(false); // to avoig getting erased messages
250 
251          this.updateInterceptors[num] = new MsgInterceptor(this.glob, log, null); // Collect received msgs
252          this.updateInterceptors[num].setLogPrefix("interceptor-" + num);
253          SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptors[num]);
254 
255          log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
256          assertTrue("returned null subscriptionId", subscriptionId != null);
257       } catch(XmlBlasterException e) {
258          log.warning("XmlBlasterException: " + e.getMessage());
259          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
260       }
261    }
262  
263    /**
264     * TEST: Construct a message and publish it.
265     * <p />
266     */
267    public void doPublish(int counter) throws XmlBlasterException {
268       String oid = "Message" + "-" + counter;
269       log.info("Publishing a message " + oid + " ...");
270       String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
271                       "   <TestPersistentSession-AGENT id='192.168.124.10' subId='1' type='generic'>" +
272                       "   </TestPersistentSession-AGENT>" +
273                       "</key>";
274       String content = "" + counter;
275       PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
276       MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
277 
278       this.glob.getXmlBlasterAccess().publish(msgUnit);
279       log.info("Success: Publishing of " + oid + " done");
280    }
281 
282    /**
283     * TEST: <br />
284     */
285    public void persistentSession(boolean doStop) {
286       //doSubscribe(); -> see reachedAlive()
287       log.info("Going to publish " + numPublish + " messages, xmlBlaster will be down for message 3 and 4");
288       // 
289       doSubscribe(0, this.exactSubscription, TRANSIENT);
290       doSubscribe(1, this.exactSubscription, PERSISTENT);
291       
292       for (int i=0; i<numPublish; i++) {
293          try {
294             if (i == numStop) { // 3
295                if (doStop) {
296                   log.info("Stopping xmlBlaster, but continue with publishing ...");
297                   EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
298                   this.serverThread = null;
299                }
300                else {
301                   log.info("changing run level but continue with publishing ...");
302                   this.serverThread.changeRunlevel(0, true);
303                }
304             }
305             if (i == numStart) {
306                if (doStop) {
307                   log.info("Starting xmlBlaster again, expecting the previous published two messages ...");
308                   // serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
309                   serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.serverGlobal);
310                   log.info("xmlBlaster started, waiting on tail back messsages");
311                }
312                else {
313                   log.info("changing runlevel again to runlevel 9. Expecting the previous published two messages ...");
314                   this.serverThread.changeRunlevel(9, true);
315                   log.info("xmlBlaster runlevel 9 reached, waiting on tail back messsages");
316                }
317                
318                // Message-4 We need to wait until the client reconnected (reconnect interval)
319                // Message-5
320                assertEquals("", 2, this.updateInterceptors[1].waitOnUpdate(reconnectDelay*2L, 2));
321                assertEquals("", 2, this.updateInterceptors[3].waitOnUpdate(reconnectDelay*2L, 2));
322                
323                for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
324             }
325             doPublish(i+1);
326             if (i == 0) {
327                doSubscribe(2, this.exactSubscription, TRANSIENT);
328                doSubscribe(3, this.exactSubscription, PERSISTENT);
329             }
330 
331             if (i < numStop || i >= numStart ) {
332                int n = 1;
333                if (i == 0 && !this.initialUpdates) n = 0;
334                assertEquals("Message nr. " + (i+1), 1, this.updateInterceptors[1].waitOnUpdate(4000L, 1));
335                assertEquals("Message nr. " + (i+1), n, this.updateInterceptors[3].waitOnUpdate(4000L, n));
336             }
337             for (int j=0; j < this.numSubscribers; j++) this.updateInterceptors[j].clear();
338          }
339          catch(XmlBlasterException e) {
340             if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_POLLING)
341                log.warning("Lost connection, my connection layer is polling: " + e.getMessage());
342             else if (e.getErrorCode() == ErrorCode.COMMUNICATION_NOCONNECTION_DEAD)
343                assertTrue("Lost connection, my connection layer is NOT polling", false);
344             else
345                assertTrue("Publishing problems: " + e.getMessage(), false);
346          }
347       }
348       doSubscribe(0, this.exactSubscription, TRANSIENT);
349       doSubscribe(1, this.exactSubscription, PERSISTENT);
350    }
351 
352    /**
353     * This is the callback method invoked from I_XmlBlasterAccess
354     * informing the client in an asynchronous mode if the connection was established.
355     * <p />
356     * This method is enforced through interface I_ConnectionStateListener
357     */
358    public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
359       log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster");
360       // doSubscribe();    // initialize on startup and on reconnect
361    }
362 
363    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
364       log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
365    }
366 
367    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
368       log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
369    }
370 
371    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
372       String contentStr = new String(content);
373       String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
374       log.info("Receiving update of a message oid=" + updateKey.getOid() +
375                         " priority=" + updateQos.getPriority() +
376                         " state=" + updateQos.getState() +
377                         " content=" + cont);
378       log.info("further log for receiving update of a message cbSessionId=" + cbSessionId +
379                      updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
380       log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions)");
381 
382       return "OK";
383    }
384 
385 
386    public void testXPathInitialStop() {
387       this.exactSubscription = false;
388       this.initialUpdates = true;
389       persistentSession(true);
390    }
391 
392    public void testXPathNoInitialStop() {
393       this.exactSubscription = false;
394       this.initialUpdates = false;
395       persistentSession(true);
396    }
397 
398    public void testXPathInitialRunlevelChange() {
399       this.persistent = true;
400       this.exactSubscription = false;
401       this.initialUpdates = true;
402       persistentSession(false);
403    }
404 
405    // -----------------------------------------------------------------
406    private Global createConnection(Global parentGlobal, String sessionName, boolean isPersistent, boolean expectEx) {
407       try {
408          Global ret = parentGlobal.getClone(null);
409          I_XmlBlasterAccess con = ret.getXmlBlasterAccess(); // Find orb
410          ConnectQos connectQos = new ConnectQos(glob); // == "<qos>...</qos>";
411          connectQos.setSessionName(new SessionName(ret, sessionName));
412          // set the persistent connection 
413          connectQos.setPersistent(isPersistent);
414          // Setup fail save handling for connection ...
415          Address addressProp = new Address(glob);
416          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
417          addressProp.setRetries(-1);       // -1 == forever
418          addressProp.setPingInterval(-1L); // switched off
419          connectQos.setAddress(addressProp);
420       
421          // setup failsafe handling for callback ...
422          if (this.failsafeCallback) {
423             CallbackAddress cbAddress = new CallbackAddress(this.glob);
424             cbAddress.setRetries(-1);
425             cbAddress.setPingInterval(-1);
426             cbAddress.setDelay(1000L);
427             connectQos.addCallbackAddress(cbAddress);
428          }
429          con.connect(connectQos, this);  // Login to xmlBlaster, register for updates
430          if (expectEx) assertTrue("an exception was expected here because of overflow: Configuration of session queue probably not working", false);
431          return ret;
432       }
433       catch (XmlBlasterException ex) {
434          if (expectEx) log.info("createConnection: exception was OK since overflow was expected");
435          else assertTrue("an exception should not occur here", false);
436       }
437       return null; //to make compiler happy
438    }      
439 
440    
441    /**
442     * Tests the requirement:
443     * - If the storage for the sessions is overflown, it should throw an exception
444     *
445     */
446    public void testOverflow() {
447       // to change the configuration on server side (limit the queue sizes)
448       tearDown();
449       setup(true);
450       Global[] globals = new Global[5];
451       try {
452          globals[0] = createConnection(this.origGlobal, "bjoern/1", true , false);
453          globals[1] = createConnection(this.origGlobal, "fritz/2", false, false);
454          globals[3] = createConnection(this.origGlobal, "dimitri/3", true , true); // <-- exception (since main connection also persistent)
455          globals[2] = createConnection(this.origGlobal, "pandora/4", false , false); // OK since transient
456          globals[4] = createConnection(this.origGlobal, "jonny/5", true, true);
457       }
458       finally {
459          for (int i=0; i < globals.length; i++) {
460             if (globals[i] != null) globals[i].getXmlBlasterAccess().disconnect(new DisconnectQos(globals[i]));
461          }
462       }
463    }
464 
465    /**
466     * Invoke: java org.xmlBlaster.test.client.TestPersistentSession
467     * <p />
468     * @deprecated Use the TestRunner from the testsuite to run it:<p />
469     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPersistentSession</pre>
470     */
471    public static void main(String args[])
472    {
473       Global glob = new Global();
474       if (glob.init(args) != 0) {
475          System.out.println(ME + ": Init failed");
476          System.exit(1);
477       }
478 
479       TestPersistentSession testSub = new TestPersistentSession(glob, "TestPersistentSession/1");
480 
481       testSub.setUp();
482       testSub.testXPathInitialStop();
483       testSub.tearDown();
484 
485       testSub.setUp();
486       testSub.testXPathNoInitialStop();
487       testSub.tearDown();
488 
489       testSub.setUp();
490       testSub.testXPathInitialRunlevelChange();
491       testSub.tearDown();
492 
493       testSub.setUp();
494       testSub.testOverflow();
495       testSub.tearDown();
496    }
497 }


syntax highlighted by Code2HTML, v. 0.9.1