1 package org.xmlBlaster.test.classtest.queue;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Enumeration;
  5 import java.util.Hashtable;
  6 import java.util.List;
  7 import java.util.logging.Logger;
  8 
  9 import junit.framework.TestCase;
 10 
 11 import org.xmlBlaster.util.Global;
 12 import org.xmlBlaster.util.XmlBlasterException;
 13 import org.xmlBlaster.util.def.Constants;
 14 import org.xmlBlaster.util.def.PriorityEnum;
 15 import org.xmlBlaster.util.plugin.PluginInfo;
 16 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
 17 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
 18 import org.xmlBlaster.util.queue.I_Entry;
 19 import org.xmlBlaster.util.queue.I_Queue;
 20 import org.xmlBlaster.util.queue.I_QueueEntry;
 21 import org.xmlBlaster.util.queue.I_StorageProblemListener;
 22 import org.xmlBlaster.util.queue.QueuePluginManager;
 23 import org.xmlBlaster.util.queue.StorageId;
 24 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
 25 import org.xmlBlaster.util.queuemsg.DummyEntry;
 26 
 27 /**
 28  * Test CacheQueueInterceptorPlugin.
 29  * <p>
 30  * The sorting order is priority,timestamp:
 31  * </p>
 32  * <pre>
 33  *   ->    5,100 - 5,98 - 5,50 - 9,3000 - 9,2500   ->
 34  * </pre>
 35  * <p>
 36  * As 9 is highest priority it is the first to be taken out.<br />
 37  * As we need to maintain the timely sequence and
 38  * id is a timestamp in (more or less) nano seconds elapsed since 1970)
 39  * the id 2500 (it is older) has precedence to the id 3000
 40  * </p>
 41  * <p>
 42  * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.CacheQueueTest
 43  * </p>
 44  * <p>
 45  * Configuration example:
 46  * </p>
 47  * <pre>
 48  * JdbcDriver.drivers=org.postgresql.Driver
 49  * JdbcDriver.postgresql.mapping=string=text,longint=bigint,int=integer,boolean=boolean
 50  * queue.callback.url=jdbc:postgresql://localhost/test
 51  * queue.callback.user=postgres
 52  * queue.callback.password=
 53  * </pre>
 54  * <p>
 55  * Test database with PostgreSQL:
 56  * </p>
 57  * <pre>
 58  * initdb /tmp/postgres
 59  * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres    (edit host access)
 60  * createdb test
 61  * postmaster -i -D /tmp/postgres
 62  * </pre>
 63  * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
 64  * @see org.xmlBlaster.util.queue.I_Queue
 65  * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
 66  * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
 67  */
 68 public class CacheQueueTest extends TestCase {
 69    private String ME = "CacheQueueTest";
 70    protected Global glob;
 71    private static Logger log = Logger.getLogger(CacheQueueTest.class.getName());
 72    private CacheQueueInterceptorPlugin queue = null;
 73    private I_Queue[] queues;
 74 
 75    public CacheQueueTest(String name) {
 76       this(Global.instance(), name);
 77    }
 78 
 79    public CacheQueueTest(Global glob, String name) {
 80       super(name);
 81       this.glob = glob;
 82    }
 83 
 84    protected void setUp() {
 85       glob = Global.instance();
 86 
 87       QueuePropertyBase cbProp = null;
 88 
 89       try {
 90          glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
 91 
 92          cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 93          StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SetupQueue");
 94 
 95          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
 96          QueuePluginManager pluginManager = new QueuePluginManager(glob);
 97          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
 98          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
 99          prop.put("tableNamePrefix", "TEST");
100          prop.put("entriesTableName", "_entries");
101          this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
102 
103          pluginInfo = new PluginInfo(glob, pluginManager, "CACHE", "1.0");
104          this.queue = (CacheQueueInterceptorPlugin)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
105          this.queues = new I_Queue[3];
106 
107          pluginInfo = new PluginInfo(glob, pluginManager, "RAM", "1.0");
108          this.queues[0] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
109          pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
110          this.queues[1] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
111          this.queues[2] = queue;
112 
113          for (int i=0; i < 3; i++) {
114             this.queues[i].clear();
115             this.queues[i].shutdown(); // to allow to initialize again
116          }
117       }
118       catch (Exception ex) {
119          log.severe("could not propertly set up the database: " + ex.getMessage());
120       }
121 
122    }
123 
124 
125    public void tearDown() {
126       try {
127          for (int i=0; i < 3; i++) {
128             this.queues[i].clear();
129             this.queues[i].shutdown();
130          }
131       }
132       catch (Exception ex) {
133          log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp");
134       }
135    }
136 
137 
138    public void testConfig() {
139       String queueType = "CACHE";
140       try {
141          config(20L, 10L, 500L, 200L);
142       }
143       catch (XmlBlasterException ex) {
144          ex.printStackTrace();
145          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
146       }
147    }
148 
149 
150    public StorageId config(long maxEntries, long maxEntriesCache, long maxBytes, long maxBytesCache)
151       throws XmlBlasterException {
152 
153       // set up the queues ....
154       QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
155       prop.setMaxEntries(maxEntries);
156       prop.setMaxEntriesCache(maxEntriesCache);
157       prop.setMaxBytes(maxBytes);
158       prop.setMaxBytesCache(maxBytesCache);
159 
160       StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "CacheQueueTest/config");
161 
162       // this.queue = new CacheQueueInterceptorPlugin();
163       this.queue.initialize(queueId, prop);
164       this.queue.clear();
165       long persistentSize = this.queue.getPersistentQueue().getMaxNumOfBytes();
166       long persistentMsg  = this.queue.getPersistentQueue().getMaxNumOfEntries();
167       long transientSize  = this.queue.getTransientQueue().getMaxNumOfBytes();
168       long transientMsg   = this.queue.getTransientQueue().getMaxNumOfEntries();
169 
170       assertEquals("Wrong persistent size", maxBytes, persistentSize);
171       assertEquals("Wrong persistent num of msg", maxEntries, persistentMsg);
172       if (maxBytesCache != transientSize)
173          log.severe("ERROR: Wrong transient size" + this.queue.getTransientQueue().toXml(""));
174       assertEquals("Wrong transient size" + this.queue.getTransientQueue().toXml(""), maxBytesCache, transientSize);
175       assertEquals("Wrong num of transient msg", maxEntriesCache, transientMsg);
176       return queueId;
177    }
178 
179    public void testClearWithSwappedEntries() {
180       String queueType = "CACHE";
181       try {
182          StorageId id = config(20L, 3L, 500L, 100L);
183          PriorityEnum prio = PriorityEnum.toPriorityEnum(5);
184          for (int i=0; i < 15; i++) {
185             boolean persistent =  (i | 1) == 0; // some persistent and some transient
186             long entrySize = 10L;
187             DummyEntry entry = new DummyEntry(glob, prio, id, entrySize, persistent);
188             this.queue.put(entry, true);
189          }
190          
191          long ret = this.queue.clear();
192          assertEquals("wrong number of entries returned by clear", 15L, ret);
193          
194          long numOfEntries = this.queue.getNumOfEntries();
195          long numOfBytes = this.queue.getNumOfBytes();
196          assertEquals("the queue should be empty", 0L, numOfEntries);
197          assertEquals("the size of the queue should be 0", 0L, numOfBytes);
198       }
199       catch (XmlBlasterException ex) {
200          ex.printStackTrace();
201          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
202       }
203    }
204 
205 
206 
207    public void testPutPeekRemove() {
208       String queueType = this.glob.getProperty().get("queueType", "CACHE");
209       log.info("testPutPeekRemove will be done with a queue of type '" + queueType + "'");
210       log.info("if you want to test with another queue type invoke '-queueType $TYPE' on the cmd line where $TYPE is either RAM JDBC or CACHE");
211       int index = 2;
212       if ("RAM".equalsIgnoreCase(queueType)) index = 0;
213       else if ("JDBC".equalsIgnoreCase(queueType)) index = 1;
214 
215       try {
216          putPeekRemove(this.queues[index]);
217       }
218       catch (XmlBlasterException ex) {
219          ex.printStackTrace();
220          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
221       }
222    }
223 
224 
225    public void putPeekRemove(I_Queue refQueue) throws XmlBlasterException {
226 
227       // set up the queues ....
228 
229       // every content is 80 bytes which gives an entry size of 100 bytes (80+20)
230       long entrySize = 100;
231 
232       String lastSuccessfulLocation = "";
233       long maxNumOfBytesCache[] = {700L, 10000L};
234       long maxNumOfBytes[] = {700L, 50000L};
235       int numOfTransientEntries[] = { 2, 50, 200};
236       int numOfPersistentEntries[] =  { 0, 2, 50, 200};
237 //      int numPrio[] = { 1, 5, 9};
238 
239 //      int it=0, id=0, ic=0, is=0;
240 //      try {
241          for (int ic=0; ic < maxNumOfBytesCache.length; ic++) {
242             for (int is=0; is < maxNumOfBytes.length; is++) {
243                log.info("**** TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is]);
244                // a new queue each time here ...
245                QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
246                prop.setMaxEntries(2000L);
247                prop.setMaxEntriesCache(1000L);
248                prop.setMaxBytes(maxNumOfBytes[is]);
249                prop.setMaxBytesCache(maxNumOfBytesCache[ic]);
250                StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes[is] + "/ram" + maxNumOfBytesCache[ic]);
251 
252 //               this.queue = new CacheQueueInterceptorPlugin();
253                refQueue.clear();
254                refQueue.shutdown();
255 
256                refQueue.initialize(queueId, prop);
257 
258                for (int it=0; it < numOfTransientEntries.length; it++) {
259                   // entry.setPrio(4+(it%3));
260                   for (int id=0; id < numOfPersistentEntries.length; id++) {
261 
262                      log.info("**** SUB-TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is] +
263                                    " -> numOfTransientEntries["+it+"]=" + numOfTransientEntries[it] + " numOfPersistentEntries["+id+"]=" + numOfPersistentEntries[id]);
264                      if (!refQueue.isShutdown()) refQueue.shutdown();
265                      refQueue.initialize(queueId, prop);
266                      refQueue.clear();
267 
268                      assertEquals(ME + " the number of bytes of the queue should be zero ", 0L, refQueue.getNumOfBytes());
269                      assertEquals(ME + " the number of entries in the queue should be zero ", 0L, refQueue.getNumOfEntries());
270                      assertEquals(ME + " the number of bytes of the persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentBytes());
271                      assertEquals(ME + " the number of persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentEntries());
272 
273                      assertEquals(ME + " the maximum number of entries is wrong ", maxNumOfBytes[is], refQueue.getMaxNumOfBytes());
274 
275                      try {
276 
277                         refQueue.clear();
278                         // prepare the inputs .
279                         Hashtable[] inputTable = new Hashtable[3];
280                         for (int i=0; i < 3; i++) inputTable[i] = new Hashtable();
281 
282                         DummyEntry[] transients = new DummyEntry[numOfTransientEntries[it]];
283                         DummyEntry[] persistentEntries    = new DummyEntry[numOfPersistentEntries[id]];
284 
285                         log.info("putPeekRemove " + queueId + " persistent: " + persistentEntries.length + " transient: " + transients.length);
286 
287                         boolean persistent = false;
288                         for (int i=0; i < transients.length; i++) {
289                            int prio = i % 3;
290                            PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
291                            DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
292                            transients[i] = entry;
293                            inputTable[prio].put(new Long(entry.getUniqueId()), entry);
294                         }
295                         persistent = true;
296                         for (int i=0; i < persistentEntries.length; i++) {
297                            int prio = i % 3;
298                            PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
299                            DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
300                            persistentEntries[i] = entry;
301                            inputTable[prio].put(new Long(entry.getUniqueId()), entry);
302                         }
303 
304                         // do the test here ....
305                         assertEquals(ME + " number of persistent entries is wrong ", 0L, refQueue.getNumOfPersistentEntries());
306                         assertEquals(ME + " number of entries is wrong ", 0L, refQueue.getNumOfEntries());
307                         for (int i=0; i < transients.length; i++) {
308                            lastSuccessfulLocation = "transientEntries put #" + i;
309                            refQueue.put(transients[i], false);
310                         }
311                         assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, refQueue.getNumOfEntries());
312                         for (int i=0; i < persistentEntries.length; i++) {
313                            lastSuccessfulLocation = "persistentEntries put #" + i;
314                            refQueue.put(persistentEntries[i], false);
315                         }
316                         assertEquals(ME + " number of entries after putting transients is wrong ", persistentEntries.length + transients.length, refQueue.getNumOfEntries());
317                         long nPersistents  = refQueue.getNumOfPersistentEntries();
318                         long nTransient = refQueue.getNumOfEntries() - nPersistents;
319 
320                         assertEquals(ME + " number of persistent entries is wrong ", persistentEntries.length, nPersistents);
321                         assertEquals(ME + " number of transient entries is wrong ", transients.length, nTransient);
322 
323                         List<I_Entry> total = new ArrayList();
324                         List<I_Entry> ret = refQueue.peekSamePriority(-1, -1L);
325                         refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
326                         while (ret.size() > 0) {
327                            total.addAll(ret);
328                            ret = refQueue.peekSamePriority(-1, -1L);
329                            if (ret.size() > 0)
330                               refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
331                         }
332                         int mustEntries = inputTable[0].size() + inputTable[1].size() + inputTable[2].size();
333 
334 
335                         long totNumOfBytes = entrySize * (numOfPersistentEntries[id]+numOfTransientEntries[it]);
336                         log.fine("total number of bytes: " + totNumOfBytes + " maxNumOfBytes: " + maxNumOfBytes[is]);
337                         log.fine("entries must be: " + mustEntries);
338 
339                         assertTrue("Overflow is not allowed " + refQueue.toXml("") + "total number of bytes " + totNumOfBytes + " max number of bytes: " + maxNumOfBytes[is], totNumOfBytes <= maxNumOfBytes[is]);
340 //                        assertTrue(ME + " Overflow is not allowed " + refQueue.toXml("") , checkIfPossible(transientNumOfBytes, persistentNumOfBytes, maxTransientNumOfBytes, maxPersistentNumOfBytes));
341                         assertEquals(ME + " number of returned values differe from input values " + refQueue.toXml(""), mustEntries, total.size());
342                         log.info("SUCCESS: cacheSize=" + maxNumOfBytesCache[ic] + " maxBytes=" + maxNumOfBytes[is] + " .... looks OK");
343 
344                         int count = 0;
345                         for (int j=0; j < 3; j++) {
346                            Hashtable table = inputTable[j];
347                            Enumeration keys = table.keys();
348                            while (keys.hasMoreElements()) {
349                               ((I_QueueEntry)table.get(keys.nextElement())).getUniqueId();
350                               ((I_QueueEntry)total.get(count)).getUniqueId();
351                               assertEquals("uniqueId differe for count " + count + " " + refQueue.toXml(""), mustEntries, total.size());
352                               count++;
353                            }
354                         }
355                      }
356                      catch(XmlBlasterException e) {
357                         log.finest("Exception (might be ok): " + e.toString());
358                         assertTrue("Overflow is not allowed on location '"+ lastSuccessfulLocation + "' " + refQueue.toXml("") + "total number of bytes " + entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) + " max muber of bytes: " + maxNumOfBytes[is], entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) > maxNumOfBytes[is]);
359                         log.info("SUCCESS: Exception is OK: " + e.toString());
360                      }
361                   }
362                }
363             }
364          }
365    }
366 
367 
368    public void testAvailability() {
369       String queueType = "CACHE";
370       try {
371          availability();
372       }
373       catch (XmlBlasterException ex) {
374          ex.printStackTrace();
375          fail("Exception when testing availability probably due to failed initialization of the queue of type " + queueType);
376       }
377    }
378 
379 
380    /**
381     * when queue available:
382     * -fill queue with 3 persistent and 2 transient messages -> RAM:5 JDBC:3
383     * - queue is made unavailable
384     * - queue is filled with 2 persistent and 3 transient msg -> RAM:10 JDBC:3 (since no comm)
385     * - peek and then remove all available entries: -> RAM:0 JDBC:3 (since no comm)
386     */
387    public void availability() throws XmlBlasterException {
388       // set up the queues ....
389       long maxNumOfBytesCache = 10000L;
390       long maxNumOfBytes = 50000L;
391       long entrySize = 100L;
392 
393       QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
394       prop.setMaxEntries(2000L);
395       prop.setMaxEntriesCache(1000L);
396       prop.setMaxBytes(maxNumOfBytes);
397       prop.setMaxBytesCache(maxNumOfBytesCache);
398       StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes + "/ram" + maxNumOfBytesCache);
399       this.queue.clear();
400       this.queue.shutdown();
401       this.queue.initialize(queueId, prop);
402 
403       if (!this.queue.isShutdown()) this.queue.shutdown();
404       this.queue.initialize(queueId, prop);
405       this.queue.clear();
406 
407       int numOfEntries = 20;
408       int entries1 = 5;
409       int entries2 = 10;
410 
411       this.queue.clear();
412       DummyEntry[] entries = new DummyEntry[numOfEntries];
413       PriorityEnum prio = PriorityEnum.toPriorityEnum(4);
414 
415       boolean persistent = false;
416       for (int i=0; i < numOfEntries; i++) {
417          persistent = (i % 2) == 0; // even are persistent uneven are transient
418          entries[i] = new DummyEntry(glob, prio, this.queue.getStorageId(), entrySize, persistent);
419       }
420 
421       // do the test here ....
422       for (int i=0; i < entries1; i++) {
423          this.queue.put(entries[i], false);
424 //         assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, queue.getNumOfEntries());
425       }
426 
427       CacheQueueInterceptorPlugin cacheQueue = (CacheQueueInterceptorPlugin)this.queue;
428       cacheQueue.storageUnavailable(I_StorageProblemListener.AVAILABLE);
429 
430       for (int i=entries1; i < entries2; i++) {
431          this.queue.put(entries[i], false);
432       }
433 
434       List<I_Entry> list = this.queue.peek(-1, -1L);
435       assertEquals(ME + " number of entries when retrieving is wrong ", entries2, list.size());
436       for (int i=0; i < list.size(); i++) {
437          long uniqueId = ((I_QueueEntry)list.get(i)).getUniqueId();
438          assertEquals(ME + " entry sequence is wrong ", entries[i].getUniqueId(), uniqueId);
439       }
440       long ret = 0L;
441       boolean[] tmpArr = this.queue.removeRandom( (I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()]) );
442       for (int i=0; i < tmpArr.length; i++) if (tmpArr[i]) ret++;
443       assertEquals(ME + " number of entries removed is wrong ", (long)entries2, ret);
444 
445       list = this.queue.peek(-1, -1L);
446       assertEquals(ME + " number of entries peeked after removal is wrong ", 0, list.size());
447 
448       long num = this.queue.getNumOfEntries();
449       assertEquals(ME + " number of entries after removal is wrong ", 0L, num);
450 
451       cacheQueue.storageAvailable(I_StorageProblemListener.UNAVAILABLE);
452       list = this.queue.peek(-1, -1L);
453       assertEquals(ME + " number of entries peeked after reconnecting is wrong ", 0, list.size());
454 
455       num = this.queue.getNumOfEntries();
456       assertEquals(ME + " number of entries after reconnecting is wrong ", 0L, num);
457 
458 /*
459       for (int i=entries2; i < numOfEntries; i++) {
460          this.queue.put(entries[i], false);
461       }
462 */
463 
464    }
465 
466 
467    /**
468     * <pre>
469     *  java org.xmlBlaster.test.classtest.queue.CacheQueueTest
470     * </pre>
471     */
472    public static void main(String args[]) {
473 
474       Global glob = new Global(args);
475       CacheQueueTest testSub = new CacheQueueTest(glob, "CacheQueueTest");
476 
477       long startTime = System.currentTimeMillis();
478 
479       testSub.setUp();
480       testSub.testAvailability();
481       testSub.tearDown();
482 
483       testSub.setUp();
484       testSub.testConfig();
485       testSub.tearDown();
486 
487       testSub.setUp();
488       testSub.testPutPeekRemove();
489       testSub.tearDown();
490 
491       testSub.setUp();
492       testSub.testClearWithSwappedEntries();
493       testSub.tearDown();
494 
495       long usedTime = System.currentTimeMillis() - startTime;
496       log.info("time used for tests: " + usedTime/1000 + " seconds");
497    }
498 }
499 
500                                                                        


// syntax highlighted by Code2HTML, v. 0.9.1