1 package org.xmlBlaster.test.cluster;
2
3 import java.util.logging.Logger;
4
5 import org.xmlBlaster.test.util.Client;
6 import org.xmlBlaster.util.Global;
7
8 // for client connections:
9 import org.xmlBlaster.util.*;
10 import org.xmlBlaster.client.I_Callback;
11 import org.xmlBlaster.client.key.PublishKey;
12 import org.xmlBlaster.client.key.EraseKey;
13 import org.xmlBlaster.client.key.SubscribeKey;
14 import org.xmlBlaster.client.key.UnSubscribeKey;
15 import org.xmlBlaster.client.key.UpdateKey;
16 import org.xmlBlaster.client.qos.PublishQos;
17 import org.xmlBlaster.client.qos.PublishReturnQos;
18 import org.xmlBlaster.client.qos.UpdateQos;
19 import org.xmlBlaster.client.qos.SubscribeQos;
20 import org.xmlBlaster.client.qos.UnSubscribeQos;
21 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
22 import org.xmlBlaster.client.qos.EraseQos;
23 import org.xmlBlaster.client.qos.EraseReturnQos;
24 import org.xmlBlaster.client.I_XmlBlasterAccess;
25 import org.xmlBlaster.util.MsgUnit;
26
27 import junit.framework.*;
28
29 /**
30 * Test publishing a message from bilbo to heron.
31 * <p />
32 * <pre>
33 * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.SubscribeTest
34 * </pre>
35 * NOTE: asserts() in update() methods are routed back to server and are not handled
36 * by the junit testsuite, so we check double (see code).
37 * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
38 */
39 public class SubscribeTest extends TestCase {
40 private String ME = "SubscribeTest";
41 private Global glob;
42 private static Logger log = Logger.getLogger(SubscribeTest.class.getName());
43 private ServerHelper serverHelper;
44
45 private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon, bilboCon2;
46
47 private int updateCounterBilbo = 0;
48 private int updateCounterBilbo2 = 0;
49 private String oid = "SubscribeToBilbo";
50 private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
51 private String contentStr = "We win";
52
53 public SubscribeTest(String name) {
54 super(name);
55 this.glob = new Global(null, true, false);
56 }
57
58 /**
59 * Initialize the test ...
60 */
61 protected void setUp() {
62
63 log.info("Entering setUp(), test starts");
64
65 updateCounterBilbo = 0;
66 updateCounterBilbo2 = 0;
67
68
69 serverHelper = new ServerHelper(glob, log, ME);
70
71 // Starts a cluster node
72 serverHelper.startHeron();
73 serverHelper.startAvalon();
74 //serverHelper.startGolan();
75 serverHelper.startFrodo();
76 serverHelper.startBilbo();
77 }
78
79 /**
80 * cleaning up ...
81 */
82 protected void tearDown() {
83 log.info("Entering tearDown(), test is finished");
84 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
85
86 if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
87 if (bilboCon2 != null) { bilboCon2.disconnect(null); bilboCon2 = null; }
88 if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
89 if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
90 if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
91 if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
92
93 serverHelper.tearDown();
94 }
95
96 /**
97 * We start all nodes as described in requirement
98 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
99 * <p />
100 * - Subscribe to RUGBY messages from bilbo twice<br />
101 * - publish RUGBY messages to avalon (heron is the master)<br />
102 * - Kill bilbo, restart bilbo and check if we still get them
103 */
104 public void testSubscribeTwice() {
105 System.err.println("***SubscribeTest.testSubscribeTwice: Subscribe a message from a cluster slave ...");
106 try {
107 System.err.println("->Connect to avalon ...");
108 avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
109
110 {
111 System.err.println("->Connect to bilbo ...");
112 bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() { // Login to xmlBlaster, register for updates
113 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
114 if (updateQos.isErased()) {
115 log.info("Ignoring erase message");
116 return "";
117 }
118 updateCounterBilbo++;
119 log.info(
120 "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
121 assertEquals("Wrong message updated", oid, updateKey.getOid());
122 return "";
123 }
124 });
125
126 System.err.println("->Subscribe from bilbo ...");
127 SubscribeKey sk = new SubscribeKey(glob, oid);
128 sk.setDomain(domain);
129 SubscribeQos sq = new SubscribeQos(glob);
130 bilboCon.subscribe(sk.toXml(), sq.toXml());
131 }
132
133 {
134 System.err.println("->Connect to bilbo 2 ...");
135 final Global bilboGlob2 = serverHelper.getBilboGlob().getClone(null);
136 bilboCon2 = serverHelper.connect(bilboGlob2, new I_Callback() { // Login to xmlBlaster, register for updates
137 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
138 if (updateQos.isErased()) {
139 log.info("Ignoring erase message");
140 return "";
141 }
142 updateCounterBilbo2++;
143 log.info(
144 "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo2 + " ...");
145 assertEquals("#2 Wrong message updated", oid, updateKey.getOid());
146 return "";
147 }
148 });
149
150 System.err.println("->Subscribe from bilbo 2 ...");
151 SubscribeKey sk = new SubscribeKey(glob, oid);
152 sk.setDomain(domain);
153 SubscribeQos sq = new SubscribeQos(glob);
154 bilboCon2.subscribe(sk.toXml(), sq.toXml());
155 }
156
157 // First test subscribe ...
158 {
159 System.err.println("->Publish to avalon ...");
160 PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
161 PublishQos avalon_pq = new PublishQos(glob);
162 MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
163 PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
164 assertEquals("oid changed", oid, avalon_prq.getKeyOid());
165
166
167 try { Thread.sleep(2000); } catch( InterruptedException i) {}
168 if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
169 assertEquals("message from avalon", 1, updateCounterBilbo);
170 if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
171 assertEquals("message from avalon #2", 1, updateCounterBilbo2);
172 updateCounterBilbo = 0;
173 updateCounterBilbo2 = 0;
174 }
175
176 System.err.println("->testSubscribeTwice done, SUCCESS.");
177
178 // ... and now test unSubscribe
179 {
180 System.err.println("->UnSubscribe from bilbo ...");
181 UnSubscribeKey usk = new UnSubscribeKey(glob, oid);
182 usk.setDomain(domain);
183 UnSubscribeQos usq = new UnSubscribeQos(glob);
184 UnSubscribeReturnQos[] usrq = bilboCon.unSubscribe(usk, usq);
185 assertEquals("", 1, usrq.length);
186
187 System.err.println("->Publish to avalon ...");
188 PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
189 PublishQos avalon_pq = new PublishQos(glob);
190 MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
191 PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
192 assertEquals("oid changed", oid, avalon_prq.getKeyOid());
193
194
195 try { Thread.sleep(2000); } catch( InterruptedException i) {}
196 if (0 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
197 assertEquals("message from avalon", 0, updateCounterBilbo);
198 if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
199 assertEquals("message from avalon #2", 1, updateCounterBilbo2);
200 updateCounterBilbo = 0;
201 updateCounterBilbo2 = 0;
202 }
203
204 System.err.println("->Trying to erase the message at the slave node ...");
205 EraseKey ek = new EraseKey(glob, oid);
206 ek.setDomain(domain);
207 EraseQos eq = new EraseQos(glob);
208 EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
209 assertEquals("Erase", 1, arr.length);
210 }
211 catch (XmlBlasterException e) {
212 e.printStackTrace();
213 fail("SubscribeToBilbo-Exception: " + e.toString());
214 }
215 finally {
216 if (bilboCon != null) {
217 bilboCon.disconnect(null);
218 bilboCon = null;
219 }
220 if (bilboCon2 != null) {
221 bilboCon2.disconnect(null);
222 bilboCon2 = null;
223 }
224 if (avalonCon != null) {
225 avalonCon.disconnect(null);
226 avalonCon = null;
227 }
228 }
229
230 System.err.println("***SubscribeTest.testSubscribeTwice: testSubscribeTwice [SUCCESS]");
231 }
232
233 /**
234 * We start all nodes as described in requirement
235 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
236 * <p />
237 * 1. publish RUGBY messages to avalon (heron is the master)<br />
238 * 2. Subscribe those messages from bilbo<br />
239 * 3. Kill bilbo, restart bilbo and check if we still get them
240 */
241 public void testSubscribe() {
242 System.err.println("***SubscribeTest: Subscribe a message from a cluster slave ...");
243
244 int num = 2;
245 I_XmlBlasterAccess[] bilboCons = new I_XmlBlasterAccess[num];
246
247 try {
248 System.err.println("->Connect to avalon ...");
249 avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
250 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
251
252 for (int ii=0; ii<num; ii++) {
253 System.err.println("->Connect to bilbo #" + ii + " ...");
254 final Global bilboGlobii = serverHelper.getBilboGlob().getClone(null);
255 bilboCons[ii] = serverHelper.connect(bilboGlobii, new I_Callback() { // Login to xmlBlaster, register for updates
256 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
257 log.info(
258 "Receiving update '" + updateKey.getOid() + "' state=" + updateQos.getState() + ", " + updateCounterBilbo + " ...");
259 if (updateQos.isErased()) {
260 log.info("Ignoring erase message");
261 return "";
262 }
263 updateCounterBilbo++;
264 log.info(
265 "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
266 assertEquals("Wrong message updated", oid, updateKey.getOid());
267 return "";
268 }
269 });
270
271 System.err.println("->Publish to avalon #" + ii + " ...");
272 PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
273 PublishQos avalon_pq = new PublishQos(glob);
274 MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
275 PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
276 assertEquals("oid changed", oid, avalon_prq.getKeyOid());
277
278 try { Thread.sleep(1000L); } catch( InterruptedException i) {}
279
280 System.err.println("->Subscribe from bilbo #" + ii + ", the message from avalon should arrive ...");
281 SubscribeKey sk = new SubscribeKey(glob, oid);
282 sk.setDomain(domain);
283 SubscribeQos sq = new SubscribeQos(glob);
284 bilboCons[ii].subscribe(sk.toXml(), sq.toXml());
285
286 waitOnUpdate(2000L, 1);
287 try { Thread.sleep(1000); } catch( InterruptedException i) {} // wait longer to check if too many arrive
288 if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
289 assertEquals("message from avalon", 1, updateCounterBilbo);
290 updateCounterBilbo = 0;
291
292 System.err.println("->Trying to erase the message at the slave node ...");
293 EraseKey ek = new EraseKey(glob, oid);
294 ek.setDomain(domain);
295 EraseQos eq = new EraseQos(glob);
296 EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
297 assertEquals("Erase", 1, arr.length);
298
299 // Wait on erase events
300 try { Thread.sleep(1000); } catch( InterruptedException i) {}
301 updateCounterBilbo = 0;
302 updateCounterBilbo2 = 0;
303
304 // We stay logged in but kill over callback server ...
305 Client.shutdownCb(bilboCons[ii], Client.Shutdown.KEEP_LOGGED_IN);
306 }
307
308 System.err.println("->testSubscribe done, SUCCESS.");
309 }
310 catch (XmlBlasterException e) {
311 e.printStackTrace();
312 fail("SubscribeToBilbo-Exception: " + e.toString());
313 }
314 finally {
315 for (int jj=0; jj<bilboCons.length; jj++) {
316 if (bilboCons[jj] != null) {
317 bilboCons[jj].disconnect(null);
318 bilboCons[jj] = null;
319 }
320 }
321 if (avalonCon != null) {
322 avalonCon.disconnect(null);
323 avalonCon = null;
324 }
325 }
326
327 System.err.println("***SubscribeTest: testSubscribe [SUCCESS]");
328
329 }
330
331 private void waitOnUpdate(final long timeout, final int numWait) {
332 long pollingInterval = 50L; // check every 0.05 seconds
333 if (timeout < 50) pollingInterval = timeout / 10L;
334 long sum = 0L;
335 while (updateCounterBilbo < numWait) {
336 try {
337 Thread.sleep(pollingInterval);
338 }
339 catch( InterruptedException i)
340 {}
341 sum += pollingInterval;
342 if (sum > timeout) {
343 log.warning("Timeout of " + timeout + " occurred");
344 break;
345 }
346 }
347 }
348 }
// Syntax highlighted by Code2HTML, v. 0.9.1