1 /*------------------------------------------------------------------------------
  2 Name:      TestAdminGet.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.admin;
  7 
  8 import java.util.logging.Logger;
  9 import java.util.logging.Level;
 10 import org.xmlBlaster.util.Global;
 11 import org.xmlBlaster.util.SessionName;
 12 import org.xmlBlaster.util.XmlBlasterException;
 13 import org.xmlBlaster.util.def.Constants;
 14 import org.xmlBlaster.util.property.PropString;
 15 import org.xmlBlaster.util.qos.QuerySpecQos;
 16 import org.xmlBlaster.util.MsgUnit;
 17 import org.xmlBlaster.client.I_Callback;
 18 import org.xmlBlaster.client.I_XmlBlasterAccess;
 19 import org.xmlBlaster.client.key.GetKey;
 20 import org.xmlBlaster.client.key.PublishKey;
 21 import org.xmlBlaster.client.key.SubscribeKey;
 22 import org.xmlBlaster.client.key.UnSubscribeKey;
 23 import org.xmlBlaster.client.key.UpdateKey;
 24 import org.xmlBlaster.client.qos.*;
 25 
 26 import org.xmlBlaster.test.MsgInterceptor;
 27 
 28 import junit.framework.*;
 29 
 30 
 31 /**
 32  * Tests the activation/deactivation of the DispatchManager.
 33  * <br />
 34  * If the DispatchManager is disactivated, asynchronous dispatch should not
 35  * be possible.
 36  * <p>
 37  * Invoke examples:<br />
 38  * <pre>
 39  *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestAdminGet
 40  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestAdminGet
 41  * </pre>
 42  */
 43 public class TestAdminGet extends TestCase implements I_Callback
 44 {
 45    
 46    public class PublisherThread extends Thread {
 47       private Global global;
 48       private long delay;
 49       private MsgUnit[] msgUnits;
 50       private Exception ex;
 51       
 52       public PublisherThread(Global global, long timeToWaitBeforePublishing, MsgUnit[] msgUnits) {
 53          this.global = global;
 54          this.delay = timeToWaitBeforePublishing;
 55          this.msgUnits = msgUnits;
 56          start();
 57       }
 58       
 59       public boolean hasException() {
 60          return (this.ex != null);
 61       }
 62       
 63       public void run() {
 64          try {
 65             if (this.delay > 0L) sleep(this.delay);
 66             this.global.getXmlBlasterAccess().publishArr(this.msgUnits);
 67          }
 68          catch (Exception ex) {
 69             ex.printStackTrace();
 70             this.ex = ex;
 71          }
 72       }
 73    }
 74    
 75    private static String ME = "TestAdminGet";
 76    
 77    private Global glob;
 78    private static Logger log = Logger.getLogger(TestAdminGet.class.getName());
 79 
 80    private MsgInterceptor updateInterceptor;
 81    private String senderName;
 82 
 83    private final String contentMime = "text/plain";
 84    
 85    private String sessionName = "dispatchTester/1";
 86 
 87    public TestAdminGet(String testName) {
 88       this(null, testName);
 89    }
 90 
 91    public TestAdminGet(Global glob, String testName) {
 92       super(testName);
 93       this.senderName = testName;
 94    }
 95 
 96    /**
 97     * Sets up the fixture.
 98     * <p />
 99     * Connect to xmlBlaster and login
100     */
101    protected void setUp() {
102       this.glob = (this.glob == null) ? Global.instance() : this.glob;
103 
104       this.updateInterceptor = new MsgInterceptor(this.glob, log, null);
105       
106       try {
107          I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
108 
109          String passwd = "secret";
110          ConnectQos connectQos = new ConnectQos(this.glob, senderName, passwd); // == "<qos>...</qos>";
111          connectQos.setSessionName(new SessionName(this.glob, this.sessionName));
112          con.connect(connectQos, this);  // Login to xmlBlaster, register for updates
113       }
114       catch (XmlBlasterException e) {
115           log.warning("setUp() - login failed: " + e.getMessage());
116           fail("setUp() - login fail: " + e.getMessage());
117       }
118       catch (Exception e) {
119           log.severe("setUp() - login failed: " + e.toString());
120           e.printStackTrace();
121           fail("setUp() - login fail: " + e.toString());
122       }
123    }
124    
125    /**
126     * Tears down the fixture.
127     * <p />
128     * cleaning up .... erase() the previous message OID and logout
129     */
130    protected void tearDown() {
131       log.info("Entering tearDown(), test is finished");
132       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
133                       "   //TestAdminGet-AGENT" +
134                       "</key>";
135 
136       String qos = "<qos><forceDestroy>true</forceDestroy></qos>";
137       I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
138       try {
139          EraseReturnQos[] arr = con.erase(xmlKey, qos);
140 
141          PropString defaultPlugin = new PropString("CACHE,1.0");
142          String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
143          log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
144       }
145       catch(XmlBlasterException e) {
146          log.severe("XmlBlasterException: " + e.getMessage());
147       }
148       finally {
149          con.disconnect(null);
150          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
151          this.glob = null;
152          con = null;
153          Global.instance().shutdown();
154       }
155    }
156 
157    /**
158     * TEST: Subscribe to a specific oid
159     */
160    private void doSubscribe(String oid) {
161       try {
162          SubscribeKey key = new SubscribeKey(this.glob, oid);
163 
164          SubscribeQos qos = new SubscribeQos(this.glob); // "<qos><persistent>true</persistent></qos>";
165          qos.setWantNotify(false); // to avoig getting erased messages
166 
167          SubscribeReturnQos subscriptionId = this.glob.getXmlBlasterAccess().subscribe(key, qos, this.updateInterceptor);
168 
169          log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
170          assertTrue("returned null subscriptionId", subscriptionId != null);
171       } catch(XmlBlasterException e) {
172          log.warning("XmlBlasterException: " + e.getMessage());
173          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
174       }
175    }
176  
177    private void doUnSubscribe(String oid) {
178       try {
179          UnSubscribeKey key = new UnSubscribeKey(this.glob, oid);
180 
181          UnSubscribeQos qos = new UnSubscribeQos(this.glob);
182          this.glob.getXmlBlasterAccess().unSubscribe(key, qos);
183       } 
184       catch(XmlBlasterException e) {
185          log.warning("XmlBlasterException: " + e.getMessage());
186          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
187       }
188    }
189  
190    /**
191     * TEST: Construct a message and publish it.
192     * If the counter is negative, the content of the message will be an empty string.
193     * <p />
194     */
195    public void doPublish(int counter, String oid) throws XmlBlasterException {
196       log.info("Publishing a message " + oid + " ...");
197       String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'><test></test></key>";
198       String content = "" + counter;
199       PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
200       MsgUnit msgUnit = null;
201       if (counter > -1) msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
202       else  msgUnit = new MsgUnit(xmlKey, "", qosWrapper.toXml());
203 
204       this.glob.getXmlBlasterAccess().publish(msgUnit);
205       log.info("Success: Publishing of " + oid + " done");
206    }
207 
208    /**
209     * Tests the activation flag setting and getting, i.e. disactivating/activating of the
210     * dispatcher.
211     */
212    public void testActivationFlag() {
213       try {
214          String oid = "TestActivationFlag";
215          log.info("Going to publish 3 times on message '" + oid + "' (first time before subscribing)");
216          doPublish(1, oid);
217          doSubscribe(oid);
218          doPublish(2, oid);
219          doPublish(3, oid);
220          assertEquals("wrong number of updates received", 3, this.updateInterceptor.waitOnUpdate(500L));
221          this.updateInterceptor.clear();
222 
223          String getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=one,two,three";
224          MsgUnit[] msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
225          assertEquals("wrong number of messages returned", 1, msg.length);
226          for (int i=0; i < msg.length; i++) {
227             log.info("testActivationFlag: dispatcherActive: (" + i + ") : '" + msg[i].getContentStr() + "'");
228             assertEquals("wrong return value", "true", msg[i].getContentStr());
229          }
230       
231          getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=false";
232          doPublish(-1, getOid);
233 
234          getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive";
235          msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
236          assertEquals("wrong number of messages returned", 1, msg.length);
237          for (int i=0; i < msg.length; i++) {
238             log.info("testActivationFlag: dispatcherActive (result): (" + i + ") : '" + msg[i].getContentStr() + "'");
239             assertEquals("wrong return value", "false", msg[i].getContentStr());
240          }
241 
242          doPublish(4, oid);
243          doPublish(5, oid);
244          int numArrived = this.updateInterceptor.waitOnUpdate(2000L);
245          assertEquals("wrong number of messages arrived", 0, numArrived);
246                   
247          getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=true";
248          doPublish(-1, getOid);
249 
250          getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive";
251          msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
252          assertEquals("wrong number of messages returned", 1, msg.length);
253          for (int i=0; i < msg.length; i++) {
254             log.info("testActivationFlag: dispatcherActive (result): (" + i + ") : '" + msg[i].getContentStr() + "'");
255             assertEquals("wrong return value", "true", msg[i].getContentStr());
256          }
257          
258          numArrived = this.updateInterceptor.waitOnUpdate(2000L);
259          assertEquals("wrong number of messages arrived", 2, numArrived);
260          
261       }
262       catch (XmlBlasterException ex) {
263          ex.printStackTrace();
264          assertTrue("exception should not occur here", false);
265       }
266    }
267 
268    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
269       String contentStr = new String(content);
270       String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
271       log.info("Receiving update of a message oid=" + updateKey.getOid() +
272                         " priority=" + updateQos.getPriority() +
273                         " state=" + updateQos.getState() +
274                         " content=" + cont);
275       log.info("further log for receiving update of a message cbSessionId=" + cbSessionId +
276                      updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
277       log.severe("update: should never be invoked (msgInterceptors take care of it since they are passed on subscriptions)");
278       return "OK";
279    }
280 
281 
282    /**
283     * Testing the getting of queue entries without removing them from the queue.
284     * TEST: <br />
285     */
286    public void testGetQueueEntries() {
287       try {
288          String oid = "TestGetQueueEntries";
289          doSubscribe(oid);
290 
291          String getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=false";
292          doPublish(-1, getOid);
293 
294          doPublish(1, oid);
295          doPublish(2, oid);
296          doPublish(3, oid);
297          log.info("Going to publish 3 times on message '" + oid + "'");
298          // should not receive anything yet since the dispatcher is not active anymore 
299          assertEquals("wrong number of updates received", 0, this.updateInterceptor.waitOnUpdate(500L));
300          this.updateInterceptor.clear();
301 
302          // query with a given GetQos ...         
303          GetQos getQos = new GetQos(this.glob);
304          // HistoryQos historyQos = new HistoryQos(this.glob);
305          // historyQos.setNumEntries(3);
306          // getQos.setHistoryQos(historyQos);
307          QuerySpecQos querySpecQos = new QuerySpecQos(this.glob, "QueueQuery", "1.0", "maxEntries=3&maxSize=-1&consumable=false&waitingDelay=0");
308          getQos.addQuerySpec(querySpecQos);
309 
310          getOid = "__cmd:client/" + this.sessionName + "/?cbQueueEntries";
311          MsgUnit[] mu = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), getQos);
312          assertEquals("wrong number of retreived entries", 3, mu.length);
313          
314       }
315       catch (XmlBlasterException ex) {
316          ex.printStackTrace();
317          assertTrue("exception should not occur here", false);
318       }
319    }
320 
321    private void doActivateDispatch(boolean doDispatch) throws XmlBlasterException {
322       // inhibit delivery of subscribed messages ...
323       String getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive=" + doDispatch;
324       doPublish(-1, getOid);
325       // query with a given GetQos ...         
326       getOid = "__cmd:client/" + this.sessionName + "/?dispatcherActive";
327       MsgUnit[] msg = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), new GetQos(this.glob));
328       assertEquals("wrong number of messages returned", 1, msg.length);
329       assertEquals("wrong return value", "" + doDispatch, msg[0].getContentStr());
330    } 
331 
332 
333    /**
334     * Testing the getting of queue entries. Note that before this method is called, the queue must be empty
335     * TEST: <br />
336     */
337    private void adminGet(String oid, boolean consumable, long waitingDelay, int maxEntries, int initialEntries, int endEntries, int entriesExpected) {
338       try {
339          int sizePerMsg = 0;
340          doActivateDispatch(false);
341          assertEquals("wrong prerequisite: entries have arrived before starting the test: probably coming from an inconsistency in the previous test", 0, this.updateInterceptor.count());
342          this.updateInterceptor.clear();
343          for (int i=0; i < initialEntries; i++) doPublish(i, oid);
344          log.info("In the callback queue there should now be '" + initialEntries + "' entries");
345          int ret = this.updateInterceptor.waitOnUpdate(200L);
346          assertEquals("no update should arrive here ", 0, ret);
347          
348          // prepare the messages to be published
349          // wait a third of the total waiting time before publishing in a separate thread (while we wait for updates)
350          
351          int extraEntries = endEntries - initialEntries;
352          MsgUnit[] msgs = new MsgUnit[extraEntries];
353          for (int i=0; i < extraEntries; i++) {
354             String content = "extraMsg" + i;
355             msgs[i] = new MsgUnit(new PublishKey(this.glob, oid), content, new PublishQos(this.glob));
356          }
357          long delay = waitingDelay / 3 + 10L;
358          PublisherThread pubThread = new PublisherThread(this.glob, delay, msgs);
359 
360          // query with a given GetQos ...         
361          GetQos getQos = new GetQos(this.glob);
362          QuerySpecQos querySpecQos = new QuerySpecQos(this.glob, "QueueQuery", "1.0", "maxEntries=" + maxEntries + "&maxSize=-1&consumable=" + consumable + "&waitingDelay=" + waitingDelay);
363          getQos.addQuerySpec(querySpecQos);
364          String getOid = "__cmd:client/" + this.sessionName + "/?cbQueueEntries";
365          MsgUnit[] mu = this.glob.getXmlBlasterAccess().get(new GetKey(this.glob, getOid), getQos);
366          assertEquals("an exception occured when it should not", false, pubThread.hasException());
367          assertEquals("wrong number of retreived entries", entriesExpected, mu.length);
368 
369          assertEquals("messages should not arrive here", 0, this.updateInterceptor.count());
370          doActivateDispatch(true);
371          if (consumable) {
372             int rest = endEntries-mu.length;
373             int arrived = 0;
374             if (rest < 1) {
375                arrived = this.updateInterceptor.waitOnUpdate(500L, 1);
376             }
377             else arrived = this.updateInterceptor.waitOnUpdate(500L, rest);
378             assertEquals("wrong number of messages arrived (some should have been consumed by the get", rest, arrived);
379          }
380          else {
381             int arrived = this.updateInterceptor.waitOnUpdate(200L, endEntries);
382             assertEquals("all published messages should arrive here", endEntries, arrived);
383          }
384          this.updateInterceptor.clear();
385       }
386       catch (XmlBlasterException ex) {
387          ex.printStackTrace();
388          assertTrue("exception should not occur here", false);
389       }
390    }
391 
392    public void testGetNonConsumableNoWaiting() {
393       String oid = "NonConsumableNoWaiting";
394       doSubscribe(oid);
395       boolean consumable = false;
396       long waiting = 0L; // no waiting
397       int maxEntries = 3;
398       adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
399       adminGet(oid, consumable, waiting, maxEntries, 0, 4, 0);
400       adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
401       adminGet(oid, consumable, waiting, maxEntries, 0, 2, 0);
402       doUnSubscribe(oid);
403    }
404 
405    public void testGetConsumableNoWaiting() {
406       String oid = "ConsumableNoWaiting";
407       doSubscribe(oid);
408       boolean consumable = true;
409       long waiting = 0L; // no waiting
410       int maxEntries = 3;
411       adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
412       adminGet(oid, consumable, waiting, maxEntries, 0, 4, 0);
413       adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
414       adminGet(oid, consumable, waiting, maxEntries, 0, 2, 0);
415       doUnSubscribe(oid);
416    }
417 
418    public void testGetNonConsumableDoWaiting() {
419       String oid = "NonConsumableDoWaiting";
420       doSubscribe(oid);
421       boolean consumable = false;
422       long waiting = 200L; // no waiting
423       int maxEntries = 3;
424       adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
425       adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
426       adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
427       adminGet(oid, consumable, waiting, maxEntries, 0, 2, 2);
428       adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
429       waiting = -1L;
430       adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
431       adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
432       adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
433       doUnSubscribe(oid);
434    }
435 
436    public void testGetConsumableDoWaiting() {
437       String oid = "ConsumableDoWaiting";
438       doSubscribe(oid);
439       boolean consumable = false;
440       long waiting = 200L; // no waiting
441       int maxEntries = 3;
442       adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
443       adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
444       adminGet(oid, consumable, waiting, maxEntries, 2, 2, 2);
445       adminGet(oid, consumable, waiting, maxEntries, 0, 2, 2);
446       adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
447       waiting = -1L;
448       adminGet(oid, consumable, waiting, maxEntries, 4, 4, maxEntries);
449       adminGet(oid, consumable, waiting, maxEntries, 0, 4, maxEntries);
450       adminGet(oid, consumable, waiting, maxEntries, 1, maxEntries, maxEntries);
451       doUnSubscribe(oid);
452    }
453 
454    
455    /**
456     * Invoke: java org.xmlBlaster.test.client.TestAdminGet
457     * <p />
458     * @deprecated Use the TestRunner from the testsuite to run it:<p />
459     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestAdminGet</pre>
460     */
461    public static void main(String args[])
462    {
463       Global glob = new Global();
464       if (glob.init(args) != 0) {
465          System.out.println(ME + ": Init failed");
466          System.exit(1);
467       }
468 
469       TestAdminGet testSub = new TestAdminGet(glob, "TestAdminGet/1");
470 
471       testSub.setUp();
472       testSub.testActivationFlag();
473       testSub.tearDown();
474 
475       testSub.setUp();
476       testSub.testGetQueueEntries();
477       testSub.tearDown();
478 
479       testSub.setUp();
480       testSub.testGetNonConsumableNoWaiting();
481       testSub.tearDown();
482 
483       testSub.setUp();
484       testSub.testGetNonConsumableDoWaiting();
485       testSub.tearDown();
486 
487       testSub.setUp();
488       testSub.testGetConsumableNoWaiting();
489       testSub.tearDown();
490 
491       testSub.setUp();
492       testSub.testGetConsumableDoWaiting();
493       testSub.tearDown();
494 
495    }
496 }


syntax highlighted by Code2HTML, v. 0.9.1