1 /*------------------------------------------------------------------------------
2 Name: TestSessionReconnect.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.authentication;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10 import org.xmlBlaster.util.qos.HistoryQos;
11 import org.xmlBlaster.util.def.PriorityEnum;
12 import org.xmlBlaster.util.def.Constants;
13 import org.xmlBlaster.util.qos.TopicProperty;
14 import org.xmlBlaster.util.qos.address.CallbackAddress;
15 import org.xmlBlaster.client.qos.ConnectQos;
16 import org.xmlBlaster.client.qos.ConnectReturnQos;
17 import org.xmlBlaster.util.XmlBlasterException;
18 import org.xmlBlaster.util.EmbeddedXmlBlaster;
19 import org.xmlBlaster.client.key.PublishKey;
20 import org.xmlBlaster.client.key.SubscribeKey;
21 import org.xmlBlaster.client.qos.PublishQos;
22 import org.xmlBlaster.client.qos.PublishReturnQos;
23 import org.xmlBlaster.client.qos.SubscribeQos;
24 import org.xmlBlaster.client.I_XmlBlasterAccess;
25 import org.xmlBlaster.util.MsgUnit;
26
27 import org.xmlBlaster.test.Util;
28 import org.xmlBlaster.test.MsgInterceptor;
29
30 import junit.framework.*;
31
32
33 /**
34 * This client does test if a subscriber can reconnect to its session and
35 * its callback queue holded the messages during downtime.
36 * <p>
37 * This client may be invoked multiple time on the same xmlBlaster server,
38 * as it cleans up everything after his tests are done.
39 * </p>
40 * <p>
41 * Invoke examples:
42 * </p>
43 * <pre>
44 * java junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
45 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.authentication.TestSessionReconnect
46 * </pre>
47 */
48 public class TestSessionReconnect extends TestCase
49 {
50 private final Global glob;
51 private static Logger log = Logger.getLogger(TestSessionReconnect.class.getName());
52 private String passwd = "secret";
53 private int serverPort = 7615;
54 private String oid = "TestSessionReconnect.Msg";
55 private EmbeddedXmlBlaster serverThread = null;
56 private String sessionNameSub = "TestSessionReconnectSubscriber";
57 private I_XmlBlasterAccess conSub;
58 private I_XmlBlasterAccess conSub2;
59 private MsgInterceptor updateInterceptorSub;
60
61 private String sessionNamePub = "TestSessionReconnectPublisher";
62 private I_XmlBlasterAccess conPub;
63
64 /** For Junit */
65 public TestSessionReconnect() {
66 this(new Global(), "TestSessionReconnect");
67 }
68
69 /**
70 * Constructs the TestSessionReconnect object.
71 * <p />
72 * @param testName The name used in the test suite and to login to xmlBlaster
73 */
74 public TestSessionReconnect(Global glob, String testName) {
75 super(testName);
76 this.glob = glob;
77
78 }
79
80 /**
81 * Sets up the fixture.
82 * <p />
83 * Connect to xmlBlaster and login
84 */
85 protected void setUp() {
86 glob.init(Util.getOtherServerPorts(serverPort));
87 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
88 log.info("XmlBlaster is ready for testing");
89 }
90
91 /**
92 * Cleaning up.
93 */
94 protected void tearDown() {
95 try { Thread.sleep(1000);} catch(Exception ex) {}
96 if (serverThread != null)
97 serverThread.stopServer(true);
98 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
99 Util.resetPorts();
100 }
101
102 /**
103 */
104 public void testSessionReconnect() {
105 log.info("testSessionReconnect("+sessionNameSub+") ...");
106
107 try {
108 log.info("============ STEP 1: Start subscriber");
109
110 Global globSub = glob.getClone(null);
111 // A testsuite helper to collect update messages
112 this.updateInterceptorSub = new MsgInterceptor(globSub, log, null);
113
114 conSub = globSub.getXmlBlasterAccess();
115
116 ConnectReturnQos crqSub = null;
117 {
118 ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
119
120 CallbackAddress addr = new CallbackAddress(globSub);
121 addr.setRetries(-1);
122 String secretCbSessionId = "TrustMeSub";
123 addr.setSecretCbSessionId(secretCbSessionId);
124 qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
125
126 log.info("First subscribe connect QoS = " + qosSub.toXml());
127 crqSub = conSub.connect(qosSub, this.updateInterceptorSub); // Login to xmlBlaster
128 log.info("Connect as subscriber '" + crqSub.getSessionName() + "' success");
129 }
130
131 SubscribeKey sk = new SubscribeKey(globSub, oid);
132 SubscribeQos sq = new SubscribeQos(globSub);
133 sq.setWantInitialUpdate(false);
134 sq.setWantLocal(true);
135 sq.setWantContent(true);
136
137 HistoryQos historyQos = new HistoryQos(globSub);
138 historyQos.setNumEntries(1);
139 sq.setHistoryQos(historyQos);
140
141 /*SubscribeReturnQos srq = */conSub.subscribe(sk.toXml(), sq.toXml());
142 log.info("Subscription to '" + oid + "' done");
143
144 log.info("============ STEP 2: Start publisher");
145 Global globPub = glob.getClone(null);
146 conPub = globPub.getXmlBlasterAccess();
147 ConnectQos qosPub = new ConnectQos(globPub, sessionNamePub, passwd);
148 ConnectReturnQos crqPub = conPub.connect(qosPub, null); // Login to xmlBlaster, no updates
149 log.info("Connect success as " + crqPub.getSessionName());
150
151 log.info("============ STEP 3: Stop subscriber callback");
152 try {
153 conSub.getCbServer().shutdown();
154 }
155 catch (XmlBlasterException e) {
156 fail("ShutdownCB: " + e.getMessage());
157 }
158
159 log.info("============ STEP 4: Publish messages");
160 int numPub = 8;
161 MsgUnit[] sentArr = new MsgUnit[numPub];
162 PublishReturnQos[] sentQos = new PublishReturnQos[numPub];
163 for(int i=0; i<numPub; i++) {
164 PublishKey pk = new PublishKey(globPub, oid, "text/xml", "1.0");
165 pk.setClientTags("<org.xmlBlaster><demo/></org.xmlBlaster>");
166 PublishQos pq = new PublishQos(globPub);
167 pq.setPriority(PriorityEnum.NORM_PRIORITY);
168 pq.setPersistent(false);
169 pq.setLifeTime(60000L);
170 if (i == 0) {
171 TopicProperty topicProperty = new TopicProperty(globPub);
172 topicProperty.setDestroyDelay(60000L);
173 topicProperty.setCreateDomEntry(true);
174 topicProperty.setReadonly(false);
175 topicProperty.getHistoryQueueProperty().setMaxEntries(numPub+5);
176 pq.setTopicProperty(topicProperty);
177 log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
178 }
179
180 byte[] content = "Hello".getBytes();
181 MsgUnit msgUnit = new MsgUnit(pk, content, pq);
182 sentArr[i] = msgUnit;
183 PublishReturnQos prq = conPub.publish(msgUnit);
184 sentQos[i] = prq;
185 log.info("Got status='" + prq.getState() + "' rcvTimestamp=" + prq.getRcvTimestamp().toString() +
186 " for published message '" + prq.getKeyOid() + "'");
187 }
188
189 log.info("============ STEP 5: Start subscriber callback with same public sessionId");
190 Global globSub2 = glob.getClone(null);
191 MsgInterceptor updateInterceptorSub2 = new MsgInterceptor(globSub2, log, null);
192 updateInterceptorSub2.setLogPrefix("TrustMeSub2");
193
194 conSub2 = globSub2.getXmlBlasterAccess(); // Create a new client
195 String secretCbSessionId2 = "TrustMeSub2";
196 {
197 ConnectQos qosSub = new ConnectQos(globSub, sessionNameSub, passwd);
198 CallbackAddress addr = new CallbackAddress(globSub);
199 addr.setRetries(-1);
200 addr.setSecretCbSessionId(secretCbSessionId2);
201 qosSub.getSessionCbQueueProperty().setCallbackAddress(addr);
202 qosSub.getSessionQos().setSessionName(crqSub.getSessionQos().getSessionName());
203
204 log.info("Second subscribe connect QoS = " + qosSub.toXml());
205 ConnectReturnQos crqSub2 = conSub2.connect(qosSub, updateInterceptorSub2); // Login to xmlBlaster
206 log.info("Connect as subscriber '" + crqSub2.getSessionName() + "' success");
207 }
208
209 assertEquals("", 0, updateInterceptorSub.count()); // The first login session should not receive anything
210
211 assertEquals("", numPub, updateInterceptorSub2.waitOnUpdate(4000L, oid, Constants.STATE_OK));
212 updateInterceptorSub2.compareToReceived(sentArr, secretCbSessionId2);
213 updateInterceptorSub2.compareToReceived(sentQos);
214
215 updateInterceptorSub2.clear();
216 }
217 catch (XmlBlasterException e) {
218 log.severe(e.toString());
219 fail(e.toString());
220 }
221 finally { // clean up
222 log.info("Disconnecting '" + sessionNameSub + "'");
223 conSub.disconnect(null);
224 conSub2.disconnect(null);
225 }
226 log.info("Success in testSessionReconnect()");
227 }
228
229 /**
230 * Method is used by TestRunner to load these tests
231 */
232 public static Test suite() {
233 TestSuite suite= new TestSuite();
234 suite.addTest(new TestSessionReconnect(Global.instance(), "testSessionReconnect"));
235 return suite;
236 }
237
238 /**
239 * Invoke:
240 * <pre>
241 * java org.xmlBlaster.test.authentication.TestSessionReconnect
242 * java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.authentication.TestSessionReconnect
243 * <pre>
244 */
245 public static void main(String args[]) {
246 TestSessionReconnect testSub = new TestSessionReconnect(new Global(args), "TestSessionReconnect");
247 testSub.setUp();
248 testSub.testSessionReconnect();
249 testSub.tearDown();
250 }
251 }
// syntax highlighted by Code2HTML, v. 0.9.1