1 /*------------------------------------------------------------------------------
  2 Name:      TestPtPDispatch.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Logger;
  9 import java.util.logging.Level;
 10 import org.xmlBlaster.util.Global;
 11 import org.xmlBlaster.util.SessionName;
 12 import org.xmlBlaster.client.key.EraseKey;
 13 import org.xmlBlaster.client.key.PublishKey;
 14 import org.xmlBlaster.client.qos.ConnectQos;
 15 import org.xmlBlaster.client.qos.EraseQos;
 16 import org.xmlBlaster.util.XmlBlasterException;
 17 import org.xmlBlaster.util.qos.address.Destination;
 18 import org.xmlBlaster.client.qos.PublishQos;
 19 import org.xmlBlaster.client.I_XmlBlasterAccess;
 20 import org.xmlBlaster.util.MsgUnit;
 21 
 22 import org.xmlBlaster.test.util.PtPDestination;
 23 
 24 import junit.framework.TestCase;
 25 
 26 
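/**
 * Tests PtP (point-to-point) message dispatch: messages are published to an
 * explicit destination (a subject or one of its sessions), with and without
 * forced queuing, and the number of updates every destination receives is
 * checked afterwards.
 * <p />
 * Invoke examples:<br />
 * <pre>
 *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPDispatch
 *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPDispatch
 * </pre>
 * @see org.xmlBlaster.client.I_XmlBlasterAccess
 */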
 36 public class TestPtPDispatch extends TestCase {
 37 
 38    private static String ME = "TestPtPDispatch";
 39    private final static long TIMEOUT = 5000L;
 40    private Global glob;
 41    private static Logger log = Logger.getLogger(TestPtPDispatch.class.getName());
 42    private PtPDestination[] destinations;
 43    private int numDestinations = 4;
 44    private int counter = 0;
 45    private String subjectName;
 46    //private boolean persistentMsg = true; 
 47 
 48    public TestPtPDispatch(String testName) {
 49       this(null, testName);
 50    }
 51 
 52    public TestPtPDispatch(Global glob, String testName) {
 53       super(testName);
 54       this.glob = glob;
 55       this.subjectName = testName;
 56    }
 57 
 58    /**
 59     * Sets up the fixture.
 60     * <p />
 61     * Connect to xmlBlaster and login
 62     */
 63    protected void setUp() {
 64       this.glob = (this.glob == null) ? Global.instance() : this.glob;
 65 
 66       // this.counter = 0;
 67 
 68       this.destinations = new PtPDestination[this.numDestinations];
 69       for (int i=0; i < this.numDestinations; i++) 
 70          this.destinations[i] = new PtPDestination(this.glob, this.subjectName + "/" + (i+1));
 71       log.info("XmlBlaster is ready for testing");
 72       try {
 73          I_XmlBlasterAccess con = glob.getXmlBlasterAccess(); // Find orb
 74          String passwd = "secret";
 75          ConnectQos connectQos = new ConnectQos(glob, "src_" + this.subjectName, passwd); // == "<qos>...</qos>";
 76          if (log.isLoggable(Level.FINE)) log.fine("setUp: connectQos '" + connectQos.toXml() + "'");
 77          con.connect(connectQos, null);  // Login to xmlBlaster, register for updates
 78 
 79       }
 80       catch (XmlBlasterException e) {
 81           log.warning("setUp() - login failed: " + e.getMessage());
 82           fail("setUp() - login fail: " + e.getMessage());
 83       }
 84       catch (Exception e) {
 85           log.severe("setUp() - login failed: " + e.toString());
 86           e.printStackTrace();
 87           fail("setUp() - login fail: " + e.toString());
 88       }
 89    }
 90 
 91    private void prepare(boolean shutdownCb) {
 92       try {
 93          // init(boolean wantsPtP, boolean shutdownCb, long cbMaxEntries, long cbMaxEntriesCache, long subjMaxEntries, long subjMaxEntriesCache)
 94          this.destinations[0].init(true, shutdownCb, 1, 1, 3, 1);
 95          this.destinations[1].init(false, shutdownCb, 1, 1, 3, 1);
 96       }
 97       catch (XmlBlasterException ex) {
 98          ex.printStackTrace();
 99          assertTrue(false);
100       }
101    }
102    
103    private void cleanup() {
104       for (int i=0; i < this.numDestinations-2; i++) this.destinations[i].shutdown(true);         
105    }
106 
107    /**
108     * Tears down the fixture.
109     * <p />
110     * cleaning up .... erase() the previous message OID and logout
111     */
112    protected void tearDown() {
113       log.info("Entering tearDown(), test is finished");
114       I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
115       try {
116          EraseKey key = new EraseKey(this.glob, "testPtPDispatch"); 
117          EraseQos qos = new EraseQos(this.glob);
118          con.erase(key, qos);
119       }
120       catch(XmlBlasterException ex) {
121          ex.printStackTrace();
122       }
123       finally {
124          con.disconnect(null);
125          // Global.instance().shutdown();
126          // the unknown destinations must be handled inside the specific tests
127          this.glob.shutdown();
128          this.glob = null;
129       }
130    }
131 
132    /**
133     * 
134     * @param destNum the number of the destination int the destinations array
135     *        for which the message is intended. If you want to send it to the
136     *        subject (i.e. no specific session) you pass a negative value.
137     * @param forceQueuing if true it will force queuing (refering to the destination).
138     * @param expectEx true if you expect an exception here, false otherwise.
139     * @param counts an int[] containing the expected amount of updates for each
140     *        destination. NOTE this has to be filled out even if you expect an
141     */
142    private void doPublish(int destNum, boolean forceQueuing, boolean expectEx, int[] counts, long timeout, boolean persistent, String contentPrefix) {
143 
144       SessionName toSessionName = null;
145       if (destNum < 0) toSessionName = new SessionName(this.glob, this.subjectName);
146       else toSessionName = this.destinations[destNum].getSessionName();
147       
148       // String oid = "Message" + "-" + counter;
149       log.info("Publishing a message " + toSessionName.getRelativeName() + " ...");
150       PublishKey key = new PublishKey(this.glob, "testPtPDispatch");
151       
152       Destination destination = new Destination(this.glob, toSessionName);
153       destination.forceQueuing(forceQueuing);
154       PublishQos qos = new PublishQos(this.glob, destination);
155       qos.setPersistent(persistent);
156    
157       String content = contentPrefix + "-" + this.counter;
158       this.counter++;
159       MsgUnit msgUnit = new MsgUnit(key, content.getBytes(), qos);
160 
161       try {
162          this.glob.getXmlBlasterAccess().publish(msgUnit);
163          assertTrue("did expect an exception after publishing to " + toSessionName.getRelativeName() + " here but got none", !expectEx);
164       }
165       catch (XmlBlasterException ex) {
166          if (!expectEx) ex.printStackTrace();
167          assertTrue("did'nt expect an exception after publishing to " + toSessionName.getRelativeName() + " here but got one: " + ex.getMessage(), expectEx);
168       }
169       log.info("Success: Publishing of message for " + toSessionName.getRelativeName() + " done");
170 
171       for (int i=0; i < this.destinations.length; i++) 
172          this.destinations[i].check(timeout, counts[i]);
173    }
174 
175 
176    private void checkWithReconnect(int dest, boolean wantsPtP, int expected, long delay) {
177       try {
178          this.destinations[dest].shutdown(false);
179          String sessionName = this.destinations[dest].getSessionName().getRelativeName();
180          this.destinations[dest] = new PtPDestination(this.glob, sessionName);
181          this.destinations[dest] .init(wantsPtP, false, 1, 1, 3, 1);
182          this.destinations[dest] .check(delay, expected);
183       }
184       catch (XmlBlasterException ex) {
185          ex.printStackTrace();
186          assertTrue(false);
187       }
188    }
189 
190    /**
191     * Does a connect, waits for updates, compares the number of updates
192     * with the expected and makes a disconnect.
193     * 
194     * @param dest the destination to use (to check)
195     * @param expected the number of updates expected after a connect
196     * @param delay the time in ms to wait between connect and check
197     */
198    private void checkWithoutPublish(PtPDestination dest, boolean wantsPtP, int expected, long delay) {
199       try {
200          dest.init(wantsPtP, false, 1, 1, 3, 1);
201          dest.check(delay, expected);
202          dest.shutdown(true);
203       }
204       catch (XmlBlasterException ex) {
205          ex.printStackTrace();
206          assertTrue(false);
207       }
208    }
209 
210 // -----------------------------------------------------------------------
211    /** 5 messages are sent */
212    private void noQueuingNoOverflow(boolean isPersistent, String msgPrefix) {
213       boolean forceQueuing = false;
214       boolean shutdownCb = false;
215       prepare(shutdownCb);
216       doPublish(-1, forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
217       doPublish(0 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
218       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
219       doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
220       doPublish(3 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
221       
222       checkWithoutPublish(this.destinations[2], true, 0, TIMEOUT);
223       checkWithoutPublish(this.destinations[3], false,0, TIMEOUT);
224       cleanup();
225    }
226 
227    /**
228     * TEST: <br />
229     */
230    public void testNoQueuingNoOverflowTransient() {
231       noQueuingNoOverflow(false, "NoQueuingNoOverflowTransient");
232    }
233 
234    /**
235     * TEST: <br />
236     */
237    public void testNoQueuingNoOverflowPersistent() {
238       noQueuingNoOverflow(true, "NoQueuingNoOverflowPersistent");
239    }
240 
241 // -----------------------------------------------------------------------
242    /** 12 messages are sent */
243    private void noQueuingOverflow(boolean isPersistent, String msgPrefix) {
244       boolean forceQueuing = false;
245       boolean shutdownCb = true;
246       prepare(shutdownCb);
247       
248       doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
249       doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
250       // allow one overflow but now an exception should come ...
251       doPublish(0 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
252       // ... and again just to make sure ...
253       doPublish(0 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
254 
255       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
256       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
257       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
258       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
259       
260       doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
261       doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
262       doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
263       doPublish(2 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
264 
265       // TODO add the tests on subject queue overflow here (configure subject queue first)
266       //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
267       //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
268       //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
269       //doPublish(-1, forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT);
270 
271       cleanup();
272    }
273 
274    /**
275     * TEST: <br />
276     */
277    public void testNoQueuingOverflowTransient() {
278       noQueuingOverflow(false, "NoQueuingOverflowTransient");
279    }
280 
281    /**
282     * TEST: <br />
283     */
284    public void testNoQueuingOverflowPersistent() {
285       noQueuingOverflow(true, "NoQueuingOverflowPersistent");
286    }
287 
288 // -----------------------------------------------------------------------
289    /** 5 messages are sent */
290    private void queuingNoOverflow(boolean isPersistent, String msgPrefix) {
291       boolean forceQueuing = true;
292       boolean shutdownCb = false;
293       
294       prepare(shutdownCb); // creates session TestPtPDispatch/1 und TestPtPDispatch/2
295       
296       // doPublish(int destNum, boolean forceQueuing, boolean expectEx, int[] counts, long timeout, boolean persistent, String contentPrefix)
297       doPublish(-1, forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
298       doPublish(0 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
299       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
300       doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix); // session TestPtPDispatch/3 will be dynamically created
301       doPublish(3 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix); // session TestPtPDispatch/4 will be dynamically created
302 
303       // checkWithoutPublish(PtPDestination dest, boolean wantsPtP, int expected, long delay)
304       checkWithoutPublish(this.destinations[2], true, 1, TIMEOUT);
305       checkWithoutPublish(this.destinations[3], false,0, TIMEOUT); // wantsPtP==false is too late as doPublish(3,...) dynamically created one with default settings      // TODO check for dead letters. There should be one here  
306 
307       cleanup();
308    }
309 
310    /**
311     * TEST: <br />
312     */
313    public void testQueuingNoOverflowTransient() {
314       queuingNoOverflow(false, "QueuingNoOverflowTransient");
315    }
316    
317    /**
318     * TEST: <br />
319     */
320    public void testQueuingNoOverflowPersistent() {
321       queuingNoOverflow(true, "QueuingNoOverflowPersistent");
322    }
323    
324 // -----------------------------------------------------------------------
325    /** 12 messages are sent */
326    private void queuingOverflow(boolean isPersistent, String msgPrefix) {
327       boolean forceQueuing = true;
328       boolean shutdownCb = true;
329       prepare(shutdownCb);
330       doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
331       doPublish(0 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
332       doPublish(0 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
333       doPublish(0 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
334 
335       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
336       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
337       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
338       doPublish(1 , forceQueuing, true , new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
339    
340       checkWithReconnect(0, true, 2, TIMEOUT);
341    
342       // this should not throw an exception since default queue configuration 
343       // which allows many entries
344       //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
345       //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
346       //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
347       //doPublish(2 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
348 
349       //doPublish(-1, forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT);
350 
351       //checkForUnknown(this.destinations[2], true, 1, TIMEOUT);
352       //checkForUnknown(this.destinations[3], false,0, TIMEOUT);
353       // TODO check for dead letters. There should be one here  
354 
355       cleanup();
356    }
357 
358    public void testQueuingOverflowTransient() {
359       queuingOverflow(false, "QueuingOverflowTransient");
360    }
361    
362    public void testQueuingOverflowPersistent() {
363       queuingOverflow(true, "QueuingOverflowPersistent");
364    }
365    
366 // -----------------------------------------------------------------------
367 
368    private void subjectQueueNoOverflow(boolean isPersistent, String msgPrefix) {
369       boolean forceQueuing = false;
370       boolean shutdownCb = false;
371       prepare(shutdownCb);
372       
373       doPublish(-1 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
374       doPublish(-1 , forceQueuing, false, new int[] {1,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
375       cleanup();
376    }
377 
378    public void testSubjectQueueNoOverflowTransient() {
379       subjectQueueNoOverflow(false, "SubjectQueueNoOverflowTransient");
380    }
381    
382    public void testSubjectQueueNoOverflowPersistent() {
383       subjectQueueNoOverflow(true, "SubjectQueueNoOverflowPersistent");
384    }
385    
386 // ------------------------------------------------------------------------
387 /*
388    private void subjectQueueOverflow(boolean isPersistent, String msgPrefix) {
389       boolean shutdownCb = true;
390       prepare(shutdownCb);
391       
392       boolean forceQueuing = false;
393       doPublish(-1 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
394 
395       forceQueuing = true;
396       doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
397       doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
398       doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
399       doPublish(-1 , forceQueuing, false, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
400       doPublish(-1 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
401       doPublish(-1 , forceQueuing, true, new int[] {0,0,0,0}, TIMEOUT, isPersistent, msgPrefix);
402       cleanup();
403    }
404 */
405    public void testSubjectQueueOverflowTransient() {
406       subjectQueueNoOverflow(false, "SubjectQueueNoOverflowTransient");
407    }
408    
409    public void testSubjectQueueOverflowPersistent() {
410       subjectQueueNoOverflow(true, "SubjectQueueNoOverflowPersistent");
411    }
412    
413 // -----------------------------------------------------------------------
414 
415    /**
416     * Invoke: java org.xmlBlaster.test.client.TestPtPDispatch
417     * <p />
418     * @deprecated Use the TestRunner from the testsuite to run it:<p />
419     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPtPDispatch</pre>
420     */
421    public static void main(String args[]) {
422       Global glob = new Global();
423       if (glob.init(args) != 0) {
424          System.out.println(ME + ": Init failed");
425          System.exit(1);
426       }
427 
428       TestPtPDispatch testSub = new TestPtPDispatch(glob, "TestPtPDispatch");
429 
430       testSub.setUp();
431       testSub.testNoQueuingNoOverflowPersistent();
432       testSub.tearDown();
433 
434       testSub.setUp();
435       testSub.testNoQueuingNoOverflowTransient();
436       testSub.tearDown();
437 
438       testSub.setUp();
439       testSub.testQueuingNoOverflowPersistent();
440       testSub.tearDown();
441       
442       testSub.setUp();
443       testSub.testQueuingNoOverflowTransient();
444       testSub.tearDown();
445 
446       testSub.setUp();
447       testSub.testNoQueuingOverflowPersistent();
448       testSub.tearDown();
449       
450       testSub.setUp();
451       testSub.testNoQueuingOverflowTransient();
452       testSub.tearDown();
453 
454       testSub.setUp();
455       testSub.testQueuingOverflowPersistent();
456       testSub.tearDown();
457       
458       testSub.setUp();
459       testSub.testQueuingOverflowTransient();
460       testSub.tearDown();
461 
462       testSub.setUp();
463       testSub.testSubjectQueueNoOverflowPersistent();
464       testSub.tearDown();
465 
466       testSub.setUp();
467       testSub.testSubjectQueueNoOverflowTransient();
468       testSub.tearDown();
469       
470       testSub.setUp();
471       testSub.testSubjectQueueOverflowPersistent();
472       testSub.tearDown();
473 
474       testSub.setUp();
475       testSub.testSubjectQueueOverflowTransient();
476       testSub.tearDown();
477       
478    }
479 }


syntax highlighted by Code2HTML, v. 0.9.1