1 package org.xmlBlaster.test.classtest.queue;
2
3 import java.util.ArrayList;
4 import java.util.Enumeration;
5 import java.util.Hashtable;
6 import java.util.List;
7 import java.util.logging.Logger;
8
9 import junit.framework.TestCase;
10
11 import org.xmlBlaster.util.Global;
12 import org.xmlBlaster.util.XmlBlasterException;
13 import org.xmlBlaster.util.def.Constants;
14 import org.xmlBlaster.util.def.PriorityEnum;
15 import org.xmlBlaster.util.plugin.PluginInfo;
16 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
17 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
18 import org.xmlBlaster.util.queue.I_Entry;
19 import org.xmlBlaster.util.queue.I_Queue;
20 import org.xmlBlaster.util.queue.I_QueueEntry;
21 import org.xmlBlaster.util.queue.I_StorageProblemListener;
22 import org.xmlBlaster.util.queue.QueuePluginManager;
23 import org.xmlBlaster.util.queue.StorageId;
24 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
25 import org.xmlBlaster.util.queuemsg.DummyEntry;
26
27 /**
28 * Test CacheQueueInterceptorPlugin.
29 * <p>
30 * The sorting order is priority,timestamp:
31 * </p>
32 * <pre>
33 * -> 5,100 - 5,98 - 5,50 - 9,3000 - 9,2500 ->
34 * </pre>
35 * <p>
36 * As 9 is highest priority it is the first to be taken out.<br />
37 * As we need to maintain the timely sequence and
38 * id is a timestamp in (more or less) nano seconds elapsed since 1970)
39 * the id 2500 (it is older) has precedence to the id 3000
40 * </p>
41 * <p>
42 * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.CacheQueueTest
43 * </p>
44 * <p>
45 * Configuration example:
46 * </p>
47 * <pre>
48 * JdbcDriver.drivers=org.postgresql.Driver
49 * JdbcDriver.postgresql.mapping=string=text,longint=bigint,int=integer,boolean=boolean
50 * queue.callback.url=jdbc:postgresql://localhost/test
51 * queue.callback.user=postgres
52 * queue.callback.password=
53 * </pre>
54 * <p>
55 * Test database with PostgreSQL:
56 * </p>
57 * <pre>
58 * initdb /tmp/postgres
59 * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres (edit host access)
60 * createdb test
61 * postmaster -i -D /tmp/postgres
62 * </pre>
63 * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
64 * @see org.xmlBlaster.util.queue.I_Queue
65 * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
66 * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
67 */
68 public class CacheQueueTest extends TestCase {
69 private String ME = "CacheQueueTest";
70 protected Global glob;
71 private static Logger log = Logger.getLogger(CacheQueueTest.class.getName());
72 private CacheQueueInterceptorPlugin queue = null;
73 private I_Queue[] queues;
74
75 public CacheQueueTest(String name) {
76 this(Global.instance(), name);
77 }
78
79 public CacheQueueTest(Global glob, String name) {
80 super(name);
81 this.glob = glob;
82 }
83
84 protected void setUp() {
85 glob = Global.instance();
86
87 QueuePropertyBase cbProp = null;
88
89 try {
90 glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
91
92 cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
93 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SetupQueue");
94
95 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
96 QueuePluginManager pluginManager = new QueuePluginManager(glob);
97 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
98 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
99 prop.put("tableNamePrefix", "TEST");
100 prop.put("entriesTableName", "_entries");
101 this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
102
103 pluginInfo = new PluginInfo(glob, pluginManager, "CACHE", "1.0");
104 this.queue = (CacheQueueInterceptorPlugin)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
105 this.queues = new I_Queue[3];
106
107 pluginInfo = new PluginInfo(glob, pluginManager, "RAM", "1.0");
108 this.queues[0] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
109 pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
110 this.queues[1] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
111 this.queues[2] = queue;
112
113 for (int i=0; i < 3; i++) {
114 this.queues[i].clear();
115 this.queues[i].shutdown(); // to allow to initialize again
116 }
117 }
118 catch (Exception ex) {
119 log.severe("could not propertly set up the database: " + ex.getMessage());
120 }
121
122 }
123
124
125 public void tearDown() {
126 try {
127 for (int i=0; i < 3; i++) {
128 this.queues[i].clear();
129 this.queues[i].shutdown();
130 }
131 }
132 catch (Exception ex) {
133 log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp");
134 }
135 }
136
137
138 public void testConfig() {
139 String queueType = "CACHE";
140 try {
141 config(20L, 10L, 500L, 200L);
142 }
143 catch (XmlBlasterException ex) {
144 ex.printStackTrace();
145 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
146 }
147 }
148
149
150 public StorageId config(long maxEntries, long maxEntriesCache, long maxBytes, long maxBytesCache)
151 throws XmlBlasterException {
152
153 // set up the queues ....
154 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
155 prop.setMaxEntries(maxEntries);
156 prop.setMaxEntriesCache(maxEntriesCache);
157 prop.setMaxBytes(maxBytes);
158 prop.setMaxBytesCache(maxBytesCache);
159
160 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "CacheQueueTest/config");
161
162 // this.queue = new CacheQueueInterceptorPlugin();
163 this.queue.initialize(queueId, prop);
164 this.queue.clear();
165 long persistentSize = this.queue.getPersistentQueue().getMaxNumOfBytes();
166 long persistentMsg = this.queue.getPersistentQueue().getMaxNumOfEntries();
167 long transientSize = this.queue.getTransientQueue().getMaxNumOfBytes();
168 long transientMsg = this.queue.getTransientQueue().getMaxNumOfEntries();
169
170 assertEquals("Wrong persistent size", maxBytes, persistentSize);
171 assertEquals("Wrong persistent num of msg", maxEntries, persistentMsg);
172 if (maxBytesCache != transientSize)
173 log.severe("ERROR: Wrong transient size" + this.queue.getTransientQueue().toXml(""));
174 assertEquals("Wrong transient size" + this.queue.getTransientQueue().toXml(""), maxBytesCache, transientSize);
175 assertEquals("Wrong num of transient msg", maxEntriesCache, transientMsg);
176 return queueId;
177 }
178
179 public void testClearWithSwappedEntries() {
180 String queueType = "CACHE";
181 try {
182 StorageId id = config(20L, 3L, 500L, 100L);
183 PriorityEnum prio = PriorityEnum.toPriorityEnum(5);
184 for (int i=0; i < 15; i++) {
185 boolean persistent = (i | 1) == 0; // some persistent and some transient
186 long entrySize = 10L;
187 DummyEntry entry = new DummyEntry(glob, prio, id, entrySize, persistent);
188 this.queue.put(entry, true);
189 }
190
191 long ret = this.queue.clear();
192 assertEquals("wrong number of entries returned by clear", 15L, ret);
193
194 long numOfEntries = this.queue.getNumOfEntries();
195 long numOfBytes = this.queue.getNumOfBytes();
196 assertEquals("the queue should be empty", 0L, numOfEntries);
197 assertEquals("the size of the queue should be 0", 0L, numOfBytes);
198 }
199 catch (XmlBlasterException ex) {
200 ex.printStackTrace();
201 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
202 }
203 }
204
205
206
207 public void testPutPeekRemove() {
208 String queueType = this.glob.getProperty().get("queueType", "CACHE");
209 log.info("testPutPeekRemove will be done with a queue of type '" + queueType + "'");
210 log.info("if you want to test with another queue type invoke '-queueType $TYPE' on the cmd line where $TYPE is either RAM JDBC or CACHE");
211 int index = 2;
212 if ("RAM".equalsIgnoreCase(queueType)) index = 0;
213 else if ("JDBC".equalsIgnoreCase(queueType)) index = 1;
214
215 try {
216 putPeekRemove(this.queues[index]);
217 }
218 catch (XmlBlasterException ex) {
219 ex.printStackTrace();
220 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
221 }
222 }
223
224
225 public void putPeekRemove(I_Queue refQueue) throws XmlBlasterException {
226
227 // set up the queues ....
228
229 // every content is 80 bytes which gives an entry size of 100 bytes (80+20)
230 long entrySize = 100;
231
232 String lastSuccessfulLocation = "";
233 long maxNumOfBytesCache[] = {700L, 10000L};
234 long maxNumOfBytes[] = {700L, 50000L};
235 int numOfTransientEntries[] = { 2, 50, 200};
236 int numOfPersistentEntries[] = { 0, 2, 50, 200};
237 // int numPrio[] = { 1, 5, 9};
238
239 // int it=0, id=0, ic=0, is=0;
240 // try {
241 for (int ic=0; ic < maxNumOfBytesCache.length; ic++) {
242 for (int is=0; is < maxNumOfBytes.length; is++) {
243 log.info("**** TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is]);
244 // a new queue each time here ...
245 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
246 prop.setMaxEntries(2000L);
247 prop.setMaxEntriesCache(1000L);
248 prop.setMaxBytes(maxNumOfBytes[is]);
249 prop.setMaxBytesCache(maxNumOfBytesCache[ic]);
250 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes[is] + "/ram" + maxNumOfBytesCache[ic]);
251
252 // this.queue = new CacheQueueInterceptorPlugin();
253 refQueue.clear();
254 refQueue.shutdown();
255
256 refQueue.initialize(queueId, prop);
257
258 for (int it=0; it < numOfTransientEntries.length; it++) {
259 // entry.setPrio(4+(it%3));
260 for (int id=0; id < numOfPersistentEntries.length; id++) {
261
262 log.info("**** SUB-TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is] +
263 " -> numOfTransientEntries["+it+"]=" + numOfTransientEntries[it] + " numOfPersistentEntries["+id+"]=" + numOfPersistentEntries[id]);
264 if (!refQueue.isShutdown()) refQueue.shutdown();
265 refQueue.initialize(queueId, prop);
266 refQueue.clear();
267
268 assertEquals(ME + " the number of bytes of the queue should be zero ", 0L, refQueue.getNumOfBytes());
269 assertEquals(ME + " the number of entries in the queue should be zero ", 0L, refQueue.getNumOfEntries());
270 assertEquals(ME + " the number of bytes of the persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentBytes());
271 assertEquals(ME + " the number of persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentEntries());
272
273 assertEquals(ME + " the maximum number of entries is wrong ", maxNumOfBytes[is], refQueue.getMaxNumOfBytes());
274
275 try {
276
277 refQueue.clear();
278 // prepare the inputs .
279 Hashtable[] inputTable = new Hashtable[3];
280 for (int i=0; i < 3; i++) inputTable[i] = new Hashtable();
281
282 DummyEntry[] transients = new DummyEntry[numOfTransientEntries[it]];
283 DummyEntry[] persistentEntries = new DummyEntry[numOfPersistentEntries[id]];
284
285 log.info("putPeekRemove " + queueId + " persistent: " + persistentEntries.length + " transient: " + transients.length);
286
287 boolean persistent = false;
288 for (int i=0; i < transients.length; i++) {
289 int prio = i % 3;
290 PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
291 DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
292 transients[i] = entry;
293 inputTable[prio].put(new Long(entry.getUniqueId()), entry);
294 }
295 persistent = true;
296 for (int i=0; i < persistentEntries.length; i++) {
297 int prio = i % 3;
298 PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
299 DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
300 persistentEntries[i] = entry;
301 inputTable[prio].put(new Long(entry.getUniqueId()), entry);
302 }
303
304 // do the test here ....
305 assertEquals(ME + " number of persistent entries is wrong ", 0L, refQueue.getNumOfPersistentEntries());
306 assertEquals(ME + " number of entries is wrong ", 0L, refQueue.getNumOfEntries());
307 for (int i=0; i < transients.length; i++) {
308 lastSuccessfulLocation = "transientEntries put #" + i;
309 refQueue.put(transients[i], false);
310 }
311 assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, refQueue.getNumOfEntries());
312 for (int i=0; i < persistentEntries.length; i++) {
313 lastSuccessfulLocation = "persistentEntries put #" + i;
314 refQueue.put(persistentEntries[i], false);
315 }
316 assertEquals(ME + " number of entries after putting transients is wrong ", persistentEntries.length + transients.length, refQueue.getNumOfEntries());
317 long nPersistents = refQueue.getNumOfPersistentEntries();
318 long nTransient = refQueue.getNumOfEntries() - nPersistents;
319
320 assertEquals(ME + " number of persistent entries is wrong ", persistentEntries.length, nPersistents);
321 assertEquals(ME + " number of transient entries is wrong ", transients.length, nTransient);
322
323 List<I_Entry> total = new ArrayList();
324 List<I_Entry> ret = refQueue.peekSamePriority(-1, -1L);
325 refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
326 while (ret.size() > 0) {
327 total.addAll(ret);
328 ret = refQueue.peekSamePriority(-1, -1L);
329 if (ret.size() > 0)
330 refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
331 }
332 int mustEntries = inputTable[0].size() + inputTable[1].size() + inputTable[2].size();
333
334
335 long totNumOfBytes = entrySize * (numOfPersistentEntries[id]+numOfTransientEntries[it]);
336 log.fine("total number of bytes: " + totNumOfBytes + " maxNumOfBytes: " + maxNumOfBytes[is]);
337 log.fine("entries must be: " + mustEntries);
338
339 assertTrue("Overflow is not allowed " + refQueue.toXml("") + "total number of bytes " + totNumOfBytes + " max number of bytes: " + maxNumOfBytes[is], totNumOfBytes <= maxNumOfBytes[is]);
340 // assertTrue(ME + " Overflow is not allowed " + refQueue.toXml("") , checkIfPossible(transientNumOfBytes, persistentNumOfBytes, maxTransientNumOfBytes, maxPersistentNumOfBytes));
341 assertEquals(ME + " number of returned values differe from input values " + refQueue.toXml(""), mustEntries, total.size());
342 log.info("SUCCESS: cacheSize=" + maxNumOfBytesCache[ic] + " maxBytes=" + maxNumOfBytes[is] + " .... looks OK");
343
344 int count = 0;
345 for (int j=0; j < 3; j++) {
346 Hashtable table = inputTable[j];
347 Enumeration keys = table.keys();
348 while (keys.hasMoreElements()) {
349 ((I_QueueEntry)table.get(keys.nextElement())).getUniqueId();
350 ((I_QueueEntry)total.get(count)).getUniqueId();
351 assertEquals("uniqueId differe for count " + count + " " + refQueue.toXml(""), mustEntries, total.size());
352 count++;
353 }
354 }
355 }
356 catch(XmlBlasterException e) {
357 log.finest("Exception (might be ok): " + e.toString());
358 assertTrue("Overflow is not allowed on location '"+ lastSuccessfulLocation + "' " + refQueue.toXml("") + "total number of bytes " + entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) + " max muber of bytes: " + maxNumOfBytes[is], entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) > maxNumOfBytes[is]);
359 log.info("SUCCESS: Exception is OK: " + e.toString());
360 }
361 }
362 }
363 }
364 }
365 }
366
367
368 public void testAvailability() {
369 String queueType = "CACHE";
370 try {
371 availability();
372 }
373 catch (XmlBlasterException ex) {
374 ex.printStackTrace();
375 fail("Exception when testing availability probably due to failed initialization of the queue of type " + queueType);
376 }
377 }
378
379
380 /**
381 * when queue available:
382 * -fill queue with 3 persistent and 2 transient messages -> RAM:5 JDBC:3
383 * - queue is made unavailable
384 * - queue is filled with 2 persistent and 3 transient msg -> RAM:10 JDBC:3 (since no comm)
385 * - peek and then remove all available entries: -> RAM:0 JDBC:3 (since no comm)
386 */
387 public void availability() throws XmlBlasterException {
388 // set up the queues ....
389 long maxNumOfBytesCache = 10000L;
390 long maxNumOfBytes = 50000L;
391 long entrySize = 100L;
392
393 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
394 prop.setMaxEntries(2000L);
395 prop.setMaxEntriesCache(1000L);
396 prop.setMaxBytes(maxNumOfBytes);
397 prop.setMaxBytesCache(maxNumOfBytesCache);
398 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes + "/ram" + maxNumOfBytesCache);
399 this.queue.clear();
400 this.queue.shutdown();
401 this.queue.initialize(queueId, prop);
402
403 if (!this.queue.isShutdown()) this.queue.shutdown();
404 this.queue.initialize(queueId, prop);
405 this.queue.clear();
406
407 int numOfEntries = 20;
408 int entries1 = 5;
409 int entries2 = 10;
410
411 this.queue.clear();
412 DummyEntry[] entries = new DummyEntry[numOfEntries];
413 PriorityEnum prio = PriorityEnum.toPriorityEnum(4);
414
415 boolean persistent = false;
416 for (int i=0; i < numOfEntries; i++) {
417 persistent = (i % 2) == 0; // even are persistent uneven are transient
418 entries[i] = new DummyEntry(glob, prio, this.queue.getStorageId(), entrySize, persistent);
419 }
420
421 // do the test here ....
422 for (int i=0; i < entries1; i++) {
423 this.queue.put(entries[i], false);
424 // assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, queue.getNumOfEntries());
425 }
426
427 CacheQueueInterceptorPlugin cacheQueue = (CacheQueueInterceptorPlugin)this.queue;
428 cacheQueue.storageUnavailable(I_StorageProblemListener.AVAILABLE);
429
430 for (int i=entries1; i < entries2; i++) {
431 this.queue.put(entries[i], false);
432 }
433
434 List<I_Entry> list = this.queue.peek(-1, -1L);
435 assertEquals(ME + " number of entries when retrieving is wrong ", entries2, list.size());
436 for (int i=0; i < list.size(); i++) {
437 long uniqueId = ((I_QueueEntry)list.get(i)).getUniqueId();
438 assertEquals(ME + " entry sequence is wrong ", entries[i].getUniqueId(), uniqueId);
439 }
440 long ret = 0L;
441 boolean[] tmpArr = this.queue.removeRandom( (I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()]) );
442 for (int i=0; i < tmpArr.length; i++) if (tmpArr[i]) ret++;
443 assertEquals(ME + " number of entries removed is wrong ", (long)entries2, ret);
444
445 list = this.queue.peek(-1, -1L);
446 assertEquals(ME + " number of entries peeked after removal is wrong ", 0, list.size());
447
448 long num = this.queue.getNumOfEntries();
449 assertEquals(ME + " number of entries after removal is wrong ", 0L, num);
450
451 cacheQueue.storageAvailable(I_StorageProblemListener.UNAVAILABLE);
452 list = this.queue.peek(-1, -1L);
453 assertEquals(ME + " number of entries peeked after reconnecting is wrong ", 0, list.size());
454
455 num = this.queue.getNumOfEntries();
456 assertEquals(ME + " number of entries after reconnecting is wrong ", 0L, num);
457
458 /*
459 for (int i=entries2; i < numOfEntries; i++) {
460 this.queue.put(entries[i], false);
461 }
462 */
463
464 }
465
466
467 /**
468 * <pre>
469 * java org.xmlBlaster.test.classtest.queue.CacheQueueTest
470 * </pre>
471 */
472 public static void main(String args[]) {
473
474 Global glob = new Global(args);
475 CacheQueueTest testSub = new CacheQueueTest(glob, "CacheQueueTest");
476
477 long startTime = System.currentTimeMillis();
478
479 testSub.setUp();
480 testSub.testAvailability();
481 testSub.tearDown();
482
483 testSub.setUp();
484 testSub.testConfig();
485 testSub.tearDown();
486
487 testSub.setUp();
488 testSub.testPutPeekRemove();
489 testSub.tearDown();
490
491 testSub.setUp();
492 testSub.testClearWithSwappedEntries();
493 testSub.tearDown();
494
495 long usedTime = System.currentTimeMillis() - startTime;
496 log.info("time used for tests: " + usedTime/1000 + " seconds");
497 }
498 }
499
500
syntax highlighted by Code2HTML, v. 0.9.1