1 /*------------------------------------------------------------------------------
  2 Name:      TestSessionReconnect.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.authentication;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.util.Global;
 10 import org.xmlBlaster.util.qos.HistoryQos;
 11 import org.xmlBlaster.util.def.PriorityEnum;
 12 import org.xmlBlaster.util.def.Constants;
 13 import org.xmlBlaster.util.qos.TopicProperty;
 14 import org.xmlBlaster.util.qos.address.CallbackAddress;
 15 import org.xmlBlaster.client.qos.ConnectQos;
 16 import org.xmlBlaster.client.qos.ConnectReturnQos;
 17 import org.xmlBlaster.util.XmlBlasterException;
 18 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 19 import org.xmlBlaster.client.key.PublishKey;
 20 import org.xmlBlaster.client.key.SubscribeKey;
 21 import org.xmlBlaster.client.qos.PublishQos;
 22 import org.xmlBlaster.client.qos.PublishReturnQos;
 23 import org.xmlBlaster.client.qos.SubscribeQos;
 24 import org.xmlBlaster.client.I_XmlBlasterAccess;
 25 import org.xmlBlaster.util.MsgUnit;
 26 
 27 import org.xmlBlaster.test.Util;
 28 import org.xmlBlaster.test.MsgInterceptor;
 29 
 30 import junit.framework.*;
 31 
 32 
 33 /**
 34  * This client does test if a subscriber can reconnect to its session and 
 35  * its callback queue holded the messages during downtime. 
 36  * <p>
 37  * This client may be invoked multiple time on the same xmlBlaster server,
 38  * as it cleans up everything after his tests are done.
 39  * </p>
 40  * <p>
 41  * Invoke examples:
 42  * </p>
 43  * <pre>
 44  *    java junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
 45  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.authentication.TestSessionReconnect
 46  * </pre>
 47  */
 48 public class TestSessionReconnect extends TestCase
 49 {
 50    private final Global glob;
 51    private static Logger log = Logger.getLogger(TestSessionReconnect.class.getName());
 52    private String passwd = "secret";
 53    private int serverPort = 7615;
 54    private String oid = "TestSessionReconnect.Msg";
 55    private EmbeddedXmlBlaster serverThread = null;
 56    private String sessionNameSub = "TestSessionReconnectSubscriber";
 57    private I_XmlBlasterAccess conSub;
 58    private I_XmlBlasterAccess conSub2;
 59    private MsgInterceptor updateInterceptorSub;
 60 
 61    private String sessionNamePub = "TestSessionReconnectPublisher";
 62    private I_XmlBlasterAccess conPub;
 63 
 64    /** For Junit */
 65    public TestSessionReconnect() {
 66       this(new Global(), "TestSessionReconnect");
 67    }
 68 
 69    /**
 70     * Constructs the TestSessionReconnect object.
 71     * <p />
 72     * @param testName   The name used in the test suite and to login to xmlBlaster
 73     */
 74    public TestSessionReconnect(Global glob, String testName) {
 75        super(testName);
 76        this.glob = glob;
 77 
 78    }
 79 
 80    /**
 81     * Sets up the fixture.
 82     * <p />
 83     * Connect to xmlBlaster and login
 84     */
 85    protected void setUp() {
 86       glob.init(Util.getOtherServerPorts(serverPort));
 87       serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
 88       log.info("XmlBlaster is ready for testing");
 89    }
 90 
 91    /**
 92     * Cleaning up. 
 93     */
 94    protected void tearDown() {
 95       try { Thread.sleep(1000);} catch(Exception ex) {} 
 96       if (serverThread != null)
 97          serverThread.stopServer(true);
 98       // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
 99       Util.resetPorts();
100    }
101 
102    /**
103     */
104    public void testSessionReconnect() {
105       log.info("testSessionReconnect("+sessionNameSub+") ...");
106 
107       try {
108          log.info("============ STEP 1: Start subscriber");
109 
110          Global globSub = glob.getClone(null);
111          // A testsuite helper to collect update messages
112          this.updateInterceptorSub = new MsgInterceptor(globSub, log, null);
113 
114          conSub = globSub.getXmlBlasterAccess();
115          
116          ConnectReturnQos crqSub = null;
117          {
118             ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
119 
120             CallbackAddress addr = new CallbackAddress(globSub);
121             addr.setRetries(-1);
122             String secretCbSessionId = "TrustMeSub";
123             addr.setSecretCbSessionId(secretCbSessionId);
124             qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
125 
126             log.info("First subscribe connect QoS = " + qosSub.toXml());
127             crqSub = conSub.connect(qosSub, this.updateInterceptorSub); // Login to xmlBlaster
128             log.info("Connect as subscriber '" + crqSub.getSessionName() + "' success");
129          }
130 
131          SubscribeKey sk = new SubscribeKey(globSub, oid);
132          SubscribeQos sq = new SubscribeQos(globSub);
133          sq.setWantInitialUpdate(false);
134          sq.setWantLocal(true);
135          sq.setWantContent(true);
136          
137          HistoryQos historyQos = new HistoryQos(globSub);
138          historyQos.setNumEntries(1);
139          sq.setHistoryQos(historyQos);
140 
141          /*SubscribeReturnQos srq = */conSub.subscribe(sk.toXml(), sq.toXml());
142          log.info("Subscription to '" + oid + "' done");
143 
144          log.info("============ STEP 2: Start publisher");
145          Global globPub = glob.getClone(null);
146          conPub = globPub.getXmlBlasterAccess();
147          ConnectQos qosPub = new ConnectQos(globPub, sessionNamePub, passwd);
148          ConnectReturnQos crqPub = conPub.connect(qosPub, null);  // Login to xmlBlaster, no updates
149          log.info("Connect success as " + crqPub.getSessionName());
150 
151          log.info("============ STEP 3: Stop subscriber callback");
152          try {
153             conSub.getCbServer().shutdown();
154          }
155          catch (XmlBlasterException e) {
156             fail("ShutdownCB: " + e.getMessage());
157          }
158 
159          log.info("============ STEP 4: Publish messages");
160          int numPub = 8;
161          MsgUnit[] sentArr = new MsgUnit[numPub];
162          PublishReturnQos[] sentQos = new PublishReturnQos[numPub];
163          for(int i=0; i<numPub; i++) {
164             PublishKey pk = new PublishKey(globPub, oid, "text/xml", "1.0");
165             pk.setClientTags("<org.xmlBlaster><demo/></org.xmlBlaster>");
166             PublishQos pq = new PublishQos(globPub);
167             pq.setPriority(PriorityEnum.NORM_PRIORITY);
168             pq.setPersistent(false);
169             pq.setLifeTime(60000L);
170             if (i == 0) {
171                TopicProperty topicProperty = new TopicProperty(globPub);
172                topicProperty.setDestroyDelay(60000L);
173                topicProperty.setCreateDomEntry(true);
174                topicProperty.setReadonly(false);
175                topicProperty.getHistoryQueueProperty().setMaxEntries(numPub+5);
176                pq.setTopicProperty(topicProperty);
177                log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
178             }
179 
180             byte[] content = "Hello".getBytes();
181             MsgUnit msgUnit = new MsgUnit(pk, content, pq);
182             sentArr[i] = msgUnit;
183             PublishReturnQos prq = conPub.publish(msgUnit);
184             sentQos[i] = prq;
185             log.info("Got status='" + prq.getState() + "' rcvTimestamp=" + prq.getRcvTimestamp().toString() +
186                         " for published message '" + prq.getKeyOid() + "'");
187          }
188 
189          log.info("============ STEP 5: Start subscriber callback with same public sessionId");
190          Global globSub2 = glob.getClone(null);
191          MsgInterceptor updateInterceptorSub2 = new MsgInterceptor(globSub2, log, null);
192          updateInterceptorSub2.setLogPrefix("TrustMeSub2");
193 
194          conSub2 = globSub2.getXmlBlasterAccess(); // Create a new client
195          String secretCbSessionId2 = "TrustMeSub2";
196          {
197             ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
198             CallbackAddress addr = new CallbackAddress(globSub);
199             addr.setRetries(-1);
200             addr.setSecretCbSessionId(secretCbSessionId2);
201             qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
202             qosSub.getSessionQos().setSessionName(crqSub.getSessionQos().getSessionName());
203 
204             log.info("Second subscribe connect QoS = " + qosSub.toXml());
205             ConnectReturnQos crqSub2 = conSub2.connect(qosSub, updateInterceptorSub2); // Login to xmlBlaster
206             log.info("Connect as subscriber '" + crqSub2.getSessionName() + "' success");
207          }
208 
209          assertEquals("", 0, updateInterceptorSub.count()); // The first login session should not receive anything
210 
211          assertEquals("", numPub, updateInterceptorSub2.waitOnUpdate(4000L, oid, Constants.STATE_OK));
212          updateInterceptorSub2.compareToReceived(sentArr, secretCbSessionId2);
213          updateInterceptorSub2.compareToReceived(sentQos);
214 
215          updateInterceptorSub2.clear();
216       }
217       catch (XmlBlasterException e) {
218          log.severe(e.toString());
219          fail(e.toString());
220       }
221       finally { // clean up
222          log.info("Disconnecting '" + sessionNameSub + "'");
223          conSub.disconnect(null);
224          conSub2.disconnect(null);
225       }
226       log.info("Success in testSessionReconnect()");
227    }
228 
229    /**
230     * Method is used by TestRunner to load these tests
231     */
232    public static Test suite() {
233        TestSuite suite= new TestSuite();
234        suite.addTest(new TestSessionReconnect(Global.instance(), "testSessionReconnect"));
235        return suite;
236    }
237 
238    /**
239     * Invoke: 
240     * <pre>
241     *   java org.xmlBlaster.test.authentication.TestSessionReconnect
242     *   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
243     * <pre>
244     */
245    public static void main(String args[]) {
246       TestSessionReconnect testSub = new TestSessionReconnect(new Global(args), "TestSessionReconnect");
247       testSub.setUp();
248       testSub.testSessionReconnect();
249       testSub.tearDown();
250    }
251 }


syntax highlighted by Code2HTML, v. 0.9.1