1 package org.xmlBlaster.test.classtest.queue;
  2 
  3 import java.util.logging.Logger;
  4 import org.xmlBlaster.util.Global;
  5 import org.xmlBlaster.util.XmlBlasterException;
  6 import org.xmlBlaster.util.def.PriorityEnum;
  7 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
  8 import org.xmlBlaster.util.queue.StorageId;
  9 import org.xmlBlaster.util.queue.I_Queue;
 10 import org.xmlBlaster.util.queue.I_QueueEntry;
 11 import org.xmlBlaster.util.queue.I_StorageProblemListener;
 12 import org.xmlBlaster.util.def.Constants;
 13 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
 14 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
 15 
 16 import org.xmlBlaster.util.queuemsg.DummyEntry;
 17 
 18 import java.util.ArrayList;
 19 import java.util.Hashtable;
 20 
 21 import junit.framework.*;
 22 import java.util.Enumeration;
 23 import org.xmlBlaster.util.queue.QueuePluginManager;
 24 import org.xmlBlaster.util.plugin.PluginInfo;
 25 
 26 /**
 27  * Test CacheQueueInterceptorPlugin.
 28  * <p>
 29  * The sorting order is priority,timestamp:
 30  * </p>
 31  * <pre>
 32  *   ->    5,100 - 5,98 - 5,50 - 9,3000 - 9,2500   ->
 33  * </pre>
 34  * <p>
 35  * As 9 is highest priority it is the first to be taken out.<br />
 * As we need to maintain the chronological sequence, and the
 * id is a timestamp (more or less the nanoseconds elapsed since 1970),
 * the id 2500 (it is older) takes precedence over the id 3000.
 39  * </p>
 40  * <p>
 41  * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.CacheQueueTest
 42  * </p>
 43  * <p>
 44  * Configuration example:
 45  * </p>
 46  * <pre>
 47  * JdbcDriver.drivers=org.postgresql.Driver
 48  * JdbcDriver.postgresql.mapping=string=text,longint=bigint,int=integer,boolean=boolean
 49  * queue.callback.url=jdbc:postgresql://localhost/test
 50  * queue.callback.user=postgres
 51  * queue.callback.password=
 52  * </pre>
 53  * <p>
 54  * Test database with PostgreSQL:
 55  * </p>
 56  * <pre>
 57  * initdb /tmp/postgres
 58  * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres    (edit host access)
 59  * createdb test
 60  * postmaster -i -D /tmp/postgres
 61  * </pre>
 62  * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
 63  * @see org.xmlBlaster.util.queue.I_Queue
 64  * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
 65  * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
 66  */
 67 public class CacheQueueTest extends TestCase {
   // Prefix used in the assertion messages of this test.
   private String ME = "CacheQueueTest";
   // Global environment; reassigned to Global.instance() at the start of setUp().
   protected Global glob;
   private static Logger log = Logger.getLogger(CacheQueueTest.class.getName());
   // The CACHE queue plugin under test, created in setUp().
   private CacheQueueInterceptorPlugin queue = null;
   // Filled in setUp(): [0] = RAM plugin, [1] = JDBC plugin, [2] = the cache queue above.
   private I_Queue[] queues;
   // NOTE(review): not referenced anywhere in the visible part of this file -- confirm it is still needed.
   public ArrayList queueList = null;
 74 
   /**
    * Creates the test case with the singleton returned by Global.instance().
    * @param name the test name, handed on to the two-argument constructor
    */
   public CacheQueueTest(String name) {
      this(Global.instance(), name);
   }
 78 
   /**
    * Creates the test case with an explicitly supplied Global.
    * @param glob the Global stored in the {@code glob} field
    * @param name the test name, passed to the TestCase super constructor
    */
   public CacheQueueTest(Global glob, String name) {
      super(name);
      this.glob = glob;
   }
 83 
 84    protected void setUp() {
 85       glob = Global.instance();
 86 
 87       QueuePropertyBase cbProp = null;
 88 
 89       try {
 90          glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
 91 
 92          cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
 93          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SetupQueue");
 94 
 95          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
 96          QueuePluginManager pluginManager = new QueuePluginManager(glob);
 97          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
 98          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
 99          prop.put("tableNamePrefix", "TEST");
100          prop.put("entriesTableName", "_entries");
101          this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
102 
103          pluginInfo = new PluginInfo(glob, pluginManager, "CACHE", "1.0");
104          this.queue = (CacheQueueInterceptorPlugin)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
105          this.queues = new I_Queue[3];
106 
107          pluginInfo = new PluginInfo(glob, pluginManager, "RAM", "1.0");
108          this.queues[0] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
109          pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
110          this.queues[1] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
111          this.queues[2] = queue;
112 
113          for (int i=0; i < 3; i++) this.queues[i].shutdown(); // to allow to initialize again
114       }
115       catch (Exception ex) {
116          log.severe("could not propertly set up the database: " + ex.getMessage());
117       }
118 
119    }
120 
121 
122    public void tearDown() {
123       try {
124          for (int i=0; i < 3; i++) {
125             this.queues[i].clear();
126             this.queues[i].shutdown();
127          }
128       }
129       catch (Exception ex) {
130          log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp");
131       }
132    }
133 
134 
135    public void testConfig() {
136       String queueType = "CACHE";
137       try {
138          config(20L, 10L, 500L, 200L);
139       }
140       catch (XmlBlasterException ex) {
141          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
142          ex.printStackTrace();
143       }
144    }
145 
146 
147    public StorageId config(long maxEntries, long maxEntriesCache, long maxBytes, long maxBytesCache)
148       throws XmlBlasterException {
149 
150       // set up the queues ....
151       QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
152       prop.setMaxEntries(maxEntries);
153       prop.setMaxEntriesCache(maxEntriesCache);
154       prop.setMaxBytes(maxBytes);
155       prop.setMaxBytesCache(maxBytesCache);
156 
157       StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/config");
158 
159       // this.queue = new CacheQueueInterceptorPlugin();
160       this.queue.initialize(queueId, prop);
161 
162       long persistentSize = this.queue.getPersistentQueue().getMaxNumOfBytes();
163       long persistentMsg  = this.queue.getPersistentQueue().getMaxNumOfEntries();
164       long transientSize  = this.queue.getTransientQueue().getMaxNumOfBytes();
165       long transientMsg   = this.queue.getTransientQueue().getMaxNumOfEntries();
166 
167       assertEquals("Wrong persistent size", maxBytes, persistentSize);
168       assertEquals("Wrong persistent num of msg", maxEntries, persistentMsg);
169       if (maxBytesCache != transientSize)
170          log.severe("ERROR: Wrong transient size" + this.queue.getTransientQueue().toXml(""));
171       assertEquals("Wrong transient size" + this.queue.getTransientQueue().toXml(""), maxBytesCache, transientSize);
172       assertEquals("Wrong num of transient msg", maxEntriesCache, transientMsg);
173       return queueId;
174    }
175 
176    public void testClearWithSwappedEntries() {
177       String queueType = "CACHE";
178       try {
179          StorageId id = config(20L, 3L, 500L, 100L);
180          PriorityEnum prio = PriorityEnum.toPriorityEnum(5);
181          for (int i=0; i < 15; i++) {
182             boolean persistent =  (i | 1) == 0; // some persistent and some transient
183             long entrySize = 10L;
184             DummyEntry entry = new DummyEntry(glob, prio, id, entrySize, persistent);
185             this.queue.put(entry, true);
186          }
187          
188          long ret = this.queue.clear();
189          assertEquals("wrong number of entries returned by clear", 15L, ret);
190          
191          long numOfEntries = this.queue.getNumOfEntries();
192          long numOfBytes = this.queue.getNumOfBytes();
193          assertEquals("the queue should be empty", 0L, numOfEntries);
194          assertEquals("the size of the queue should be 0", 0L, numOfBytes);
195       }
196       catch (XmlBlasterException ex) {
197          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
198          ex.printStackTrace();
199       }
200    }
201 
202 
203 
204    public void testPutPeekRemove() {
205       String queueType = this.glob.getProperty().get("queueType", "CACHE");
206       log.info("testPutPeekRemove will be done with a queue of type '" + queueType + "'");
207       log.info("if you want to test with another queue type invoke '-queueType $TYPE' on the cmd line where $TYPE is either RAM JDBC or CACHE");
208       int index = 2;
209       if ("RAM".equalsIgnoreCase(queueType)) index = 0;
210       else if ("JDBC".equalsIgnoreCase(queueType)) index = 1;
211 
212       try {
213          putPeekRemove(this.queues[index]);
214       }
215       catch (XmlBlasterException ex) {
216          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
217          ex.printStackTrace();
218       }
219    }
220 
221 
222    public void putPeekRemove(I_Queue refQueue) throws XmlBlasterException {
223 
224       // set up the queues ....
225 
226       // every content is 80 bytes which gives an entry size of 100 bytes (80+20)
227       long entrySize = 100;
228 
229       String lastSuccessfulLocation = "";
230       long maxNumOfBytesCache[] = {700L, 10000L};
231       long maxNumOfBytes[] = {700L, 50000L};
232       int numOfTransientEntries[] = { 2, 50, 200};
233       int numOfPersistentEntries[] =  { 0, 2, 50, 200};
234 //      int numPrio[] = { 1, 5, 9};
235 
236 //      int it=0, id=0, ic=0, is=0;
237 //      try {
238          for (int ic=0; ic < maxNumOfBytesCache.length; ic++) {
239             for (int is=0; is < maxNumOfBytes.length; is++) {
240                log.info("**** TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is]);
241                // a new queue each time here ...
242                QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
243                prop.setMaxEntries(2000L);
244                prop.setMaxEntriesCache(1000L);
245                prop.setMaxBytes(maxNumOfBytes[is]);
246                prop.setMaxBytesCache(maxNumOfBytesCache[ic]);
247                StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes[is] + "/ram" + maxNumOfBytesCache[ic]);
248 
249 //               this.queue = new CacheQueueInterceptorPlugin();
250                refQueue.clear();
251                refQueue.shutdown();
252 
253                refQueue.initialize(queueId, prop);
254 
255                for (int it=0; it < numOfTransientEntries.length; it++) {
256                   // entry.setPrio(4+(it%3));
257                   for (int id=0; id < numOfPersistentEntries.length; id++) {
258 
259                      log.info("**** SUB-TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is] +
260                                    " -> numOfTransientEntries["+it+"]=" + numOfTransientEntries[it] + " numOfPersistentEntries["+id+"]=" + numOfPersistentEntries[id]);
261                      if (!refQueue.isShutdown()) refQueue.shutdown();
262                      refQueue.initialize(queueId, prop);
263                      refQueue.clear();
264 
265                      assertEquals(ME + " the number of bytes of the queue should be zero ", 0L, refQueue.getNumOfBytes());
266                      assertEquals(ME + " the number of entries in the queue should be zero ", 0L, refQueue.getNumOfEntries());
267                      assertEquals(ME + " the number of bytes of the persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentBytes());
268                      assertEquals(ME + " the number of persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentEntries());
269 
270                      assertEquals(ME + " the maximum number of entries is wrong ", maxNumOfBytes[is], refQueue.getMaxNumOfBytes());
271 
272                      try {
273 
274                         refQueue.clear();
275                         // prepare the inputs .
276                         Hashtable[] inputTable = new Hashtable[3];
277                         for (int i=0; i < 3; i++) inputTable[i] = new Hashtable();
278 
279                         DummyEntry[] transients = new DummyEntry[numOfTransientEntries[it]];
280                         DummyEntry[] persistentEntries    = new DummyEntry[numOfPersistentEntries[id]];
281 
282                         log.info("putPeekRemove " + queueId + " persistent: " + persistentEntries.length + " transient: " + transients.length);
283 
284                         boolean persistent = false;
285                         for (int i=0; i < transients.length; i++) {
286                            int prio = i % 3;
287                            PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
288                            DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
289                            transients[i] = entry;
290                            inputTable[prio].put(new Long(entry.getUniqueId()), entry);
291                         }
292                         persistent = true;
293                         for (int i=0; i < persistentEntries.length; i++) {
294                            int prio = i % 3;
295                            PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
296                            DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
297                            persistentEntries[i] = entry;
298                            inputTable[prio].put(new Long(entry.getUniqueId()), entry);
299                         }
300 
301                         // do the test here ....
302                         assertEquals(ME + " number of persistent entries is wrong ", 0L, refQueue.getNumOfPersistentEntries());
303                         assertEquals(ME + " number of entries is wrong ", 0L, refQueue.getNumOfEntries());
304                         for (int i=0; i < transients.length; i++) {
305                            lastSuccessfulLocation = "transientEntries put #" + i;
306                            refQueue.put(transients[i], false);
307                         }
308                         assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, refQueue.getNumOfEntries());
309                         for (int i=0; i < persistentEntries.length; i++) {
310                            lastSuccessfulLocation = "persistentEntries put #" + i;
311                            refQueue.put(persistentEntries[i], false);
312                         }
313                         assertEquals(ME + " number of entries after putting transients is wrong ", persistentEntries.length + transients.length, refQueue.getNumOfEntries());
314                         long nPersistents  = refQueue.getNumOfPersistentEntries();
315                         long nTransient = refQueue.getNumOfEntries() - nPersistents;
316 
317                         assertEquals(ME + " number of persistent entries is wrong ", persistentEntries.length, nPersistents);
318                         assertEquals(ME + " number of transient entries is wrong ", transients.length, nTransient);
319 
320                         ArrayList total = new ArrayList();
321                         ArrayList ret = refQueue.peekSamePriority(-1, -1L);
322                         refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
323                         while (ret.size() > 0) {
324                            total.addAll(ret);
325                            ret = refQueue.peekSamePriority(-1, -1L);
326                            if (ret.size() > 0)
327                               refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
328                         }
329                         int mustEntries = inputTable[0].size() + inputTable[1].size() + inputTable[2].size();
330 
331 
332                         long totNumOfBytes = entrySize * (numOfPersistentEntries[id]+numOfTransientEntries[it]);
333                         log.fine("total number of bytes: " + totNumOfBytes + " maxNumOfBytes: " + maxNumOfBytes[is]);
334                         log.fine("entries must be: " + mustEntries);
335 
336                         assertTrue("Overflow is not allowed " + refQueue.toXml("") + "total number of bytes " + totNumOfBytes + " max number of bytes: " + maxNumOfBytes[is], totNumOfBytes <= maxNumOfBytes[is]);
337 //                        assertTrue(ME + " Overflow is not allowed " + refQueue.toXml("") , checkIfPossible(transientNumOfBytes, persistentNumOfBytes, maxTransientNumOfBytes, maxPersistentNumOfBytes));
338                         assertEquals(ME + " number of returned values differe from input values " + refQueue.toXml(""), mustEntries, total.size());
339                         log.info("SUCCESS: cacheSize=" + maxNumOfBytesCache[ic] + " maxBytes=" + maxNumOfBytes[is] + " .... looks OK");
340 
341                         int count = 0;
342                         for (int j=0; j < 3; j++) {
343                            Hashtable table = inputTable[j];
344                            Enumeration keys = table.keys();
345                            while (keys.hasMoreElements()) {
346                               ((I_QueueEntry)table.get(keys.nextElement())).getUniqueId();
347                               ((I_QueueEntry)total.get(count)).getUniqueId();
348                               assertEquals("uniqueId differe for count " + count + " " + refQueue.toXml(""), mustEntries, total.size());
349                               count++;
350                            }
351                         }
352                      }
353                      catch(XmlBlasterException e) {
354                         log.finest("Exception (might be ok): " + e.toString());
355                         assertTrue("Overflow is not allowed on location '"+ lastSuccessfulLocation + "' " + refQueue.toXml("") + "total number of bytes " + entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) + " max muber of bytes: " + maxNumOfBytes[is], entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) > maxNumOfBytes[is]);
356                         log.info("SUCCESS: Exception is OK: " + e.toString());
357                      }
358                   }
359                }
360             }
361          }
362    }
363 
364 
365    public void testAvailability() {
366       String queueType = "CACHE";
367       try {
368          availability();
369       }
370       catch (XmlBlasterException ex) {
371          fail("Exception when testing availability probably due to failed initialization of the queue of type " + queueType);
372          ex.printStackTrace();
373       }
374    }
375 
376 
377    /**
378     * when queue available:
379     * -fill queue with 3 persistent and 2 transient messages -> RAM:5 JDBC:3
380     * - queue is made unavailable
381     * - queue is filled with 2 persistent and 3 transient msg -> RAM:10 JDBC:3 (since no comm)
382     * - peek and then remove all available entries: -> RAM:0 JDBC:3 (since no comm)
383     */
384    public void availability() throws XmlBlasterException {
385       // set up the queues ....
386       long maxNumOfBytesCache = 10000L;
387       long maxNumOfBytes = 50000L;
388       long entrySize = 100L;
389 
390       QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
391       prop.setMaxEntries(2000L);
392       prop.setMaxEntriesCache(1000L);
393       prop.setMaxBytes(maxNumOfBytes);
394       prop.setMaxBytesCache(maxNumOfBytesCache);
395       StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes + "/ram" + maxNumOfBytesCache);
396       this.queue.clear();
397       this.queue.shutdown();
398       this.queue.initialize(queueId, prop);
399 
400       if (!this.queue.isShutdown()) this.queue.shutdown();
401       this.queue.initialize(queueId, prop);
402       this.queue.clear();
403 
404       int numOfEntries = 20;
405       int entries1 = 5;
406       int entries2 = 10;
407 
408       this.queue.clear();
409       DummyEntry[] entries = new DummyEntry[numOfEntries];
410       PriorityEnum prio = PriorityEnum.toPriorityEnum(4);
411 
412       boolean persistent = false;
413       for (int i=0; i < numOfEntries; i++) {
414          persistent = (i % 2) == 0; // even are persistent uneven are transient
415          entries[i] = new DummyEntry(glob, prio, this.queue.getStorageId(), entrySize, persistent);
416       }
417 
418       // do the test here ....
419       for (int i=0; i < entries1; i++) {
420          this.queue.put(entries[i], false);
421 //         assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, queue.getNumOfEntries());
422       }
423 
424       CacheQueueInterceptorPlugin cacheQueue = (CacheQueueInterceptorPlugin)this.queue;
425       cacheQueue.storageUnavailable(I_StorageProblemListener.AVAILABLE);
426 
427       for (int i=entries1; i < entries2; i++) {
428          this.queue.put(entries[i], false);
429       }
430 
431       ArrayList list = this.queue.peek(-1, -1L);
432       assertEquals(ME + " number of entries when retrieving is wrong ", entries2, list.size());
433       for (int i=0; i < list.size(); i++) {
434          long uniqueId = ((I_QueueEntry)list.get(i)).getUniqueId();
435          assertEquals(ME + " entry sequence is wrong ", entries[i].getUniqueId(), uniqueId);
436       }
437       long ret = 0L;
438       boolean[] tmpArr = this.queue.removeRandom( (I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()]) );
439       for (int i=0; i < tmpArr.length; i++) if (tmpArr[i]) ret++;
440       assertEquals(ME + " number of entries removed is wrong ", (long)entries2, ret);
441 
442       list = this.queue.peek(-1, -1L);
443       assertEquals(ME + " number of entries peeked after removal is wrong ", 0, list.size());
444 
445       long num = this.queue.getNumOfEntries();
446       assertEquals(ME + " number of entries after removal is wrong ", 0L, num);
447 
448       cacheQueue.storageAvailable(I_StorageProblemListener.UNAVAILABLE);
449       list = this.queue.peek(-1, -1L);
450       assertEquals(ME + " number of entries peeked after reconnecting is wrong ", 0, list.size());
451 
452       num = this.queue.getNumOfEntries();
453       assertEquals(ME + " number of entries after reconnecting is wrong ", 0L, num);
454 
455 /*
456       for (int i=entries2; i < numOfEntries; i++) {