1 package org.xmlBlaster.test.cluster;
2
3 import java.util.logging.Logger;
4 import org.xmlBlaster.util.Global;
5
6 // for client connections:
7 import org.xmlBlaster.util.*;
8 import org.xmlBlaster.client.I_Callback;
9 import org.xmlBlaster.client.key.PublishKey;
10 import org.xmlBlaster.client.key.EraseKey;
11 import org.xmlBlaster.client.key.SubscribeKey;
12 import org.xmlBlaster.client.key.UnSubscribeKey;
13 import org.xmlBlaster.client.key.UpdateKey;
14 import org.xmlBlaster.client.qos.PublishQos;
15 import org.xmlBlaster.client.qos.PublishReturnQos;
16 import org.xmlBlaster.client.qos.UpdateQos;
17 import org.xmlBlaster.client.qos.SubscribeQos;
18 import org.xmlBlaster.client.qos.UnSubscribeQos;
19 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
20 import org.xmlBlaster.client.qos.EraseQos;
21 import org.xmlBlaster.client.qos.EraseReturnQos;
22 import org.xmlBlaster.client.I_XmlBlasterAccess;
23 import org.xmlBlaster.util.MsgUnit;
24
25 import junit.framework.*;
26
27 /**
28 * Test publishing a message from bilbo to heron.
29 * <p />
30 * <pre>
31 * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.SubscribeTest
32 * </pre>
33 * NOTE: asserts() in update() methods are routed back to server and are not handled
34 * by the junit testsuite, so we check double (see code).
35 * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
36 */
37 public class SubscribeTest extends TestCase {
38 private String ME = "SubscribeTest";
39 private Global glob;
40 private static Logger log = Logger.getLogger(SubscribeTest.class.getName());
41 private ServerHelper serverHelper;
42
43 private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon, bilboCon2;
44
45 private int updateCounterBilbo = 0;
46 private int updateCounterBilbo2 = 0;
47 private String oid = "SubscribeToBilbo";
48 private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
49 private String contentStr = "We win";
50
51 public SubscribeTest(String name) {
52 super(name);
53 this.glob = new Global(null, true, false);
54 }
55
56 /**
57 * Initialize the test ...
58 */
59 protected void setUp() {
60
61 log.info("Entering setUp(), test starts");
62
63 updateCounterBilbo = 0;
64 updateCounterBilbo2 = 0;
65
66
67 serverHelper = new ServerHelper(glob, log, ME);
68
69 // Starts a cluster node
70 serverHelper.startHeron();
71 serverHelper.startAvalon();
72 //serverHelper.startGolan();
73 serverHelper.startFrodo();
74 serverHelper.startBilbo();
75 }
76
77 /**
78 * cleaning up ...
79 */
80 protected void tearDown() {
81 log.info("Entering tearDown(), test is finished");
82 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
83
84 if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
85 if (bilboCon2 != null) { bilboCon2.disconnect(null); bilboCon2 = null; }
86 if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
87 if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
88 if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
89 if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
90
91 serverHelper.tearDown();
92 }
93
94 /**
95 * We start all nodes as described in requirement
96 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
97 * <p />
98 * - Subscribe to RUGBY messages from bilbo twice<br />
99 * - publish RUGBY messages to avalon (heron is the master)<br />
100 * - Kill bilbo, restart bilbo and check if we still get them
101 */
102 public void testSubscribeTwice() {
103 System.err.println("***SubscribeTest.testSubscribeTwice: Subscribe a message from a cluster slave ...");
104 try {
105 System.err.println("->Connect to avalon ...");
106 avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
107
108 {
109 System.err.println("->Connect to bilbo ...");
110 bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() { // Login to xmlBlaster, register for updates
111 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
112 if (updateQos.isErased()) {
113 log.info("Ignoring erase message");
114 return "";
115 }
116 updateCounterBilbo++;
117 log.info(
118 "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
119 assertEquals("Wrong message updated", oid, updateKey.getOid());
120 return "";
121 }
122 });
123
124 System.err.println("->Subscribe from bilbo ...");
125 SubscribeKey sk = new SubscribeKey(glob, oid);
126 sk.setDomain(domain);
127 SubscribeQos sq = new SubscribeQos(glob);
128 bilboCon.subscribe(sk.toXml(), sq.toXml());
129 }
130
131 {
132 System.err.println("->Connect to bilbo 2 ...");
133 final Global bilboGlob2 = serverHelper.getBilboGlob().getClone(null);
134 bilboCon2 = serverHelper.connect(bilboGlob2, new I_Callback() { // Login to xmlBlaster, register for updates
135 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
136 if (updateQos.isErased()) {
137 log.info("Ignoring erase message");
138 return "";
139 }
140 updateCounterBilbo2++;
141 log.info(
142 "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo2 + " ...");
143 assertEquals("#2 Wrong message updated", oid, updateKey.getOid());
144 return "";
145 }
146 });
147
148 System.err.println("->Subscribe from bilbo 2 ...");
149 SubscribeKey sk = new SubscribeKey(glob, oid);
150 sk.setDomain(domain);
151 SubscribeQos sq = new SubscribeQos(glob);
152 bilboCon2.subscribe(sk.toXml(), sq.toXml());
153 }
154
155 // First test subscribe ...
156 {
157 System.err.println("->Publish to avalon ...");
158 PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
159 PublishQos avalon_pq = new PublishQos(glob);
160 MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
161 PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
162 assertEquals("oid changed", oid, avalon_prq.getKeyOid());
163
164
165 try { Thread.sleep(2000); } catch( InterruptedException i) {}
166 if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
167 assertEquals("message from avalon", 1, updateCounterBilbo);
168 if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
169 assertEquals("message from avalon #2", 1, updateCounterBilbo2);
170 updateCounterBilbo = 0;
171 updateCounterBilbo2 = 0;
172 }
173
174 System.err.println("->testSubscribeTwice done, SUCCESS.");
175
176 // ... and now test unSubscribe
177 {
178 System.err.println("->UnSubscribe from bilbo ...");
179 UnSubscribeKey usk = new UnSubscribeKey(glob, oid);
180 usk.setDomain(domain);
181 UnSubscribeQos usq = new UnSubscribeQos(glob);
182 UnSubscribeReturnQos[] usrq = bilboCon.unSubscribe(usk, usq);
183 assertEquals("", 1, usrq.length);
184
185 System.err.println("->Publish to avalon ...");
186 PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
187 PublishQos avalon_pq = new PublishQos(glob);
188 MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
189 PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
190 assertEquals("oid changed", oid, avalon_prq.getKeyOid());
191
192
193 try { Thread.sleep(2000); } catch( InterruptedException i) {}
194 if (0 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
195 assertEquals("message from avalon", 0, updateCounterBilbo);
196 if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
197 assertEquals("message from avalon #2", 1, updateCounterBilbo2);
198 updateCounterBilbo = 0;
199 updateCounterBilbo2 = 0;
200 }
201
202 System.err.println("->Trying to erase the message at the slave node ...");
203 EraseKey ek = new EraseKey(glob, oid);
204 ek.setDomain(domain);
205 EraseQos eq = new EraseQos(glob);
206 EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
207 assertEquals("Erase", 1, arr.length);
208 }
209 catch (XmlBlasterException e) {
210 e.printStackTrace();
211 fail("SubscribeToBilbo-Exception: " + e.toString());
212 }
213 finally {
214 if (bilboCon != null) {
215 bilboCon.disconnect(null);
216 bilboCon = null;
217 }
218 if (bilboCon2 != null) {
219 bilboCon2.disconnect(null);
220 bilboCon2 = null;
221 }
222 if (avalonCon != null) {
223 avalonCon.disconnect(null);
224 avalonCon = null;
225 }
226 }
227
228 System.err.println("***SubscribeTest.testSubscribeTwice: testSubscribeTwice [SUCCESS]");
229 }
230
231 /**
232 * We start all nodes as described in requirement
233 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
234 * <p />
235 * 1. publish RUGBY messages to avalon (heron is the master)<br />
236 * 2. Subscribe those messages from bilbo<br />
237 * 3. Kill bilbo, restart bilbo and check if we still get them
238 */
239 public void testSubscribe() {
240 System.err.println("***SubscribeTest: Subscribe a message from a cluster slave ...");
241
242 int num = 2;
243 I_XmlBlasterAccess[] bilboCons = new I_XmlBlasterAccess[num];
244
245 try {
246 System.err.println("->Connect to avalon ...");
247 avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
248 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
249
250 for (int ii=0; ii<num; ii++) {
251 System.err.println("->Connect to bilbo #" + ii + " ...");
252 final Global bilboGlobii = serverHelper.getBilboGlob().getClone(null);
253 bilboCons[ii] = serverHelper.connect(bilboGlobii, new I_Callback() { // Login to xmlBlaster, register for updates
254 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
255 log.info(
256 "Receiving update '" + updateKey.getOid() + "' state=" + updateQos.getState() + ", " + updateCounterBilbo + " ...");
257 if (updateQos.isErased()) {
258 log.info("Ignoring erase message");
259 return "";
260 }
261 updateCounterBilbo++;
262 log.info(
263 "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
264 assertEquals("Wrong message updated", oid, updateKey.getOid());
265 return "";
266 }
267 });
268
269 System.err.println("->Publish to avalon #" + ii + " ...");
270 PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
271 PublishQos avalon_pq = new PublishQos(glob);
272 MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
273 PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
274 assertEquals("oid changed", oid, avalon_prq.getKeyOid());
275
276 try { Thread.sleep(1000L); } catch( InterruptedException i) {}
277
278 System.err.println("->Subscribe from bilbo #" + ii + ", the message from avalon should arrive ...");
279 SubscribeKey sk = new SubscribeKey(glob, oid);
280 sk.setDomain(domain);
281 SubscribeQos sq = new SubscribeQos(glob);
282 bilboCons[ii].subscribe(sk.toXml(), sq.toXml());
283
284 waitOnUpdate(2000L, 1);
285 try { Thread.sleep(1000); } catch( InterruptedException i) {} // wait longer to check if too many arrive
286 if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
287 assertEquals("message from avalon", 1, updateCounterBilbo);
288 updateCounterBilbo = 0;
289
290 System.err.println("->Trying to erase the message at the slave node ...");
291 EraseKey ek = new EraseKey(glob, oid);
292 ek.setDomain(domain);
293 EraseQos eq = new EraseQos(glob);
294 EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
295 assertEquals("Erase", 1, arr.length);
296
297 // Wait on erase events
298 try { Thread.sleep(1000); } catch( InterruptedException i) {}
299 updateCounterBilbo = 0;
300 updateCounterBilbo2 = 0;
301
302 // We stay logged in but kill over callback server ...
303 bilboCons[ii].getCbServer().shutdown();
304 }
305
306 System.err.println("->testSubscribe done, SUCCESS.");
307 }
308 catch (XmlBlasterException e) {
309 e.printStackTrace();
310 fail("SubscribeToBilbo-Exception: " + e.toString());
311 }
312 finally {
313 for (int jj=0; jj<bilboCons.length; jj++) {
314 if (bilboCons[jj] != null) {
315 bilboCons[jj].disconnect(null);
316 bilboCons[jj] = null;
317 }
318 }
319 if (avalonCon != null) {
320 avalonCon.disconnect(null);
321 avalonCon = null;
322 }
323 }
324
325 System.err.println("***SubscribeTest: testSubscribe [SUCCESS]");
326
327 }
328
329 private void waitOnUpdate(final long timeout, final int numWait) {
330 long pollingInterval = 50L; // check every 0.05 seconds
331 if (timeout < 50) pollingInterval = timeout / 10L;
332 long sum = 0L;
333 while (updateCounterBilbo < numWait) {
334 try {
335 Thread.sleep(pollingInterval);
336 }
337 catch( InterruptedException i)
338 {}
339 sum += pollingInterval;
340 if (sum > timeout) {
341 log.warning("Timeout of " + timeout + " occurred");
342 break;
343 }
344 }
345 }
346 }
syntax highlighted by Code2HTML, v. 0.9.1