1 /*------------------------------------------------------------------------------
2 Name: TestPtPDispatch.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.client;
7
8 import java.util.logging.Logger;
9 import java.util.logging.Level;
10 import org.xmlBlaster.util.Global;
11 import org.xmlBlaster.util.SessionName;
12 import org.xmlBlaster.client.key.EraseKey;
13 import org.xmlBlaster.client.key.PublishKey;
14 import org.xmlBlaster.client.qos.ConnectQos;
15 import org.xmlBlaster.client.qos.EraseQos;
16 import org.xmlBlaster.util.XmlBlasterException;
17 import org.xmlBlaster.util.qos.address.Destination;
18 import org.xmlBlaster.client.qos.PublishQos;
19 import org.xmlBlaster.client.I_XmlBlasterAccess;
20 import org.xmlBlaster.util.MsgUnit;
21
22 import org.xmlBlaster.test.util.PtPDestination;
23
24 import junit.framework.TestCase;
25
26
27 /**
28 * <p>
29 * Invoke examples:<br />
30 * <pre>
31 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPDispatch
32 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPDispatch
33 * </pre>
34 * @see org.xmlBlaster.client.I_XmlBlasterAccess
35 */
36 public class TestPtPDispatch extends TestCase {
37
38 private static String ME = "TestPtPDispatch";
39 private final static long TIMEOUT = 5000L;
40 private Global glob;
41 private static Logger log = Logger.getLogger(TestPtPDispatch.class.getName());
42 private PtPDestination[] destinations;
43 private int numDestinations = 4;
44 private int counter = 0;
45 private String subjectName;
46 //private boolean persistentMsg = true;
47
48 public TestPtPDispatch(String testName) {
49 this(null, testName);
50 }
51
52 public TestPtPDispatch(Global glob, String testName) {
53 super(testName);
54 this.glob = glob;
55 this.subjectName = testName;
56 }
57
58 /**
59 * Sets up the fixture.
60 * <p />
61 * Connect to xmlBlaster and login
62 */
63 protected void setUp() {
64 this.glob = (this.glob == null) ? Global.instance() : this.glob;
65
66 // this.counter = 0;
67
68 this.destinations = new PtPDestination[this.numDestinations];
69 for (int i=0; i < this.numDestinations; i++)
70 this.destinations[i] = new PtPDestination(this.glob, this.subjectName + "/" + (i+1));
71 log.info("XmlBlaster is ready for testing");
72 try {
73 I_XmlBlasterAccess con = glob.getXmlBlasterAccess(); // Find orb
74 String passwd = "secret";
75 ConnectQos connectQos = new ConnectQos(glob, "src_" + this.subjectName, passwd); // == "<qos>...</qos>";
76 if (log.isLoggable(Level.FINE)) log.fine("setUp: connectQos '" + connectQos.toXml() + "'");
77 con.connect(connectQos, null); // Login to xmlBlaster, register for updates
78
79 }
80 catch (XmlBlasterException e) {
81 log.warning("setUp() - login failed: " + e.getMessage());
82 fail("setUp() - login fail: " + e.getMessage());
83 }
84 catch (Exception e) {
85 log.severe("setUp() - login failed: " + e.toString());
86 e.printStackTrace();
87 fail("setUp() - login fail: " + e.toString());
88 }
89 }
90
91 private void prepare(boolean shutdownCb) {
92 try {
93 // init(boolean wantsPtP, boolean shutdownCb, long cbMaxEntries, long cbMaxEntriesCache, long subjMaxEntries, long subjMaxEntriesCache)
94 this.destinations[0].init(true, shutdownCb, 1, 1, 3, 1);
95 this.destinations[1].init(false, shutdownCb, 1, 1, 3, 1);
96 }
97 catch (XmlBlasterException ex) {
98 ex.printStackTrace();
99 assertTrue(false);
100 }
101 }
102
103 private void cleanup() {
104 for (int i=0; i < this.numDestinations-2; i++) this.destinations[i].shutdown(true);
105 }
106
107 /**
108 * Tears down the fixture.
109 * <p />
110 * cleaning up .... erase() the previous message OID and logout
111 */
112 protected void tearDown() {
113 log.info("Entering tearDown(), test is finished");
114 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
115 try {
116 EraseKey key = new EraseKey(this.glob, "testPtPDispatch");
117 EraseQos qos = new EraseQos(this.glob);
118 con.erase(key, qos);
119 }
120 catch(XmlBlasterException ex) {
121 ex.printStackTrace();
122 }
123 finally {
124 con.disconnect(null);
125 // Global.instance().shutdown();
126 // the unknown destinations must be handled inside the specific tests
127 this.glob.shutdown();
128 this.glob = null;
129 }
130 }
131
132 /**
133 *
134 * @param destNum the number of the destination int the destinations array
135 * for which the message is intended. If you want to send it to the
136 * subject (i.e. no specific session) you pass a negative value.
137 * @param forceQueuing if true it will force queuing (refering to the destination).
138 * @param expectEx true if you expect an exception here, false otherwise.
139 * @param counts an int[] containing the expected amount of updates for each
140 * destination. NOTE this has to be filled out even if you expect an
141 */
142 private void doPublish(int destNum, boolean forceQueuing, boolean expectEx, int[] counts, long timeout, boolean persistent, String contentPrefix) {
143
144 SessionName toSessionName = null;
145 if (destNum < 0) toSessionName = new SessionName(this.glob, this.subjectName);
146 else toSessionName = this.destinations[destNum].getSessionName();
147
148 // String oid = "Message" + "-" + counter;
149 log.info("Publishing a message " + toSessionName.getRelativeName() + " ...");
150 PublishKey key = new PublishKey(this.glob, "testPtPDispatch");
151
152 Destination destination = new Destination(this.glob, toSessionName);
153 destination.forceQueuing(forceQueuing);
154 PublishQos qos = new PublishQos(this.glob, destination);
155 qos.setPersistent(persistent);
156
157 String content = contentPrefix + "-" + this.counter;
158 this.counter++;
159 MsgUnit msgUnit = new MsgUnit(key, content.getBytes(), qos);
160
161 try {
162 this.glob.getXmlBlasterAccess().publish(msgUnit);
163 assertTrue("did expect an exception after publishing to " + toSessionName.getRelativeName() + " here but got none", !expectEx);
164 }
165 catch (XmlBlasterException ex) {
166 if (!expectEx) ex.printStackTrace();
167 assertTrue("did'nt expect an exception after publishing to " + toSessionName.getRelativeName() + " here but got one: " + ex.getMessage(), expectEx);
168 }
169 log.info("Success: Publishing of message for " + toSessionName.getRelativeName() + " done");
170
171 for (int i=0; i < this.destinations.length; i++)
172 this.destinations[i].check(timeout, counts[i]);
173 }
174
175
176 private void checkWithReconnect(int dest, boolean wantsPtP, int expected, long delay) {
177 try {
178 this.destinations[dest].shutdown(false);
179 String sessionName = this.destinations[dest].getSessionName().getRelativeName();
180 this.destinations[dest] = new PtPDestination(this.glob, sessionName);
181 this.destinations[dest] .init(wantsPtP, false, 1, 1, 3, 1);
182 this.destinations[dest] .check(delay, expected);
183 }
184 catch (XmlBlasterException ex) {
185 ex.printStackTrace();
186 assertTrue(false);
187 }
188 }
189
190 /**
191 * Does a connect, waits for updates, compares the number of updates
192 * with the expected and makes a disconnect.
193 *
194 * @param dest the destination to use (to check)
195 * @param expected the number of updates expected after a connect
196 * @param delay the time in ms to wait between connect and check
197 */
198 private void checkWithoutPublish(PtPDestination dest, boolean wantsPtP, int expected, long delay) {
199 try {
200 dest.init(wantsPtP, false, 1, 1, 3, 1);
201 dest.check(delay, expected);
202 dest.shutdown(true);
203 }
204 catch (XmlBlasterException ex) {
205 ex.printStackTrace();
206 assertTrue(false);
207 }
208 }
209
210 // -----------------------------------------------------------------------
211 /** 5 messages are sent */
212 private void noQueuingNoOverflow(boolean isPersistent, String msgPrefix) {
213 boolean forceQueuing = false;
214 boolean shutdownCb = false;
215 prepare(shutdownCb);
216 doPublish(-1, forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
217 doPublish(0 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
218 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
219 doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
220 doPublish(3 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
221
222 checkWithoutPublish(this.destinations[2], true, 0, TIMEOUT);
223 checkWithoutPublish(this.destinations[3], false,0, TIMEOUT);
224 cleanup();
225 }
226
227 /**
228 * TEST: <br />
229 */
230 public void testNoQueuingNoOverflowTransient() {
231 noQueuingNoOverflow(false, "NoQueuingNoOverflowTransient");
232 }
233
234 /**
235 * TEST: <br />
236 */
237 public void testNoQueuingNoOverflowPersistent() {
238 noQueuingNoOverflow(true, "NoQueuingNoOverflowPersistent");
239 }
240
241 // -----------------------------------------------------------------------
242 /** 12 messages are sent */
243 private void noQueuingOverflow(boolean isPersistent, String msgPrefix) {
244 boolean forceQueuing = false;
245 boolean shutdownCb = true;
246 prepare(shutdownCb);
247
248 doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
249 doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
250 // allow one overflow but now an exception should come ...
251 doPublish(0 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
252 // ... and again just to make sure ...
253 doPublish(0 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
254
255 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
256 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
257 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
258 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
259
260 doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
261 doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
262 doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
263 doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
264
265 // TODO add the tests on subject queue overflow here (configure subject queue first)
266 //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
267 //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
268 //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
269 //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
270
271 cleanup();
272 }
273
274 /**
275 * TEST: <br />
276 */
277 public void testNoQueuingOverflowTransient() {
278 noQueuingOverflow(false, "NoQueuingOverflowTransient");
279 }
280
281 /**
282 * TEST: <br />
283 */
284 public void testNoQueuingOverflowPersistent() {
285 noQueuingOverflow(true, "NoQueuingOverflowPersistent");
286 }
287
288 // -----------------------------------------------------------------------
289 /** 5 messages are sent */
290 private void queuingNoOverflow(boolean isPersistent, String msgPrefix) {
291 boolean forceQueuing = true;
292 boolean shutdownCb = false;
293
294 prepare(shutdownCb); // creates session TestPtPDispatch/1 und TestPtPDispatch/2
295
296 // doPublish(int destNum, boolean forceQueuing, boolean expectEx, int[] counts, long timeout, boolean persistent, String contentPrefix)
297 doPublish(-1, forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
298 doPublish(0 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
299 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
300 doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix); // session TestPtPDispatch/3 will be dynamically created
301 doPublish(3 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix); // session TestPtPDispatch/4 will be dynamically created
302
303 // checkWithoutPublish(PtPDestination dest, boolean wantsPtP, int expected, long delay)
304 checkWithoutPublish(this.destinations[2], true, 1, TIMEOUT);
305 checkWithoutPublish(this.destinations[3], false,0, TIMEOUT); // wantsPtP==false is too late as doPublish(3,...) dynamically created one with default settings // TODO check for dead letters. There should be one here
306
307 cleanup();
308 }
309
310 /**
311 * TEST: <br />
312 */
313 public void testQueuingNoOverflowTransient() {
314 queuingNoOverflow(false, "QueuingNoOverflowTransient");
315 }
316
317 /**
318 * TEST: <br />
319 */
320 public void testQueuingNoOverflowPersistent() {
321 queuingNoOverflow(true, "QueuingNoOverflowPersistent");
322 }
323
324 // -----------------------------------------------------------------------
325 /** 12 messages are sent */
326 private void queuingOverflow(boolean isPersistent, String msgPrefix) {
327 boolean forceQueuing = true;
328 boolean shutdownCb = true;
329 prepare(shutdownCb);
330 doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
331 doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
332 doPublish(0 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
333 doPublish(0 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
334
335 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
336 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
337 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
338 doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
339
340 checkWithReconnect(0, true, 2, TIMEOUT);
341
342 // this should not throw an exception since default queue configuration
343 // which allows many entries
344 //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
345 //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
346 //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
347 //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
348
349 //doPublish(-1, forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT);
350
351 //checkForUnknown(this.destinations[2], true, 1, TIMEOUT);
352 //checkForUnknown(this.destinations[3], false,0, TIMEOUT);
353 // TODO check for dead letters. There should be one here
354
355 cleanup();
356 }
357
358 public void testQueuingOverflowTransient() {
359 queuingOverflow(false, "QueuingOverflowTransient");
360 }
361
362 public void testQueuingOverflowPersistent() {
363 queuingOverflow(true, "QueuingOverflowPersistent");
364 }
365
366 // -----------------------------------------------------------------------
367
368 private void subjectQueueNoOverflow(boolean isPersistent, String msgPrefix) {
369 boolean forceQueuing = false;
370 boolean shutdownCb = false;
371 prepare(shutdownCb);
372
373 doPublish(-1 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
374 doPublish(-1 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
375 cleanup();
376 }
377
378 public void testSubjectQueueNoOverflowTransient() {
379 subjectQueueNoOverflow(false, "SubjectQueueNoOverflowTransient");
380 }
381
382 public void testSubjectQueueNoOverflowPersistent() {
383 subjectQueueNoOverflow(true, "SubjectQueueNoOverflowPersistent");
384 }
385
386 // ------------------------------------------------------------------------
387 /*
388 private void subjectQueueOverflow(boolean isPersistent, String msgPrefix) {
389 boolean shutdownCb = true;
390 prepare(shutdownCb);
391
392 boolean forceQueuing = false;
393 doPublish(-1 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
394
395 forceQueuing = true;
396 doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
397 doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
398 doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
399 doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
400 doPublish(-1 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
401 doPublish(-1 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
402 cleanup();
403 }
404 */
405 public void testSubjectQueueOverflowTransient() {
406 subjectQueueNoOverflow(false, "SubjectQueueNoOverflowTransient");
407 }
408
409 public void testSubjectQueueOverflowPersistent() {
410 subjectQueueNoOverflow(true, "SubjectQueueNoOverflowPersistent");
411 }
412
413 // -----------------------------------------------------------------------
414
415 /**
416 * Invoke: java org.xmlBlaster.test.client.TestPtPDispatch
417 * <p />
418 * @deprecated Use the TestRunner from the testsuite to run it:<p />
419 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPtPDispatch</pre>
420 */
421 public static void main(String args[]) {
422 Global glob = new Global();
423 if (glob.init(args) != 0) {
424 System.out.println(ME + ": Init failed");
425 System.exit(1);
426 }
427
428 TestPtPDispatch testSub = new TestPtPDispatch(glob, "TestPtPDispatch");
429
430 testSub.setUp();
431 testSub.testNoQueuingNoOverflowPersistent();
432 testSub.tearDown();
433
434 testSub.setUp();
435 testSub.testNoQueuingNoOverflowTransient();
436 testSub.tearDown();
437
438 testSub.setUp();
439 testSub.testQueuingNoOverflowPersistent();
440 testSub.tearDown();
441
442 testSub.setUp();
443 testSub.testQueuingNoOverflowTransient();
444 testSub.tearDown();
445
446 testSub.setUp();
447 testSub.testNoQueuingOverflowPersistent();
448 testSub.tearDown();
449
450 testSub.setUp();
451 testSub.testNoQueuingOverflowTransient();
452 testSub.tearDown();
453
454 testSub.setUp();
455 testSub.testQueuingOverflowPersistent();
456 testSub.tearDown();
457
458 testSub.setUp();
459 testSub.testQueuingOverflowTransient();
460 testSub.tearDown();
461
462 testSub.setUp();
463 testSub.testSubjectQueueNoOverflowPersistent();
464 testSub.tearDown();
465
466 testSub.setUp();
467 testSub.testSubjectQueueNoOverflowTransient();
468 testSub.tearDown();
469
470 testSub.setUp();
471 testSub.testSubjectQueueOverflowPersistent();
472 testSub.tearDown();
473
474 testSub.setUp();
475 testSub.testSubjectQueueOverflowTransient();
476 testSub.tearDown();
477
478 }
479 }
// syntax highlighted by Code2HTML, v. 0.9.1