1 package org.xmlBlaster.test.classtest.queue;
2
3 import java.util.logging.Logger;
4 import java.util.logging.Level;
5 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
6 import org.xmlBlaster.util.Global;
7 import org.xmlBlaster.util.MsgUnit;
8 import org.xmlBlaster.util.XmlBlasterException;
9 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
10 import org.xmlBlaster.util.queue.BlockingQueueWrapper;
11 import org.xmlBlaster.util.queue.I_Entry;
12 import org.xmlBlaster.util.queue.I_StorageSizeListener;
13 import org.xmlBlaster.util.queue.I_Storage;
14 import org.xmlBlaster.util.queue.StorageId;
15 import org.xmlBlaster.util.queue.I_Queue;
16 import org.xmlBlaster.util.queue.I_QueueEntry;
17 import org.xmlBlaster.util.def.PriorityEnum;
18 import org.xmlBlaster.util.def.Constants;
19 import org.xmlBlaster.util.def.ErrorCode;
20 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
21 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
22 import org.xmlBlaster.util.plugin.PluginInfo;
23
24 import org.xmlBlaster.util.queuemsg.DummyEntry;
25
26 import java.util.ArrayList;
27
28 import junit.framework.*;
29 import org.xmlBlaster.util.queue.QueuePluginManager;
30
31 /**
32 * Tests the I_Queue implementations selected by plugin type (RAM, JDBC and CACHE).
33 * <p>
34 * The sorting order is priority,timestamp:
35 * </p>
36 * <pre>
37 * -> 5,100 - 5,98 - 5,50 - 9,3000 - 9,2500 ->
38 * </pre>
39 * <p>
40 * As 9 is highest priority it is the first to be taken out.<br />
41 * As we need to maintain the chronological sequence, and the
42 * id is a timestamp (more or less the nanoseconds elapsed since 1970),
43 * the id 2500 (it is older) takes precedence over the id 3000.
44 * </p>
45 * <p>
46 * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.I_QueueTest
47 * </p>
48 * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
49 * @see org.xmlBlaster.util.queue.I_Queue
50 * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
51 * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
52 */
53 public class I_QueueTest extends TestCase {
54
55
56 class QueueSizeListener implements I_StorageSizeListener {
57
58 private long lastNumEntries = 0L,
59 lastNumBytes = 0L,
60 lastIncrementEntries = 0L,
61 lastIncrementBytes = 0L;
62 private int count = 0;
63
64 public long getLastIncrementEntries() {
65 return this.lastIncrementEntries;
66 }
67
68 public long getLastIncrementBytes() {
69 return this.lastIncrementBytes;
70 }
71
72 public int getCount() {
73 return this.count;
74 }
75
76 public void clear() {
77 this.lastNumEntries = 0L;
78 this.lastNumBytes = 0L;
79 this.lastIncrementEntries = 0L;
80 this.lastIncrementBytes = 0L;
81 this.count = 0;
82 }
83
84 public void changed(I_Storage storage, long numEntries, long numBytes, boolean isShutdown) {
85 this.lastIncrementEntries = numEntries - this.lastNumEntries;
86 this.lastIncrementBytes = numBytes - this.lastNumBytes;
87 this.lastNumEntries = numEntries;
88 this.lastNumBytes = numBytes;
89 this.count++;
90 }
91
92 }
93
94
95 private String ME = "I_QueueTest";
96 protected Global glob;
97 private static Logger log = Logger.getLogger(I_QueueTest.class.getName());
98
99 private I_Queue queue;
100 private QueueSizeListener queueSizeListener = new QueueSizeListener();
101
102 static String[] PLUGIN_TYPES = {
103 new String("RAM"),
104 new String("JDBC"),
105 new String("CACHE")
106 };
107
108 /*
109 static I_Queue[] IMPL = {
110 new org.xmlBlaster.util.queue.ram.RamQueuePlugin(),
111 new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(),
112 new org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin()
113 };
114 */
115
116 public class QueuePutter extends Thread {
117
118 private I_Queue queue;
119 private long delay;
120 private int numOfEntries;
121 private boolean ignoreInterceptor;
122
123 public QueuePutter(I_Queue queue, long delay, int numOfEntries, boolean ignoreInterceptor) {
124 this.queue = queue;
125 this.delay = delay;
126 this.numOfEntries = numOfEntries;
127 this.ignoreInterceptor = ignoreInterceptor;
128 }
129
130 public void run() {
131 try {
132 for (int i=0; i < this.numOfEntries; i++) {
133 Thread.sleep(this.delay);
134 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
135 this.queue.put(queueEntry, this.ignoreInterceptor);
136 }
137 }
138 catch (Exception ex) {
139 ex.printStackTrace();
140 }
141 }
142
143 }
144
145
146 public I_QueueTest(String name, int currImpl, Global glob) {
147 super(name);
148 // this.queue = IMPL[currImpl];
149 //this.ME = "I_QueueTest[" + this.queue.getClass().getName() + "]";
150
151 if (glob == null) this.glob = Global.instance();
152 else this.glob = glob;
153
154
155 try {
156 String type = PLUGIN_TYPES[currImpl];
157 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
158 QueuePluginManager pluginManager = new QueuePluginManager(glob);
159
160 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
161 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
162 prop.put("tableNamePrefix", "TEST");
163 prop.put("entriesTableName", "_entries");
164 this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
165
166 pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
167 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");
168 this.queue = pluginManager.getPlugin(pluginInfo, queueId, new CbQueueProperty(this.glob, Constants.RELATING_CALLBACK, this.glob.getStrippedId()));
169 this.queue.shutdown(); // to allow to initialize again
170 }
171 catch (Exception ex) {
172 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
173 }
174 }
175
176 protected void setUp() {
177 // cleaning up the database from previous runs ...
178 QueuePropertyBase prop = null;
179 try {
180 prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
181 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");
182 queue.initialize(queueId, prop);
183 queue.clear();
184 queue.shutdown();
185 }
186 catch (Exception ex) {
187 log.severe("could not propertly set up the database: " + ex.getMessage());
188 }
189 }
190
191 /**
192 * Tests QueuePropertyBase() and getStorageId()
193 * @param queueTypeList A space separated list of names for the
194 * implementations to be tested. Valid names are:
195 * RamQueuePlugin JdbcQueuePlugin
196 */
197 public void testConfig() {
198 config(this.queue);
199 }
200
201 /**
202 * Tests initialize(), getProperties(), setProperties() and capacity()
203 * @param queue !!!Is not initialized in this case!!!!
204 */
205 private void config(I_Queue queue) {
206 ME = "I_QueueTest.config(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
207 System.out.println("***" + ME);
208
209 QueuePropertyBase prop1 = null;
210 QueuePropertyBase prop = null;
211 try {
212 // test initialize()
213 prop1 = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
214 int max = 12;
215 prop1.setMaxEntries(max);
216 prop1.setMaxEntriesCache(max);
217 assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries());
218 assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache());
219 //PluginInfo pluginInfo = new PluginInfo(glob, null, "");
220 //queue.init(glob, pluginInfo); // Init from pluginloader is first
221 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SomeQueueId");
222 queue.initialize(queueId, prop1);
223 assertEquals(ME+": Wrong queue ID", queueId, queue.getStorageId());
224
225 try {
226 prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, "/node/test");
227 prop.setMaxEntries(99);
228 prop.setMaxEntriesCache(99);
229 queue.setProperties(prop);
230 }
231 catch(XmlBlasterException e) {
232 fail("Changing properties failed");
233 }
234
235 }
236 catch(XmlBlasterException e) {
237 fail(ME + ": Exception thrown: " + e.getMessage());
238 }
239
240 long len = prop.getMaxEntries();
241 assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), queue.getMaxNumOfEntries());
242 assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)queue.getProperties()).getMaxEntries());
243 assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
244
245 try {
246 for (int ii=0; ii<len; ii++) {
247 queue.put(new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), false);
248 }
249 assertEquals(ME+": Wrong total size", len, queue.getNumOfEntries());
250
251 try {
252 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
253 queue.put(queueEntry, false);
254 queue.put(queueEntry, false);
255 fail("Did expect an exception on overflow");
256 }
257 catch(XmlBlasterException e) {
258 log.info("SUCCESS the exception is OK: " + e.getMessage());
259 }
260
261 log.info("toXml() test:" + queue.toXml(""));
262 log.info("usage() test:" + queue.usage());
263
264 assertEquals(ME+": should not be shutdown", false, queue.isShutdown());
265 queue.shutdown();
266 assertEquals(ME+": should be shutdown", true, queue.isShutdown());
267
268 log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
269 System.out.println("***" + ME + " [SUCCESS]");
270 queue.shutdown();
271 queue = null;
272 }
273 catch(XmlBlasterException e) {
274 fail(ME + ": Exception thrown: " + e.getMessage());
275 }
276 }
277
278 //------------------------------------
279 public void testSize1() {
280 String queueType = "unknown";
281 try {
282 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
283 int max = 1;
284 prop.setMaxEntries(max);
285 prop.setMaxEntriesCache(max);
286 queueType = this.queue.toString();
287 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/size1");
288 this.queue.initialize(queueId, prop);
289 queue.clear();
290 assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
291 assertEquals(ME, 1L, queue.getMaxNumOfEntries());
292 size1(this.queue);
293 }
294 catch (XmlBlasterException ex) {
295 fail("Exception when testing Size1 probably due to failed initialization of the queue of type " + queueType);
296 }
297 }
298
299 /**
300 * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear()
301 */
302 private void size1(I_Queue queue) {
303 this.queue = queue;
304 ME = "I_QueueTest.size1(" + queue.getStorageId() + ")[" + this.queue.getClass().getName() + "]";
305 System.out.println("***" + ME);
306 int maxEntries = (int)queue.getMaxNumOfEntries();
307 try {
308 //========== Test 1: put(I_QueueEntry[])
309 int numLoop = 10;
310 ArrayList list = new ArrayList();
311
312 //========== Test 2: put(I_QueueEntry)
313 this.queue.removeStorageSizeListener(null);
314 this.queue.addStorageSizeListener(this.queueSizeListener);
315 this.queueSizeListener.clear();
316
317 for (int ii=0; ii<numLoop; ii++) {
318 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
319 try {
320 queue.put(queueEntry, false);
321 assertEquals("number of entries incremented on last invocation", 1, this.queueSizeListener.getLastIncrementEntries());
322 assertEquals("number of bytes incremented on last invocation", queueEntry.getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
323
324 if (ii > maxEntries) { // queue allows on overload
325 fail("Didn't expect more than " + maxEntries + " entries" + queue.toXml(""));
326 }
327 else
328 list.add(queueEntry);
329 }
330 catch (XmlBlasterException e) {
331 if (ii <= maxEntries) {
332 fail("Didn't expect exception" + e.getMessage());
333 }
334 }
335 }
336 assertEquals("number of invocations for queue size listener is wrong", maxEntries+1, this.queueSizeListener.getCount());
337
338 // The queues allow temporary oversize (one extra put())
339 assertEquals(ME+": Wrong total size " + queue.toXml(""), maxEntries+1, queue.getNumOfEntries());
340 this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
341 log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
342
343 ArrayList entryList = null;
344 try {
345 entryList = queue.peekLowest(1, -1L, null, false);
346 assertEquals("PEEK #1 failed"+queue.toXml(""), 1, entryList.size());
347 log.info("curr entries="+queue.getNumOfEntries());
348 }
349 catch (XmlBlasterException e) {
350 if (e.getErrorCode()!=ErrorCode.INTERNAL_NOTIMPLEMENTED) throw e;
351 }
352
353
354 //this.queue.removeStorageSizeListener(null);
355 //this.queue.addStorageSizeListener(this.queueSizeListener);
356 //this.queueSizeListener.clear();
357
358 entryList = queue.takeLowest(1, -1L, null, false);
359 long singleSize = ((I_QueueEntry)entryList.get(0)).getSizeInBytes();
360 assertEquals("TAKE #1 failed"+queue.toXml(""), 1, entryList.size());
361 log.info("curr entries="+queue.getNumOfEntries());
362 assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
363 assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
364
365 entryList = queue.takeLowest(1, -1L, null, false);
366 assertEquals("TAKE #2 failed"+queue.toXml(""), 1, entryList.size());
367 assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
368 assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
369
370 queue.clear();
371 assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());
372
373 System.out.println("***" + ME + " [SUCCESS]");
374 queue.shutdown();
375 queue = null;
376
377 }
378 catch(XmlBlasterException e) {
379 fail(ME + ": Exception thrown: " + e.getMessage());
380 }
381 log.info("SUCCESS");
382 }
383
384
385 //------------------------------------
386 public void testPutMsg() {
387 String queueType = "unknown";
388 try {
389 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
390 queueType = this.queue.toString();
391 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/putMsg");
392 this.queue.initialize(queueId, prop);
393 queue.clear();
394 assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
395 putMsg(this.queue);
396 }
397 catch (XmlBlasterException ex) {
398 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
399 }
400 }
401
402
403 /**
404 * @see checkSizeAndEntries(String, I_QueueEntry[], I_Queue)
405 */
406 private void checkSizeAndEntries(String txt, ArrayList queueEntries, I_Queue queue) {
407 checkSizeAndEntries(txt, (I_QueueEntry[])queueEntries.toArray(new I_QueueEntry[queueEntries.size()]), queue);
408 }
409
410
411 /**
412 * Helper method to do a generic size check (size and number of entries)
413 */
414 private void checkSizeAndEntries(String txt, I_QueueEntry[] queueEntries, I_Queue queue) {
415 long sizeOfTransients = 0L;
416 long numOfPersistents = 0;
417 long numOfTransients = 0;
418 long sizeOfPersistents = 0L;
419
420 for (int i=0; i < queueEntries.length; i++) {
421 I_QueueEntry entry = queueEntries[i];
422 if (entry.isPersistent()) {
423 sizeOfPersistents += entry.getSizeInBytes();
424 numOfPersistents++;
425 }
426 else {
427 sizeOfTransients += entry.getSizeInBytes();
428 numOfTransients++;
429 }
430 }
431
432 long queueNumOfPersistents = queue.getNumOfPersistentEntries();
433 long queueNumOfTransients = queue.getNumOfEntries() - queueNumOfPersistents;
434 long queueSizeOfPersistents = queue.getNumOfPersistentBytes();
435 long queueSizeOfTransients = queue.getNumOfBytes() - queueSizeOfPersistents;
436
437 txt += " NumPersistents=" + queueNumOfPersistents + " NumOfTransients=" + queueNumOfTransients;
438 txt += " SizeOfPersistents=" + queueSizeOfPersistents + " SizeOfTransients=" + queueSizeOfTransients;
439
440 assertEquals(ME + ": " + txt + " wrong number of persistents ", numOfPersistents, queueNumOfPersistents);
441 assertEquals(ME + ": " + txt + " wrong number of transients ", numOfTransients, queueNumOfTransients);
442 assertEquals(ME + ": " + txt + " wrong size of persistents ", sizeOfPersistents, queueSizeOfPersistents);
443 assertEquals(ME + ": " + txt + " wrong size of transients ", sizeOfTransients, queueSizeOfTransients);
444 }
445
446
447
448 /**
449 * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear()
450 */
451 private void putMsg(I_Queue queue) {
452 ME = "I_QueueTest.putMsg(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
453 System.out.println("***" + ME);
454 try {
455 //========== Test 1: put(I_QueueEntry[])
456 int numLoop = 10;
457 ArrayList list = new ArrayList();
458
459 this.queue.removeStorageSizeListener(null);
460 this.queue.addStorageSizeListener(this.queueSizeListener);
461 this.queueSizeListener.clear();
462
463 for (int ii=0; ii<numLoop; ii++) {
464 DummyEntry[] queueEntries = {
465 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
466 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
467 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)};
468
469 queue.put(queueEntries, false);
470
471 assertEquals("number of entries incremented on last invocation", 3, this.queueSizeListener.getLastIncrementEntries());
472 assertEquals("number of bytes incremented on last invocation", 3*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
473 for (int i=0; i < 3; i++) list.add(queueEntries[i]);
474
475 this.checkSizeAndEntries(" put(I_QueueEntry[]) ", list, queue);
476 assertEquals(ME+": Wrong size", (ii+1)*queueEntries.length, queue.getNumOfEntries());
477 }
478 assertEquals("number of invocations for queue size listener is wrong", numLoop, this.queueSizeListener.getCount());
479
480 int total = numLoop*3;
481 assertEquals(ME+": Wrong total size", total, queue.getNumOfEntries());
482 log.info("#1 Success, filled " + queue.getNumOfEntries() + " messages into queue");
483
484
485 //========== Test 2: put(I_QueueEntry)
486 for (int ii=0; ii<numLoop; ii++) {
487 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
488 list.add(queueEntry);
489 queue.put(queueEntry, false);
490 }
491 assertEquals(ME+": Wrong total size", numLoop+total, queue.getNumOfEntries());
492 this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
493 log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
494
495 queue.clear();
496 assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());
497
498 System.out.println("***" + ME + " [SUCCESS]");
499 queue.shutdown();
500 queue = null;
501
502 }
503 catch(XmlBlasterException e) {
504 fail(ME + ": Exception thrown: " + e.getMessage());
505 }
506 }
507
508
509 // ------------------------------------
510 public void testPeekMsg() {
511
512 String queueType = "unknown";
513 try {
514 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
515 queueType = this.queue.toString();
516 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg");
517 this.queue.initialize(queueId, prop);
518 queue.clear();
519 assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
520 peekMsg(this.queue);
521 }
522 catch (XmlBlasterException ex) {
523 log.severe("Exception when testing peekMsg probably due to failed initialization of the queue " + queueType);
524 }
525
526 }
527
528
529 // ------------------------------------
530 public void testPeekMsgBlocking() {
531
532 String queueType = "unknown";
533 try {
534 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test"