1 package org.xmlBlaster.test.classtest.queue;
   2 
   3 import java.util.logging.Logger;
   4 import java.util.logging.Level;
   5 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
   6 import org.xmlBlaster.util.Global;
   7 import org.xmlBlaster.util.MsgUnit;
   8 import org.xmlBlaster.util.XmlBlasterException;
   9 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
  10 import org.xmlBlaster.util.queue.BlockingQueueWrapper;
  11 import org.xmlBlaster.util.queue.I_Entry;
  12 import org.xmlBlaster.util.queue.I_StorageSizeListener;
  13 import org.xmlBlaster.util.queue.I_Storage;
  14 import org.xmlBlaster.util.queue.StorageId;
  15 import org.xmlBlaster.util.queue.I_Queue;
  16 import org.xmlBlaster.util.queue.I_QueueEntry;
  17 import org.xmlBlaster.util.def.PriorityEnum;
  18 import org.xmlBlaster.util.def.Constants;
  19 import org.xmlBlaster.util.def.ErrorCode;
  20 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
  21 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
  22 import org.xmlBlaster.util.plugin.PluginInfo;
  23 
  24 import org.xmlBlaster.util.queuemsg.DummyEntry;
  25 
  26 import java.util.ArrayList;
  27 
  28 import junit.framework.*;
  29 import org.xmlBlaster.util.queue.QueuePluginManager;
  30 
  31 /**
 * Test the I_Queue implementations (the RAM, JDBC and CACHE queue plugins).
  33  * <p>
  34  * The sorting order is priority,timestamp:
  35  * </p>
  36  * <pre>
  37  *   ->    5,100 - 5,98 - 5,50 - 9,3000 - 9,2500   ->
  38  * </pre>
  39  * <p>
  40  * As 9 is highest priority it is the first to be taken out.<br />
 * As we need to maintain the chronological sequence, and the
 * id is a timestamp (more or less the nanoseconds elapsed since 1970),
 * the id 2500 (it is older) has precedence over the id 3000.
  44  * </p>
  45  * <p>
  46  * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.I_QueueTest
  47  * </p>
  48  * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
  49  * @see org.xmlBlaster.util.queue.I_Queue
  50  * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
  51  * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
  52  */
  53 public class I_QueueTest extends TestCase {
  54 
  55 
  56    class QueueSizeListener  implements I_StorageSizeListener {
  57 
  58       private long lastNumEntries = 0L, 
  59                    lastNumBytes = 0L,
  60                    lastIncrementEntries = 0L,
  61                    lastIncrementBytes = 0L;
  62       private int count = 0;
  63       
  64       public long getLastIncrementEntries() {
  65          return this.lastIncrementEntries;
  66       }
  67       
  68       public long getLastIncrementBytes() {
  69          return this.lastIncrementBytes;
  70       }
  71       
  72       public int getCount() {
  73          return this.count;
  74       }
  75       
  76       public void clear() {
  77          this.lastNumEntries = 0L; 
  78          this.lastNumBytes = 0L;
  79          this.lastIncrementEntries = 0L;
  80          this.lastIncrementBytes = 0L;
  81          this.count = 0;
  82       }
  83       
  84       public void changed(I_Storage storage, long numEntries, long numBytes, boolean isShutdown) {
  85          this.lastIncrementEntries = numEntries - this.lastNumEntries;
  86          this.lastIncrementBytes = numBytes - this.lastNumBytes;
  87          this.lastNumEntries = numEntries;
  88          this.lastNumBytes = numBytes;
  89          this.count++;
  90       }
  91 
  92    }
  93    
  94 
  95    private String ME = "I_QueueTest";
  96    protected Global glob;
  97    private static Logger log = Logger.getLogger(I_QueueTest.class.getName());
  98 
  99    private I_Queue queue;
 100    private QueueSizeListener queueSizeListener = new QueueSizeListener();
 101    
 102    static String[] PLUGIN_TYPES = {
 103                    new String("RAM"),
 104                    new String("JDBC"),
 105                    new String("CACHE")
 106                  };
 107 
 108 /*
 109    static I_Queue[] IMPL = {
 110                    new org.xmlBlaster.util.queue.ram.RamQueuePlugin(),
 111                    new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(),
 112                    new org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin()
 113                  };
 114 */
 115 
 116    public class QueuePutter extends Thread {
 117       
 118       private I_Queue queue;
 119       private long delay;
 120       private int numOfEntries;
 121       private boolean ignoreInterceptor;
 122       
 123       public QueuePutter(I_Queue queue, long delay, int numOfEntries, boolean ignoreInterceptor) {
 124          this.queue = queue;
 125          this.delay = delay;
 126          this.numOfEntries = numOfEntries;
 127          this.ignoreInterceptor = ignoreInterceptor;
 128       }
 129       
 130       public void run() {
 131          try {
 132             for (int i=0; i < this.numOfEntries; i++) {
 133                Thread.sleep(this.delay);
 134                DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
 135                this.queue.put(queueEntry, this.ignoreInterceptor);
 136             }
 137          }
 138          catch (Exception ex) {
 139             ex.printStackTrace();
 140          }
 141       }
 142       
 143    }
 144    
 145    
 146    public I_QueueTest(String name, int currImpl, Global glob) {
 147       super(name);
 148 //      this.queue = IMPL[currImpl];
 149       //this.ME = "I_QueueTest[" + this.queue.getClass().getName() + "]";
 150 
 151       if (glob == null) this.glob = Global.instance();
 152       else this.glob = glob;
 153 
 154 
 155       try {
 156          String type = PLUGIN_TYPES[currImpl];
 157          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
 158          QueuePluginManager pluginManager = new QueuePluginManager(glob);
 159 
 160          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
 161          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
 162          prop.put("tableNamePrefix", "TEST");
 163          prop.put("entriesTableName", "_entries");
 164          this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
 165 
 166          pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
 167          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");
 168          this.queue = pluginManager.getPlugin(pluginInfo, queueId, new CbQueueProperty(this.glob, Constants.RELATING_CALLBACK, this.glob.getStrippedId()));
 169          this.queue.shutdown(); // to allow to initialize again
 170       }
 171       catch (Exception ex) {
 172          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
 173       }
 174    }
 175 
 176    protected void setUp() {
 177       // cleaning up the database from previous runs ...
 178       QueuePropertyBase prop = null;
 179       try {
 180          prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 181          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");
 182          queue.initialize(queueId, prop);
 183          queue.clear();
 184          queue.shutdown();
 185       }
 186       catch (Exception ex) {
 187          log.severe("could not propertly set up the database: " + ex.getMessage());
 188       }
 189    }
 190 
 191    /**
 192     * Tests QueuePropertyBase() and getStorageId()
 193     * @param queueTypeList A space separated list of names for the
 194     *        implementations to be tested. Valid names are:
 195     *        RamQueuePlugin JdbcQueuePlugin
 196     */
 197    public void testConfig() {
 198       config(this.queue);
 199    }
 200 
 201    /**
 202     * Tests initialize(), getProperties(), setProperties() and capacity()
 203     * @param queue !!!Is not initialized in this case!!!!
 204     */
 205    private void config(I_Queue queue) {
 206       ME = "I_QueueTest.config(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
 207       System.out.println("***" + ME);
 208 
 209       QueuePropertyBase prop1 = null;
 210       QueuePropertyBase prop = null;
 211       try {
 212          // test initialize()
 213          prop1 = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 214          int max = 12;
 215          prop1.setMaxEntries(max);
 216          prop1.setMaxEntriesCache(max);
 217          assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries());
 218          assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache());
 219          //PluginInfo pluginInfo = new PluginInfo(glob, null, "");
 220          //queue.init(glob, pluginInfo);     // Init from pluginloader is first
 221          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");
 222          queue.initialize(queueId, prop1);
 223          assertEquals(ME+": Wrong queue ID", queueId, queue.getStorageId());
 224 
 225          try {
 226             prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, "/node/test");
 227             prop.setMaxEntries(99);
 228             prop.setMaxEntriesCache(99);
 229             queue.setProperties(prop);
 230          }
 231          catch(XmlBlasterException e) {
 232             fail("Changing properties failed");
 233          }
 234 
 235       }
 236       catch(XmlBlasterException e) {
 237          fail(ME + ": Exception thrown: " + e.getMessage());
 238       }
 239 
 240       long len = prop.getMaxEntries();
 241       assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), queue.getMaxNumOfEntries());
 242       assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)queue.getProperties()).getMaxEntries());
 243       assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
 244 
 245       try {
 246          for (int ii=0; ii<len; ii++) {
 247             queue.put(new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), false);
 248          }
 249          assertEquals(ME+": Wrong total size", len, queue.getNumOfEntries());
 250 
 251          try {
 252             DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
 253             queue.put(queueEntry, false);
 254             queue.put(queueEntry, false);
 255             fail("Did expect an exception on overflow");
 256          }
 257          catch(XmlBlasterException e) {
 258             log.info("SUCCESS the exception is OK: " + e.getMessage());
 259          }
 260 
 261          log.info("toXml() test:" + queue.toXml(""));
 262          log.info("usage() test:" + queue.usage());
 263 
 264          assertEquals(ME+": should not be shutdown", false, queue.isShutdown());
 265          queue.shutdown();
 266          assertEquals(ME+": should be shutdown", true, queue.isShutdown());
 267 
 268          log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
 269          System.out.println("***" + ME + " [SUCCESS]");
 270          queue.shutdown();
 271          queue = null;
 272       }
 273       catch(XmlBlasterException e) {
 274          fail(ME + ": Exception thrown: " + e.getMessage());
 275       }
 276    }
 277 
 278 //------------------------------------
 279    public void testSize1() {
 280       String queueType = "unknown";
 281       try {
 282          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 283          int max = 1;
 284          prop.setMaxEntries(max);
 285          prop.setMaxEntriesCache(max);
 286          queueType = this.queue.toString();
 287          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/size1");
 288          this.queue.initialize(queueId, prop);
 289          queue.clear();
 290          assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
 291          assertEquals(ME, 1L, queue.getMaxNumOfEntries());
 292          size1(this.queue);
 293       }
 294       catch (XmlBlasterException ex) {
 295          fail("Exception when testing Size1 probably due to failed initialization of the queue of type " + queueType);
 296       }
 297    }
 298 
 299    /**
 300     * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear()
 301     */
 302    private void size1(I_Queue queue) {
 303       this.queue = queue;
 304       ME = "I_QueueTest.size1(" + queue.getStorageId() + ")[" + this.queue.getClass().getName() + "]";
 305       System.out.println("***" + ME);
 306       int maxEntries = (int)queue.getMaxNumOfEntries();
 307       try {
 308          //========== Test 1: put(I_QueueEntry[])
 309          int numLoop = 10;
 310          ArrayList list = new ArrayList();
 311 
 312          //========== Test 2: put(I_QueueEntry)
 313          this.queue.removeStorageSizeListener(null);
 314          this.queue.addStorageSizeListener(this.queueSizeListener);
 315          this.queueSizeListener.clear();
 316 
 317          for (int ii=0; ii<numLoop; ii++) {
 318             DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
 319             try {
 320                queue.put(queueEntry, false);
 321                assertEquals("number of entries incremented on last invocation", 1, this.queueSizeListener.getLastIncrementEntries());
 322                assertEquals("number of bytes incremented on last invocation", queueEntry.getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 323                
 324                if (ii > maxEntries) { // queue allows on overload
 325                   fail("Didn't expect more than " + maxEntries + " entries" + queue.toXml(""));
 326                }
 327                else
 328                   list.add(queueEntry);
 329             }
 330             catch (XmlBlasterException e) {
 331                if (ii <= maxEntries) {
 332                   fail("Didn't expect exception" + e.getMessage());
 333                }
 334             }
 335          }
 336          assertEquals("number of invocations for queue size listener is wrong", maxEntries+1, this.queueSizeListener.getCount());
 337 
 338          // The queues allow temporary oversize (one extra put())
 339          assertEquals(ME+": Wrong total size " + queue.toXml(""), maxEntries+1, queue.getNumOfEntries());
 340          this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
 341          log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
 342 
 343          ArrayList entryList = null;
 344          try {
 345             entryList = queue.peekLowest(1, -1L, null, false);
 346             assertEquals("PEEK #1 failed"+queue.toXml(""), 1, entryList.size());
 347             log.info("curr entries="+queue.getNumOfEntries());
 348          }
 349          catch (XmlBlasterException e) {
 350             if (e.getErrorCode()!=ErrorCode.INTERNAL_NOTIMPLEMENTED) throw e;
 351          }
 352 
 353          
 354          //this.queue.removeStorageSizeListener(null);
 355          //this.queue.addStorageSizeListener(this.queueSizeListener);
 356          //this.queueSizeListener.clear();
 357 
 358          entryList = queue.takeLowest(1, -1L, null, false);
 359          long singleSize = ((I_QueueEntry)entryList.get(0)).getSizeInBytes(); 
 360          assertEquals("TAKE #1 failed"+queue.toXml(""), 1, entryList.size());
 361          log.info("curr entries="+queue.getNumOfEntries());
 362          assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
 363          assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
 364 
 365          entryList = queue.takeLowest(1, -1L, null, false);
 366          assertEquals("TAKE #2 failed"+queue.toXml(""), 1, entryList.size());
 367          assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
 368          assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
 369 
 370          queue.clear();
 371          assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());
 372 
 373          System.out.println("***" + ME + " [SUCCESS]");
 374          queue.shutdown();
 375          queue = null;
 376 
 377       }
 378       catch(XmlBlasterException e) {
 379          fail(ME + ": Exception thrown: " + e.getMessage());
 380       }
 381       log.info("SUCCESS");
 382    }
 383 
 384 
 385 //------------------------------------
 386    public void testPutMsg() {
 387       String queueType = "unknown";
 388       try {
 389          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 390          queueType = this.queue.toString();
 391          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/putMsg");
 392          this.queue.initialize(queueId, prop);
 393          queue.clear();
 394          assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
 395          putMsg(this.queue);
 396       }
 397       catch (XmlBlasterException ex) {
 398          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
 399       }
 400    }
 401 
 402 
 403    /**
 404     * @see checkSizeAndEntries(String, I_QueueEntry[], I_Queue)
 405     */
 406    private void checkSizeAndEntries(String txt, ArrayList queueEntries, I_Queue queue) {
 407       checkSizeAndEntries(txt, (I_QueueEntry[])queueEntries.toArray(new I_QueueEntry[queueEntries.size()]), queue);
 408    }
 409 
 410 
 411    /**
 412     * Helper method to do a generic size check (size and number of entries)
 413     */
 414    private void checkSizeAndEntries(String txt, I_QueueEntry[] queueEntries, I_Queue queue) {
 415       long sizeOfTransients = 0L;
 416       long numOfPersistents = 0;
 417       long numOfTransients = 0;
 418       long sizeOfPersistents = 0L;
 419 
 420       for (int i=0; i < queueEntries.length; i++) {
 421          I_QueueEntry entry = queueEntries[i];
 422          if (entry.isPersistent()) {
 423             sizeOfPersistents += entry.getSizeInBytes();
 424             numOfPersistents++;
 425          }
 426          else {
 427             sizeOfTransients += entry.getSizeInBytes();
 428             numOfTransients++;
 429          }
 430       }
 431 
 432       long queueNumOfPersistents = queue.getNumOfPersistentEntries();
 433       long queueNumOfTransients = queue.getNumOfEntries() - queueNumOfPersistents;
 434       long queueSizeOfPersistents = queue.getNumOfPersistentBytes();
 435       long queueSizeOfTransients = queue.getNumOfBytes() - queueSizeOfPersistents;
 436 
 437       txt += " NumPersistents=" + queueNumOfPersistents + " NumOfTransients=" + queueNumOfTransients; 
 438       txt += " SizeOfPersistents=" + queueSizeOfPersistents + " SizeOfTransients=" + queueSizeOfTransients;
 439 
 440       assertEquals(ME + ": " + txt + " wrong number of persistents   ", numOfPersistents, queueNumOfPersistents);
 441       assertEquals(ME + ": " + txt + " wrong number of transients ", numOfTransients, queueNumOfTransients);
 442       assertEquals(ME + ": " + txt + " wrong size of persistents     ", sizeOfPersistents, queueSizeOfPersistents);
 443       assertEquals(ME + ": " + txt + " wrong size of transients   ", sizeOfTransients, queueSizeOfTransients);
 444    }
 445 
 446 
 447 
 448    /**
 449     * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear()
 450     */
 451    private void putMsg(I_Queue queue) {
 452       ME = "I_QueueTest.putMsg(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
 453       System.out.println("***" + ME);
 454       try {
 455          //========== Test 1: put(I_QueueEntry[])
 456          int numLoop = 10;
 457          ArrayList list = new ArrayList();
 458 
 459          this.queue.removeStorageSizeListener(null);
 460          this.queue.addStorageSizeListener(this.queueSizeListener);
 461          this.queueSizeListener.clear();
 462 
 463          for (int ii=0; ii<numLoop; ii++) {
 464             DummyEntry[] queueEntries = {
 465                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 466                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
 467                          new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)};
 468 
 469             queue.put(queueEntries, false);
 470 
 471             assertEquals("number of entries incremented on last invocation", 3, this.queueSizeListener.getLastIncrementEntries());
 472             assertEquals("number of bytes incremented on last invocation", 3*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
 473             for (int i=0; i < 3; i++) list.add(queueEntries[i]);
 474 
 475             this.checkSizeAndEntries(" put(I_QueueEntry[]) ", list, queue);
 476             assertEquals(ME+": Wrong size", (ii+1)*queueEntries.length, queue.getNumOfEntries());
 477          }
 478          assertEquals("number of invocations for queue size listener is wrong", numLoop, this.queueSizeListener.getCount());
 479 
 480          int total = numLoop*3;
 481          assertEquals(ME+": Wrong total size", total, queue.getNumOfEntries());
 482          log.info("#1 Success, filled " + queue.getNumOfEntries() + " messages into queue");
 483 
 484 
 485          //========== Test 2: put(I_QueueEntry)
 486          for (int ii=0; ii<numLoop; ii++) {
 487             DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
 488             list.add(queueEntry);
 489             queue.put(queueEntry, false);
 490          }
 491          assertEquals(ME+": Wrong total size", numLoop+total, queue.getNumOfEntries());
 492          this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
 493          log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
 494 
 495          queue.clear();
 496          assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());
 497 
 498          System.out.println("***" + ME + " [SUCCESS]");
 499          queue.shutdown();
 500          queue = null;
 501 
 502       }
 503       catch(XmlBlasterException e) {
 504          fail(ME + ": Exception thrown: " + e.getMessage());
 505       }
 506    }
 507 
 508 
 509 // ------------------------------------
 510    public void testPeekMsg() {
 511 
 512       String queueType = "unknown";
 513       try {
 514          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 515          queueType = this.queue.toString();
 516          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg");
 517          this.queue.initialize(queueId, prop);
 518          queue.clear();
 519          assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
 520          peekMsg(this.queue);
 521       }
 522       catch (XmlBlasterException ex) {
 523          log.severe("Exception when testing peekMsg probably due to failed initialization of the queue " + queueType);
 524       }
 525 
 526    }
 527 
 528 
 529 // ------------------------------------
 530    public void testPeekMsgBlocking() {
 531 
 532       String queueType = "unknown";
 533       try {
 534          QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"