1 package org.xmlBlaster.test.classtest.queue;
2
3 import java.util.logging.Logger;
4 import org.xmlBlaster.util.Global;
5 import org.xmlBlaster.util.XmlBlasterException;
6 import org.xmlBlaster.util.def.PriorityEnum;
7 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
8 import org.xmlBlaster.util.queue.StorageId;
9 import org.xmlBlaster.util.queue.I_Queue;
10 import org.xmlBlaster.util.queue.I_QueueEntry;
11 import org.xmlBlaster.util.queue.I_StorageProblemListener;
12 import org.xmlBlaster.util.def.Constants;
13 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
14 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
15
16 import org.xmlBlaster.util.queuemsg.DummyEntry;
17
18 import java.util.ArrayList;
19 import java.util.Hashtable;
20
21 import junit.framework.*;
22 import java.util.Enumeration;
23 import org.xmlBlaster.util.queue.QueuePluginManager;
24 import org.xmlBlaster.util.plugin.PluginInfo;
25
26 /**
27 * Test CacheQueueInterceptorPlugin.
28 * <p>
29 * The sorting order is priority,timestamp:
30 * </p>
31 * <pre>
32 * -> 5,100 - 5,98 - 5,50 - 9,3000 - 9,2500 ->
33 * </pre>
34 * <p>
35 * As 9 is highest priority it is the first to be taken out.<br />
36 * As we need to maintain the timely sequence and
37 * id is a timestamp in (more or less) nano seconds elapsed since 1970)
38 * the id 2500 (it is older) has precedence to the id 3000
39 * </p>
40 * <p>
41 * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.CacheQueueTest
42 * </p>
43 * <p>
44 * Configuration example:
45 * </p>
46 * <pre>
47 * JdbcDriver.drivers=org.postgresql.Driver
48 * JdbcDriver.postgresql.mapping=string=text,longint=bigint,int=integer,boolean=boolean
49 * queue.callback.url=jdbc:postgresql://localhost/test
50 * queue.callback.user=postgres
51 * queue.callback.password=
52 * </pre>
53 * <p>
54 * Test database with PostgreSQL:
55 * </p>
56 * <pre>
57 * initdb /tmp/postgres
58 * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres (edit host access)
59 * createdb test
60 * postmaster -i -D /tmp/postgres
61 * </pre>
62 * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
63 * @see org.xmlBlaster.util.queue.I_Queue
64 * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
65 * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
66 */
67 public class CacheQueueTest extends TestCase {
68 private String ME = "CacheQueueTest";
69 protected Global glob;
70 private static Logger log = Logger.getLogger(CacheQueueTest.class.getName());
71 private CacheQueueInterceptorPlugin queue = null;
72 private I_Queue[] queues;
73 public ArrayList queueList = null;
74
75 public CacheQueueTest(String name) {
76 this(Global.instance(), name);
77 }
78
79 public CacheQueueTest(Global glob, String name) {
80 super(name);
81 this.glob = glob;
82 }
83
84 protected void setUp() {
85 glob = Global.instance();
86
87 QueuePropertyBase cbProp = null;
88
89 try {
90 glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
91
92 cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
93 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SetupQueue");
94
95 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
96 QueuePluginManager pluginManager = new QueuePluginManager(glob);
97 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
98 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
99 prop.put("tableNamePrefix", "TEST");
100 prop.put("entriesTableName", "_entries");
101 this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
102
103 pluginInfo = new PluginInfo(glob, pluginManager, "CACHE", "1.0");
104 this.queue = (CacheQueueInterceptorPlugin)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
105 this.queues = new I_Queue[3];
106
107 pluginInfo = new PluginInfo(glob, pluginManager, "RAM", "1.0");
108 this.queues[0] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
109 pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
110 this.queues[1] = (I_Queue)pluginManager.getPlugin(pluginInfo, queueId, cbProp);
111 this.queues[2] = queue;
112
113 for (int i=0; i < 3; i++) this.queues[i].shutdown(); // to allow to initialize again
114 }
115 catch (Exception ex) {
116 log.severe("could not propertly set up the database: " + ex.getMessage());
117 }
118
119 }
120
121
122 public void tearDown() {
123 try {
124 for (int i=0; i < 3; i++) {
125 this.queues[i].clear();
126 this.queues[i].shutdown();
127 }
128 }
129 catch (Exception ex) {
130 log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp");
131 }
132 }
133
134
135 public void testConfig() {
136 String queueType = "CACHE";
137 try {
138 config(20L, 10L, 500L, 200L);
139 }
140 catch (XmlBlasterException ex) {
141 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
142 ex.printStackTrace();
143 }
144 }
145
146
147 public StorageId config(long maxEntries, long maxEntriesCache, long maxBytes, long maxBytesCache)
148 throws XmlBlasterException {
149
150 // set up the queues ....
151 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
152 prop.setMaxEntries(maxEntries);
153 prop.setMaxEntriesCache(maxEntriesCache);
154 prop.setMaxBytes(maxBytes);
155 prop.setMaxBytesCache(maxBytesCache);
156
157 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/config");
158
159 // this.queue = new CacheQueueInterceptorPlugin();
160 this.queue.initialize(queueId, prop);
161
162 long persistentSize = this.queue.getPersistentQueue().getMaxNumOfBytes();
163 long persistentMsg = this.queue.getPersistentQueue().getMaxNumOfEntries();
164 long transientSize = this.queue.getTransientQueue().getMaxNumOfBytes();
165 long transientMsg = this.queue.getTransientQueue().getMaxNumOfEntries();
166
167 assertEquals("Wrong persistent size", maxBytes, persistentSize);
168 assertEquals("Wrong persistent num of msg", maxEntries, persistentMsg);
169 if (maxBytesCache != transientSize)
170 log.severe("ERROR: Wrong transient size" + this.queue.getTransientQueue().toXml(""));
171 assertEquals("Wrong transient size" + this.queue.getTransientQueue().toXml(""), maxBytesCache, transientSize);
172 assertEquals("Wrong num of transient msg", maxEntriesCache, transientMsg);
173 return queueId;
174 }
175
176 public void testClearWithSwappedEntries() {
177 String queueType = "CACHE";
178 try {
179 StorageId id = config(20L, 3L, 500L, 100L);
180 PriorityEnum prio = PriorityEnum.toPriorityEnum(5);
181 for (int i=0; i < 15; i++) {
182 boolean persistent = (i | 1) == 0; // some persistent and some transient
183 long entrySize = 10L;
184 DummyEntry entry = new DummyEntry(glob, prio, id, entrySize, persistent);
185 this.queue.put(entry, true);
186 }
187
188 long ret = this.queue.clear();
189 assertEquals("wrong number of entries returned by clear", 15L, ret);
190
191 long numOfEntries = this.queue.getNumOfEntries();
192 long numOfBytes = this.queue.getNumOfBytes();
193 assertEquals("the queue should be empty", 0L, numOfEntries);
194 assertEquals("the size of the queue should be 0", 0L, numOfBytes);
195 }
196 catch (XmlBlasterException ex) {
197 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
198 ex.printStackTrace();
199 }
200 }
201
202
203
204 public void testPutPeekRemove() {
205 String queueType = this.glob.getProperty().get("queueType", "CACHE");
206 log.info("testPutPeekRemove will be done with a queue of type '" + queueType + "'");
207 log.info("if you want to test with another queue type invoke '-queueType $TYPE' on the cmd line where $TYPE is either RAM JDBC or CACHE");
208 int index = 2;
209 if ("RAM".equalsIgnoreCase(queueType)) index = 0;
210 else if ("JDBC".equalsIgnoreCase(queueType)) index = 1;
211
212 try {
213 putPeekRemove(this.queues[index]);
214 }
215 catch (XmlBlasterException ex) {
216 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
217 ex.printStackTrace();
218 }
219 }
220
221
222 public void putPeekRemove(I_Queue refQueue) throws XmlBlasterException {
223
224 // set up the queues ....
225
226 // every content is 80 bytes which gives an entry size of 100 bytes (80+20)
227 long entrySize = 100;
228
229 String lastSuccessfulLocation = "";
230 long maxNumOfBytesCache[] = {700L, 10000L};
231 long maxNumOfBytes[] = {700L, 50000L};
232 int numOfTransientEntries[] = { 2, 50, 200};
233 int numOfPersistentEntries[] = { 0, 2, 50, 200};
234 // int numPrio[] = { 1, 5, 9};
235
236 // int it=0, id=0, ic=0, is=0;
237 // try {
238 for (int ic=0; ic < maxNumOfBytesCache.length; ic++) {
239 for (int is=0; is < maxNumOfBytes.length; is++) {
240 log.info("**** TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is]);
241 // a new queue each time here ...
242 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
243 prop.setMaxEntries(2000L);
244 prop.setMaxEntriesCache(1000L);
245 prop.setMaxBytes(maxNumOfBytes[is]);
246 prop.setMaxBytesCache(maxNumOfBytesCache[ic]);
247 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes[is] + "/ram" + maxNumOfBytesCache[ic]);
248
249 // this.queue = new CacheQueueInterceptorPlugin();
250 refQueue.clear();
251 refQueue.shutdown();
252
253 refQueue.initialize(queueId, prop);
254
255 for (int it=0; it < numOfTransientEntries.length; it++) {
256 // entry.setPrio(4+(it%3));
257 for (int id=0; id < numOfPersistentEntries.length; id++) {
258
259 log.info("**** SUB-TEST maxNumOfBytesCache["+ic+"]=" + maxNumOfBytesCache[ic] + " maxNumOfBytes["+is+"]=" + maxNumOfBytes[is] +
260 " -> numOfTransientEntries["+it+"]=" + numOfTransientEntries[it] + " numOfPersistentEntries["+id+"]=" + numOfPersistentEntries[id]);
261 if (!refQueue.isShutdown()) refQueue.shutdown();
262 refQueue.initialize(queueId, prop);
263 refQueue.clear();
264
265 assertEquals(ME + " the number of bytes of the queue should be zero ", 0L, refQueue.getNumOfBytes());
266 assertEquals(ME + " the number of entries in the queue should be zero ", 0L, refQueue.getNumOfEntries());
267 assertEquals(ME + " the number of bytes of the persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentBytes());
268 assertEquals(ME + " the number of persistent entries in the queue should be zero ", 0L, refQueue.getNumOfPersistentEntries());
269
270 assertEquals(ME + " the maximum number of entries is wrong ", maxNumOfBytes[is], refQueue.getMaxNumOfBytes());
271
272 try {
273
274 refQueue.clear();
275 // prepare the inputs .
276 Hashtable[] inputTable = new Hashtable[3];
277 for (int i=0; i < 3; i++) inputTable[i] = new Hashtable();
278
279 DummyEntry[] transients = new DummyEntry[numOfTransientEntries[it]];
280 DummyEntry[] persistentEntries = new DummyEntry[numOfPersistentEntries[id]];
281
282 log.info("putPeekRemove " + queueId + " persistent: " + persistentEntries.length + " transient: " + transients.length);
283
284 boolean persistent = false;
285 for (int i=0; i < transients.length; i++) {
286 int prio = i % 3;
287 PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
288 DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
289 transients[i] = entry;
290 inputTable[prio].put(new Long(entry.getUniqueId()), entry);
291 }
292 persistent = true;
293 for (int i=0; i < persistentEntries.length; i++) {
294 int prio = i % 3;
295 PriorityEnum enumer = PriorityEnum.toPriorityEnum(prio+4);
296 DummyEntry entry = new DummyEntry(glob, enumer, refQueue.getStorageId(), entrySize, persistent);
297 persistentEntries[i] = entry;
298 inputTable[prio].put(new Long(entry.getUniqueId()), entry);
299 }
300
301 // do the test here ....
302 assertEquals(ME + " number of persistent entries is wrong ", 0L, refQueue.getNumOfPersistentEntries());
303 assertEquals(ME + " number of entries is wrong ", 0L, refQueue.getNumOfEntries());
304 for (int i=0; i < transients.length; i++) {
305 lastSuccessfulLocation = "transientEntries put #" + i;
306 refQueue.put(transients[i], false);
307 }
308 assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, refQueue.getNumOfEntries());
309 for (int i=0; i < persistentEntries.length; i++) {
310 lastSuccessfulLocation = "persistentEntries put #" + i;
311 refQueue.put(persistentEntries[i], false);
312 }
313 assertEquals(ME + " number of entries after putting transients is wrong ", persistentEntries.length + transients.length, refQueue.getNumOfEntries());
314 long nPersistents = refQueue.getNumOfPersistentEntries();
315 long nTransient = refQueue.getNumOfEntries() - nPersistents;
316
317 assertEquals(ME + " number of persistent entries is wrong ", persistentEntries.length, nPersistents);
318 assertEquals(ME + " number of transient entries is wrong ", transients.length, nTransient);
319
320 ArrayList total = new ArrayList();
321 ArrayList ret = refQueue.peekSamePriority(-1, -1L);
322 refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
323 while (ret.size() > 0) {
324 total.addAll(ret);
325 ret = refQueue.peekSamePriority(-1, -1L);
326 if (ret.size() > 0)
327 refQueue.removeRandom((I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]));
328 }
329 int mustEntries = inputTable[0].size() + inputTable[1].size() + inputTable[2].size();
330
331
332 long totNumOfBytes = entrySize * (numOfPersistentEntries[id]+numOfTransientEntries[it]);
333 log.fine("total number of bytes: " + totNumOfBytes + " maxNumOfBytes: " + maxNumOfBytes[is]);
334 log.fine("entries must be: " + mustEntries);
335
336 assertTrue("Overflow is not allowed " + refQueue.toXml("") + "total number of bytes " + totNumOfBytes + " max number of bytes: " + maxNumOfBytes[is], totNumOfBytes <= maxNumOfBytes[is]);
337 // assertTrue(ME + " Overflow is not allowed " + refQueue.toXml("") , checkIfPossible(transientNumOfBytes, persistentNumOfBytes, maxTransientNumOfBytes, maxPersistentNumOfBytes));
338 assertEquals(ME + " number of returned values differe from input values " + refQueue.toXml(""), mustEntries, total.size());
339 log.info("SUCCESS: cacheSize=" + maxNumOfBytesCache[ic] + " maxBytes=" + maxNumOfBytes[is] + " .... looks OK");
340
341 int count = 0;
342 for (int j=0; j < 3; j++) {
343 Hashtable table = inputTable[j];
344 Enumeration keys = table.keys();
345 while (keys.hasMoreElements()) {
346 ((I_QueueEntry)table.get(keys.nextElement())).getUniqueId();
347 ((I_QueueEntry)total.get(count)).getUniqueId();
348 assertEquals("uniqueId differe for count " + count + " " + refQueue.toXml(""), mustEntries, total.size());
349 count++;
350 }
351 }
352 }
353 catch(XmlBlasterException e) {
354 log.finest("Exception (might be ok): " + e.toString());
355 assertTrue("Overflow is not allowed on location '"+ lastSuccessfulLocation + "' " + refQueue.toXml("") + "total number of bytes " + entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) + " max muber of bytes: " + maxNumOfBytes[is], entrySize*(numOfPersistentEntries[id]+numOfTransientEntries[it]) > maxNumOfBytes[is]);
356 log.info("SUCCESS: Exception is OK: " + e.toString());
357 }
358 }
359 }
360 }
361 }
362 }
363
364
365 public void testAvailability() {
366 String queueType = "CACHE";
367 try {
368 availability();
369 }
370 catch (XmlBlasterException ex) {
371 fail("Exception when testing availability probably due to failed initialization of the queue of type " + queueType);
372 ex.printStackTrace();
373 }
374 }
375
376
377 /**
378 * when queue available:
379 * -fill queue with 3 persistent and 2 transient messages -> RAM:5 JDBC:3
380 * - queue is made unavailable
381 * - queue is filled with 2 persistent and 3 transient msg -> RAM:10 JDBC:3 (since no comm)
382 * - peek and then remove all available entries: -> RAM:0 JDBC:3 (since no comm)
383 */
384 public void availability() throws XmlBlasterException {
385 // set up the queues ....
386 long maxNumOfBytesCache = 10000L;
387 long maxNumOfBytes = 50000L;
388 long entrySize = 100L;
389
390 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
391 prop.setMaxEntries(2000L);
392 prop.setMaxEntriesCache(1000L);
393 prop.setMaxBytes(maxNumOfBytes);
394 prop.setMaxBytesCache(maxNumOfBytesCache);
395 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "CacheQueueTest/jdbc" + maxNumOfBytes + "/ram" + maxNumOfBytesCache);
396 this.queue.clear();
397 this.queue.shutdown();
398 this.queue.initialize(queueId, prop);
399
400 if (!this.queue.isShutdown()) this.queue.shutdown();
401 this.queue.initialize(queueId, prop);
402 this.queue.clear();
403
404 int numOfEntries = 20;
405 int entries1 = 5;
406 int entries2 = 10;
407
408 this.queue.clear();
409 DummyEntry[] entries = new DummyEntry[numOfEntries];
410 PriorityEnum prio = PriorityEnum.toPriorityEnum(4);
411
412 boolean persistent = false;
413 for (int i=0; i < numOfEntries; i++) {
414 persistent = (i % 2) == 0; // even are persistent uneven are transient
415 entries[i] = new DummyEntry(glob, prio, this.queue.getStorageId(), entrySize, persistent);
416 }
417
418 // do the test here ....
419 for (int i=0; i < entries1; i++) {
420 this.queue.put(entries[i], false);
421 // assertEquals(ME + " number of entries after putting transients is wrong ", transients.length, queue.getNumOfEntries());
422 }
423
424 CacheQueueInterceptorPlugin cacheQueue = (CacheQueueInterceptorPlugin)this.queue;
425 cacheQueue.storageUnavailable(I_StorageProblemListener.AVAILABLE);
426
427 for (int i=entries1; i < entries2; i++) {
428 this.queue.put(entries[i], false);
429 }
430
431 ArrayList list = this.queue.peek(-1, -1L);
432 assertEquals(ME + " number of entries when retrieving is wrong ", entries2, list.size());
433 for (int i=0; i < list.size(); i++) {
434 long uniqueId = ((I_QueueEntry)list.get(i)).getUniqueId();
435 assertEquals(ME + " entry sequence is wrong ", entries[i].getUniqueId(), uniqueId);
436 }
437 long ret = 0L;
438 boolean[] tmpArr = this.queue.removeRandom( (I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()]) );
439 for (int i=0; i < tmpArr.length; i++) if (tmpArr[i]) ret++;
440 assertEquals(ME + " number of entries removed is wrong ", (long)entries2, ret);
441
442 list = this.queue.peek(-1, -1L);
443 assertEquals(ME + " number of entries peeked after removal is wrong ", 0, list.size());
444
445 long num = this.queue.getNumOfEntries();
446 assertEquals(ME + " number of entries after removal is wrong ", 0L, num);
447
448 cacheQueue.storageAvailable(I_StorageProblemListener.UNAVAILABLE);
449 list = this.queue.peek(-1, -1L);
450 assertEquals(ME + " number of entries peeked after reconnecting is wrong ", 0, list.size());
451
452 num = this.queue.getNumOfEntries();
453 assertEquals(ME + " number of entries after reconnecting is wrong ", 0L, num);
454
455 /*
456 for (int i=entries2; i < numOfEntries; i++) {