1 package org.xmlBlaster.test.classtest.queue;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.logging.Level;
6 import java.util.logging.Logger;
7
8 import junit.framework.Test;
9 import junit.framework.TestCase;
10 import junit.framework.TestSuite;
11
12 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
13 import org.xmlBlaster.util.Global;
14 import org.xmlBlaster.util.MsgUnit;
15 import org.xmlBlaster.util.XmlBlasterException;
16 import org.xmlBlaster.util.def.Constants;
17 import org.xmlBlaster.util.def.ErrorCode;
18 import org.xmlBlaster.util.def.PriorityEnum;
19 import org.xmlBlaster.util.plugin.PluginInfo;
20 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
21 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
22 import org.xmlBlaster.util.queue.BlockingQueueWrapper;
23 import org.xmlBlaster.util.queue.I_Entry;
24 import org.xmlBlaster.util.queue.I_Queue;
25 import org.xmlBlaster.util.queue.I_QueueEntry;
26 import org.xmlBlaster.util.queue.I_Storage;
27 import org.xmlBlaster.util.queue.I_StorageSizeListener;
28 import org.xmlBlaster.util.queue.QueuePluginManager;
29 import org.xmlBlaster.util.queue.StorageId;
30 import org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin;
31 import org.xmlBlaster.util.queuemsg.DummyEntry;
32
33 /**
34 * Test RamQueuePlugin.
35 * <p>
36 * The sorting order is priority,timestamp:
37 * </p>
38 * <pre>
39 * -> 5,100 - 5,98 - 5,50 - 9,3000 - 9,2500 ->
40 * </pre>
41 * <p>
42 * As 9 is highest priority it is the first to be taken out.<br />
43 * As we need to maintain the timely sequence and
44 * id is a timestamp in (more or less) nano seconds elapsed since 1970)
45 * the id 2500 (it is older) has precedence to the id 3000
46 * </p>
47 * <p>
48 * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.queue.I_QueueTest
49 * </p>
50 * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry)
51 * @see org.xmlBlaster.util.queue.I_Queue
52 * @see org.xmlBlaster.util.queue.ram.RamQueuePlugin
53 * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
54 */
55 public class I_QueueTest extends TestCase {
56
57
58 class QueueSizeListener implements I_StorageSizeListener {
59
60 private long lastNumEntries = 0L,
61 lastNumBytes = 0L,
62 lastIncrementEntries = 0L,
63 lastIncrementBytes = 0L;
64 private int count = 0;
65
66 public long getLastIncrementEntries() {
67 return this.lastIncrementEntries;
68 }
69
70 public long getLastIncrementBytes() {
71 return this.lastIncrementBytes;
72 }
73
74 public int getCount() {
75 return this.count;
76 }
77
78 public void clear() {
79 this.lastNumEntries = 0L;
80 this.lastNumBytes = 0L;
81 this.lastIncrementEntries = 0L;
82 this.lastIncrementBytes = 0L;
83 this.count = 0;
84 }
85
86 public void changed(I_Storage storage, long numEntries, long numBytes, boolean isShutdown) {
87 this.lastIncrementEntries = numEntries - this.lastNumEntries;
88 this.lastIncrementBytes = numBytes - this.lastNumBytes;
89 this.lastNumEntries = numEntries;
90 this.lastNumBytes = numBytes;
91 this.count++;
92 }
93
94 }
95
96
97 private String ME = "I_QueueTest";
98 protected Global glob;
99 private static Logger log = Logger.getLogger(I_QueueTest.class.getName());
100
101 private I_Queue queue;
102 private QueueSizeListener queueSizeListener = new QueueSizeListener();
103
104 static String[] PLUGIN_TYPES = {
105 new String("RAM"),
106 new String("JDBC"),
107 new String("CACHE")
108 };
109
110 /*
111 static I_Queue[] IMPL = {
112 new org.xmlBlaster.util.queue.ram.RamQueuePlugin(),
113 new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(),
114 new org.xmlBlaster.util.queue.cache.CacheQueueInterceptorPlugin()
115 };
116 */
117
118 public class QueuePutter extends Thread {
119
120 private I_Queue queue;
121 private long delay;
122 private int numOfEntries;
123 private boolean ignoreInterceptor;
124
125 public QueuePutter(I_Queue queue, long delay, int numOfEntries, boolean ignoreInterceptor) {
126 this.queue = queue;
127 this.delay = delay;
128 this.numOfEntries = numOfEntries;
129 this.ignoreInterceptor = ignoreInterceptor;
130 }
131
132 public void run() {
133 try {
134 for (int i=0; i < this.numOfEntries; i++) {
135 Thread.sleep(this.delay);
136 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
137 this.queue.put(queueEntry, this.ignoreInterceptor);
138 }
139 }
140 catch (Exception ex) {
141 ex.printStackTrace();
142 }
143 }
144
145 }
146
147
148 public I_QueueTest(String name, int currImpl, Global glob) {
149 super(name);
150 // this.queue = IMPL[currImpl];
151 //this.ME = "I_QueueTest[" + this.queue.getClass().getName() + "]";
152
153 if (glob == null) this.glob = Global.instance();
154 else this.glob = glob;
155
156
157 try {
158 String type = PLUGIN_TYPES[currImpl];
159 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
160 QueuePluginManager pluginManager = new QueuePluginManager(glob);
161
162 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
163 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
164 prop.put("tableNamePrefix", "TEST");
165 prop.put("entriesTableName", "_entries");
166 this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
167
168 pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
169 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SomeQueueId");
170 this.queue = pluginManager.getPlugin(pluginInfo, queueId, new CbQueueProperty(this.glob, Constants.RELATING_CALLBACK, this.glob.getStrippedId()));
171 this.queue.shutdown(); // to allow to initialize again
172 }
173 catch (Exception ex) {
174 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
175 }
176 }
177
178 protected void setUp() {
179 // cleaning up the database from previous runs ...
180 QueuePropertyBase prop = null;
181 try {
182 prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
183 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SomeQueueId");
184 queue.initialize(queueId, prop);
185 queue.clear();
186 queue.shutdown();
187 }
188 catch (Exception ex) {
189 log.severe("could not propertly set up the database: " + ex.getMessage());
190 }
191 }
192
193 /**
194 * Tests QueuePropertyBase() and getStorageId()
195 * @param queueTypeList A space separated list of names for the
196 * implementations to be tested. Valid names are:
197 * RamQueuePlugin JdbcQueuePlugin
198 */
199 public void testConfig() {
200 config(this.queue);
201 }
202
203 /**
204 * Tests initialize(), getProperties(), setProperties() and capacity()
205 * @param queue !!!Is not initialized in this case!!!!
206 */
207 private void config(I_Queue queue) {
208 ME = "I_QueueTest.config(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
209 System.out.println("***" + ME);
210
211 QueuePropertyBase prop1 = null;
212 QueuePropertyBase prop = null;
213 try {
214 // test initialize()
215 prop1 = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
216 int max = 12;
217 prop1.setMaxEntries(max);
218 prop1.setMaxEntriesCache(max);
219 assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries());
220 assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache());
221 //PluginInfo pluginInfo = new PluginInfo(glob, null, "");
222 //queue.init(glob, pluginInfo); // Init from pluginloader is first
223 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "SomeQueueId");
224 queue.initialize(queueId, prop1);
225 queue.clear(); // this is needed since the tearDown has cleaned the queue with previous cfg (other StorageId)
226 assertEquals(ME+": Wrong queue ID", queueId, queue.getStorageId());
227
228 try {
229 prop = new CbQueueProperty(glob, Constants.RELATING_SUBJECT, "/node/test");
230 prop.setMaxEntries(99);
231 prop.setMaxEntriesCache(99);
232 queue.setProperties(prop);
233 }
234 catch(XmlBlasterException e) {
235 fail("Changing properties failed");
236 }
237
238 }
239 catch(XmlBlasterException e) {
240 fail(ME + ": Exception thrown: " + e.getMessage());
241 }
242
243 long len = prop.getMaxEntries();
244 assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), queue.getMaxNumOfEntries());
245 assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)queue.getProperties()).getMaxEntries());
246 assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
247
248 try {
249 for (int ii=0; ii<len; ii++) {
250 queue.put(new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true), false);
251 }
252 assertEquals(ME+": Wrong total size", len, queue.getNumOfEntries());
253
254 try {
255 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
256 queue.put(queueEntry, false);
257 queue.put(queueEntry, false);
258 fail("Did expect an exception on overflow");
259 }
260 catch(XmlBlasterException e) {
261 log.info("SUCCESS the exception is OK: " + e.getMessage());
262 }
263
264 log.info("toXml() test:" + queue.toXml(""));
265 log.info("usage() test:" + queue.usage());
266
267 assertEquals(ME+": should not be shutdown", false, queue.isShutdown());
268 queue.shutdown();
269 assertEquals(ME+": should be shutdown", true, queue.isShutdown());
270
271 log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
272 System.out.println("***" + ME + " [SUCCESS]");
273 queue.shutdown();
274 queue = null;
275 }
276 catch(XmlBlasterException e) {
277 fail(ME + ": Exception thrown: " + e.getMessage());
278 }
279 }
280
281 //------------------------------------
282 public void testSize1() {
283 String queueType = "unknown";
284 try {
285 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
286 int max = 1;
287 prop.setMaxEntries(max);
288 prop.setMaxEntriesCache(max);
289 queueType = this.queue.toString();
290 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/size1");
291 this.queue.initialize(queueId, prop);
292 queue.clear();
293 assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
294 assertEquals(ME, 1L, queue.getMaxNumOfEntries());
295 size1(this.queue);
296 }
297 catch (XmlBlasterException ex) {
298 fail("Exception when testing Size1 probably due to failed initialization of the queue of type " + queueType);
299 }
300 }
301
302 /**
303 * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear()
304 */
305 private void size1(I_Queue queue) {
306 this.queue = queue;
307 ME = "I_QueueTest.size1(" + queue.getStorageId() + ")[" + this.queue.getClass().getName() + "]";
308 System.out.println("***" + ME);
309 int maxEntries = (int)queue.getMaxNumOfEntries();
310 try {
311 //========== Test 1: put(I_QueueEntry[])
312 int numLoop = 10;
313 List<I_Entry> list = new ArrayList<I_Entry>();
314
315 //========== Test 2: put(I_QueueEntry)
316 this.queue.removeStorageSizeListener(null);
317 this.queue.addStorageSizeListener(this.queueSizeListener);
318 this.queueSizeListener.clear();
319
320 for (int ii=0; ii<numLoop; ii++) {
321 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
322 try {
323 queue.put(queueEntry, false);
324 assertEquals("number of entries incremented on last invocation", 1, this.queueSizeListener.getLastIncrementEntries());
325 assertEquals("number of bytes incremented on last invocation", queueEntry.getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
326
327 if (ii > maxEntries) { // queue allows on overload
328 fail("Didn't expect more than " + maxEntries + " entries" + queue.toXml(""));
329 }
330 else
331 list.add(queueEntry);
332 }
333 catch (XmlBlasterException e) {
334 if (ii <= maxEntries) {
335 fail("Didn't expect exception" + e.getMessage());
336 }
337 }
338 }
339 assertEquals("number of invocations for queue size listener is wrong", maxEntries+1, this.queueSizeListener.getCount());
340
341 // The queues allow temporary oversize (one extra put())
342 assertEquals(ME+": Wrong total size " + queue.toXml(""), maxEntries+1, queue.getNumOfEntries());
343 this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
344 log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
345
346 List<I_Entry> entryList = null;
347 try {
348 entryList = queue.peekLowest(1, -1L, null, false);
349 assertEquals("PEEK #1 failed"+queue.toXml(""), 1, entryList.size());
350 log.info("curr entries="+queue.getNumOfEntries());
351 }
352 catch (XmlBlasterException e) {
353 if (e.getErrorCode()!=ErrorCode.INTERNAL_NOTIMPLEMENTED) throw e;
354 }
355
356
357 //this.queue.removeStorageSizeListener(null);
358 //this.queue.addStorageSizeListener(this.queueSizeListener);
359 //this.queueSizeListener.clear();
360
361 entryList = queue.takeLowest(1, -1L, null, false);
362 long singleSize = ((I_QueueEntry)entryList.get(0)).getSizeInBytes();
363 assertEquals("TAKE #1 failed"+queue.toXml(""), 1, entryList.size());
364 log.info("curr entries="+queue.getNumOfEntries());
365 assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
366 assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
367
368 entryList = queue.takeLowest(1, -1L, null, false);
369 assertEquals("TAKE #2 failed"+queue.toXml(""), 1, entryList.size());
370 assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
371 assertEquals("number of bytes incremented on last invocation", -singleSize, this.queueSizeListener.getLastIncrementBytes());
372
373 queue.clear();
374 assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());
375
376 System.out.println("***" + ME + " [SUCCESS]");
377 queue.shutdown();
378 queue = null;
379
380 }
381 catch(XmlBlasterException e) {
382 fail(ME + ": Exception thrown: " + e.getMessage());
383 }
384 log.info("SUCCESS");
385 }
386
387
388 //------------------------------------
389 public void testPutMsg() {
390 String queueType = "unknown";
391 try {
392 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
393 queueType = this.queue.toString();
394 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/putMsg");
395 this.queue.initialize(queueId, prop);
396 queue.clear();
397 assertEquals(ME + "wrong size before starting ", 0L, queue.getNumOfEntries());
398 putMsg(this.queue);
399 }
400 catch (XmlBlasterException ex) {
401 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType);
402 }
403 }
404
405
406 /**
407 * @see checkSizeAndEntries(String, I_QueueEntry[], I_Queue)
408 */
409 private void checkSizeAndEntries(String txt, List<I_Entry> queueEntries, I_Queue queue) {
410 checkSizeAndEntries(txt, (I_QueueEntry[])queueEntries.toArray(new I_QueueEntry[queueEntries.size()]), queue);
411 }
412
413
414 /**
415 * Helper method to do a generic size check (size and number of entries)
416 */
417 private void checkSizeAndEntries(String txt, I_QueueEntry[] queueEntries, I_Queue queue) {
418 long sizeOfTransients = 0L;
419 long numOfPersistents = 0;
420 long numOfTransients = 0;
421 long sizeOfPersistents = 0L;
422
423 for (int i=0; i < queueEntries.length; i++) {
424 I_QueueEntry entry = queueEntries[i];
425 if (entry.isPersistent()) {
426 sizeOfPersistents += entry.getSizeInBytes();
427 numOfPersistents++;
428 }
429 else {
430 sizeOfTransients += entry.getSizeInBytes();
431 numOfTransients++;
432 }
433 }
434
435 long queueNumOfPersistents = queue.getNumOfPersistentEntries();
436 long queueNumOfTransients = queue.getNumOfEntries() - queueNumOfPersistents;
437 long queueSizeOfPersistents = queue.getNumOfPersistentBytes();
438 long queueSizeOfTransients = queue.getNumOfBytes() - queueSizeOfPersistents;
439
440 txt += " NumPersistents=" + queueNumOfPersistents + " NumOfTransients=" + queueNumOfTransients;
441 txt += " SizeOfPersistents=" + queueSizeOfPersistents + " SizeOfTransients=" + queueSizeOfTransients;
442
443 assertEquals(ME + ": " + txt + " wrong number of persistents ", numOfPersistents, queueNumOfPersistents);
444 assertEquals(ME + ": " + txt + " wrong number of transients ", numOfTransients, queueNumOfTransients);
445 assertEquals(ME + ": " + txt + " wrong size of persistents ", sizeOfPersistents, queueSizeOfPersistents);
446 assertEquals(ME + ": " + txt + " wrong size of transients ", sizeOfTransients, queueSizeOfTransients);
447 }
448
449
450
451 /**
452 * Tests put(MsgQueueEntry[]) and put(MsgQueueEntry) and clear()
453 */
454 private void putMsg(I_Queue queue) {
455 ME = "I_QueueTest.putMsg(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
456 System.out.println("***" + ME);
457 try {
458 //========== Test 1: put(I_QueueEntry[])
459 int numLoop = 10;
460 List<I_Entry> list = new ArrayList<I_Entry>();
461
462 this.queue.removeStorageSizeListener(null);
463 this.queue.addStorageSizeListener(this.queueSizeListener);
464 this.queueSizeListener.clear();
465
466 for (int ii=0; ii<numLoop; ii++) {
467 DummyEntry[] queueEntries = {
468 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
469 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
470 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)};
471
472 queue.put(queueEntries, false);
473
474 assertEquals("number of entries incremented on last invocation", 3, this.queueSizeListener.getLastIncrementEntries());
475 assertEquals("number of bytes incremented on last invocation", 3*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
476 for (int i=0; i < 3; i++) list.add(queueEntries[i]);
477
478 this.checkSizeAndEntries(" put(I_QueueEntry[]) ", list, queue);
479 assertEquals(ME+": Wrong size", (ii+1)*queueEntries.length, queue.getNumOfEntries());
480 }
481 assertEquals("number of invocations for queue size listener is wrong", numLoop, this.queueSizeListener.getCount());
482
483 int total = numLoop*3;
484 assertEquals(ME+": Wrong total size", total, queue.getNumOfEntries());
485 log.info("#1 Success, filled " + queue.getNumOfEntries() + " messages into queue");
486
487
488 //========== Test 2: put(I_QueueEntry)
489 for (int ii=0; ii<numLoop; ii++) {
490 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
491 list.add(queueEntry);
492 queue.put(queueEntry, false);
493 }
494 assertEquals(ME+": Wrong total size", numLoop+total, queue.getNumOfEntries());
495 this.checkSizeAndEntries(" put(I_QueueEntry) ", list, queue);
496 log.info("#2 Success, filled " + queue.getNumOfEntries() + " messages into queue");
497
498 queue.clear();
499 assertEquals(ME+": Wrong empty size", 0L, queue.getNumOfEntries());
500
501 System.out.println("***" + ME + " [SUCCESS]");
502 queue.shutdown();
503 queue = null;
504
505 }
506 catch(XmlBlasterException e) {
507 fail(ME + ": Exception thrown: " + e.getMessage());
508 }
509 }
510
511
512 // ------------------------------------
513 public void testPeekMsg() {
514
515 String queueType = "unknown";
516 try {
517 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
518 queueType = this.queue.toString();
519 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg");
520 this.queue.initialize(queueId, prop);
521 queue.clear();
522 assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
523 peekMsg(this.queue);
524 }
525 catch (XmlBlasterException ex) {
526 log.severe("Exception when testing peekMsg probably due to failed initialization of the queue " + queueType);
527 }
528
529 }
530
531
532 // ------------------------------------
533 public void testPeekMsgBlocking() {
534
535 String queueType = "unknown";
536 try {
537 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
538 queueType = this.queue.toString();
539 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/peekMsg");
540 this.queue.initialize(queueId, prop);
541 queue.clear();
542 assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
543
544 // fill the queue:
545 DummyEntry[] queueEntries = {
546 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
547 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
548 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
549 };
550 queue.put(queueEntries, false);
551 BlockingQueueWrapper wrapper = new BlockingQueueWrapper(200L);
552 wrapper.init(queue);
553 int numOfEntries = 2;
554 List<I_Entry> ret = wrapper.blockingPeek(numOfEntries, 1000L);
555 assertEquals("Wrong number of entries found", 2, ret.size());
556 queue.removeNum(2);
557 numOfEntries = 2;
558 ret = wrapper.blockingPeek(numOfEntries, 1000L);
559 assertEquals("Wrong number of entries found", 1, ret.size());
560 queue.clear();
561 ret = wrapper.blockingPeek(numOfEntries, 1000L);
562 assertEquals("Wrong number of entries found", 0, ret.size());
563
564 // and now making asynchronous putting with events
565 numOfEntries = 3;
566 long delay = 500L;
567 boolean inhibitEvents = false;
568 QueuePutter putter = new QueuePutter(this.queue, delay, numOfEntries, inhibitEvents);
569 putter.start();
570 long t0 = System.currentTimeMillis();
571 ret = wrapper.blockingPeek(numOfEntries, 10000L);
572 assertEquals("Wrong number of entries when blocking with events", numOfEntries, ret.size());
573 long delta = System.currentTimeMillis() - t0;
574 log.info("The blocking request with events took '" + delta + "' milliseconds");
575 assertTrue("The method was blocking too long (did probably not wake up correctly", delta < 7000L);
576 queue.clear();
577 // and now making asynchronous putting without events (polling should detect it)
578 numOfEntries = 3;
579 delay = 500L;
580 inhibitEvents = true;
581 putter = new QueuePutter(this.queue, delay, numOfEntries, inhibitEvents);
582 putter.start();
583 t0 = System.currentTimeMillis();
584 ret = wrapper.blockingPeek(numOfEntries, 10000L);
585 assertEquals("Wrong number of entries when blocking with events", numOfEntries, ret.size());
586 delta = System.currentTimeMillis() - t0;
587 log.info("The blocking request without events took '" + delta + "' milliseconds");
588 assertTrue("The method was blocking too long (did probably not wake up correctly", delta < 7000L);
589 queue.clear();
590 }
591 catch (XmlBlasterException ex) {
592 log.severe("Exception when testing peekMsg probably due to failed initialization of the queue " + queueType);
593 }
594
595 }
596
597
598 /**
599 * Tests peek() and peek(int num) and remove()
600 * For a discussion of the sorting order see Javadoc of this class
601 */
602 private void peekMsg(I_Queue queue) {
603 ME = "I_QueueTest.peekMsg(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
604 System.out.println("***" + ME);
605 try {
606 //========== Test 1: peek()
607 {
608 DummyEntry[] queueEntries = {
609 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
610 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
611 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
612 };
613 queue.put(queueEntries, false);
614 for (int ii=0; ii<10; ii++) {
615 I_QueueEntry result = queue.peek();
616 assertTrue("Missing entry", result != null);
617 assertEquals(ME+": Wrong result", queueEntries[0].getUniqueId(), result.getUniqueId());
618 }
619 queue.remove(); // Remove one
620 for (int ii=0; ii<10; ii++) {
621 I_QueueEntry result = queue.peek();
622 assertTrue("Missing entry", result != null);
623 assertEquals(ME+": Wrong result", queueEntries[1].getUniqueId(), result.getUniqueId());
624 }
625 queue.remove(); // Remove one
626 for (int ii=0; ii<10; ii++) {
627 I_QueueEntry result = queue.peek();
628 assertTrue("Missing entry", result != null);
629 assertEquals(ME+": Wrong result", queueEntries[2].getUniqueId(), result.getUniqueId());
630 }
631 queue.remove(); // Remove one
632 for (int ii=0; ii<10; ii++) {
633 I_QueueEntry result = queue.peek();
634 assertTrue("Unexpected entry", result == null);
635 }
636 log.info("#1 Success, peek()");
637 }
638
639
640 //========== Test 2: peek(num)
641 {
642 DummyEntry[] queueEntries = {
643 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
644 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
645 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
646 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
647 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
648 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
649 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
650 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
651 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
652 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
653 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
654 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true)
655 };
656 queue.put(queueEntries, false);
657
658 for (int ii=-1; ii<100; ii++) {
659 List<I_Entry> results = queue.peek(ii, -1L); // does no remove
660 assertTrue("Missing entry", results != null);
661 int expected = ii;
662 if (ii == -1 || ii >= queueEntries.length)
663 expected = queueEntries.length;
664 assertEquals(ME+": Wrong number of entries returned ii=" + ii, expected, results.size());
665 }
666
667 queue.clear();
668 log.info("#2 Success, peek(int)");
669 }
670
671 //========== Test 3: peekSamePriority(-1)
672 {
673 DummyEntry[] queueEntries = {
674 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
675 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
676 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
677 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
678 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
679 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
680 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
681 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
682 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
683 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
684 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
685 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true)
686 };
687 queue.put(queueEntries, false);
688
689 int[] prios = { 9, 7, 5 };
690 for (int j=0; j<prios.length; j++) {
691 for (int ii=0; ii<10; ii++) {
692 List<I_Entry> results = queue.peekSamePriority(-1, -1L); // does no remove
693 assertTrue("Expected results", results != null);
694 assertEquals(ME+": Wrong number of 9 priorities", 4, results.size());
695 for (int k=0; k<results.size(); ++k)
696 assertEquals(ME+": Wrong priority returned", prios[j], ((I_QueueEntry)results.get(k)).getPriority());
697 }
698 for (int ii=0; ii<4; ii++) {
699 int num = queue.remove();
700 assertEquals(ME+": Expected remove", 1, num);
701 }
702 }
703
704 assertEquals(ME+": Expected empty queue", 0, queue.getNumOfEntries());
705
706 log.info("#3 Success, peekSamePriority()");
707 }
708
709 //========== Test 4: peekWithPriority(-1,7,9)
710 {
711 DummyEntry[] queueEntries = {
712 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
713 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
714 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
715 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
716 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
717 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
718 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
719 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
720 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
721 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
722 new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true),
723 new DummyEntry(glob, PriorityEnum.MIN_PRIORITY, queue.getStorageId(), true),
724 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
725 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true)
726 };
727 queue.put(queueEntries, false);
728
729 for (int ii=0; ii<10; ii++) {
730 List<I_Entry> results = queue.peekWithPriority(-1, -1L, 7, 9); // does no remove
731 assertTrue("Expected results", results != null);
732 assertEquals(ME+": Wrong number of 9 priorities", 8, results.size());
733 for (int k=0; k<results.size(); ++k) {
734 assertEquals(ME+": Wrong priority returned", (k<4)?9L:7L, ((I_QueueEntry)results.get(k)).getPriority());
735 }
736 }
737 queue.clear();
738 assertEquals(ME+": Expected empty queue", 0, queue.getNumOfEntries());
739
740 log.info("#4 Success, peekWithPriority()");
741 }
742
743
744 //========== Test 5: peek(100, 60)
745 {
746 DummyEntry[] queueEntries = {
747 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 80, true),
748 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
749 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
750 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
751 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), 80, true),
752 };
753 queue.put(queueEntries, false);
754
755 try {
756 List<I_Entry> results = queue.peek(100, 60); // does no remove
757 assertNotNull(ME+": the result should not be null");
758 assertEquals(ME+": Expected one entry on peek(100,60)", 1, results.size());
759 }
760 catch (XmlBlasterException e) {
761 e.printStackTrace();
762 assertTrue("An exception should not occur here " + e.getMessage(), false);
763 }
764
765 queue.clear();
766 assertEquals(ME+": Expected empty queue", 0, queue.getNumOfEntries());
767
768 log.info("#5 Success, peek(100, 60)");
769 }
770
771 System.out.println("***" + ME + " [SUCCESS]");
772 queue.shutdown();
773 }
774 catch(XmlBlasterException e) {
775 e.printStackTrace();
776 fail(ME + ": Exception thrown: " + e.getMessage());
777 }
778 }
779
780
781 //-----------------------------------------
782 public void testRemoveWithPriority() {
783 String queueType = "unknown";
784 try {
785 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
786 queueType = this.queue.toString();
787 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/removeWithPriority");
788 this.queue.initialize(queueId, prop);
789 queue.clear();
790 assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
791 removeWithPriority(this.queue);
792 }
793 catch (XmlBlasterException ex) {
794 log.severe("Exception when testing removeWithpriority probably due to failed initialization of the queue " + queueType);
795 }
796 }
797
798
799 /**
800 * Test removeWithPriority(long[])
801 */
802 private void removeWithPriority(I_Queue queue) {
803 ME = "I_QueueTest.removeWithPriority(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
804 System.out.println("***" + ME);
805 try {
806 //========== Test 1: remove prio 7 and 9
807 {
808 DummyEntry[] queueEntries = {
809 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
810 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
811 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
812 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
813 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
814 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
815 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
816 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
817 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
818 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
819 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
820 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
821 new DummyEntry(glob, PriorityEnum.MIN_PRIORITY, queue.getStorageId(), true)
822 };
823 this.queue.removeStorageSizeListener(null);
824 this.queue.addStorageSizeListener(this.queueSizeListener);
825 this.queueSizeListener.clear();
826
827 queue.put(queueEntries, false);
828
829 long numRemoved = queue.removeWithPriority(-1, -1L, 7, 9);
830
831 assertEquals("number of invocations", 2, this.queueSizeListener.getCount());
832 assertEquals("number of entries incremented on last invocation", -8, this.queueSizeListener.getLastIncrementEntries());
833 assertEquals("number of bytes incremented on last invocation", -8*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
834
835 assertEquals(ME+": Wrong number removed", 8, numRemoved);
836 assertEquals(ME+": Wrong size", queueEntries.length-8, queue.getNumOfEntries());
837
838 numRemoved = queue.removeWithPriority(-1, -1L, 27, 99);
839 long sizeInBytes = (queueEntries.length - 8) * queueEntries[0].getSizeInBytes();
840 assertEquals(ME+": Wrong size in bytes ", sizeInBytes, queue.getNumOfBytes());
841 assertEquals(ME+": Wrong number removed", 0, numRemoved);
842 assertEquals(ME+": Wrong number of entries ", queueEntries.length-8, queue.getNumOfEntries());
843
844 queue.clear();
845
846 log.info("#1 Success, fill and remove");
847 }
848
849 //========== Test 2: remove prio 7 and 9 with num limit
850 {
851 DummyEntry[] queueEntries = {
852 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
853 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
854 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
855 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
856 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
857 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
858 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
859 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
860 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
861 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
862 new DummyEntry(glob, PriorityEnum.HIGH_PRIORITY, queue.getStorageId(), true),
863 new DummyEntry(glob, PriorityEnum.MAX_PRIORITY, queue.getStorageId(), true),
864 new DummyEntry(glob, PriorityEnum.MIN_PRIORITY, queue.getStorageId(), true)
865 };
866 this.queue.removeStorageSizeListener(null);
867 this.queue.addStorageSizeListener(this.queueSizeListener);
868 this.queueSizeListener.clear();
869
870 queue.put(queueEntries, false);
871
872 long numRemoved = queue.removeWithPriority(2, -1L, 7, 9);
873
874 assertEquals(ME+": Wrong number removed", 2, numRemoved);
875 assertEquals(ME+": Wrong size", queueEntries.length-2, queue.getNumOfEntries());
876 assertEquals("number of invocations", 2, this.queueSizeListener.getCount());
877 assertEquals("number of entries incremented on last invocation", -2, this.queueSizeListener.getLastIncrementEntries());
878 assertEquals("number of bytes incremented on last invocation", -2*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
879
880 log.info("#2 Success, fill and remove");
881 }
882 queue.shutdown();
883 }
884 catch(XmlBlasterException e) {
885 fail(ME + ": Exception thrown: " + e.getMessage());
886 }
887 }
888
889 //------------------------------------
890 public void testRemoveRandom() {
891
892 String queueType = "unknown";
893 try {
894 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
895 queueType = this.queue.toString();
896 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/removeRandom");
897 this.queue.initialize(queueId, prop);
898 queue.clear();
899 assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
900 removeRandom(this.queue);
901 }
902 catch (XmlBlasterException ex) {
903 log.severe("Exception when testing removeRandom probably due to failed initialization of the queue " + queueType);
904 }
905
906 }
907
908
909 /**
910 * Test removeRandom(long[])
911 */
912 private void removeRandom(I_Queue queue) {
913 ME = "I_QueueTest.removeRandom(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "][" + queue.getClass().getName() + "]";
914 System.out.println("***" + ME);
915 try {
916 //========== Test 1: remove 1 from 1
917 {
918 this.queue.removeStorageSizeListener(null);
919 this.queue.addStorageSizeListener(this.queueSizeListener);
920 this.queueSizeListener.clear();
921
922 //MsgUnit msgUnit = new MsgUnit("<key/>", "bla".getBytes(), "<qos/>");
923 DummyEntry[] queueEntries = { new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true) };
924 queue.put(queueEntries, false);
925
926 I_QueueEntry[] testEntryArr = { queueEntries[0] };
927 long numRemoved = 0L;
928 boolean[] tmpArr = queue.removeRandom(testEntryArr);
929 for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
930
931 assertEquals("number of entries incremented on last invocation", -1, this.queueSizeListener.getLastIncrementEntries());
932 assertEquals("number of bytes incremented on last invocation", -queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
933
934 assertEquals(ME+": Wrong number removed", 1, numRemoved);
935 assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
936 log.info("#1 Success, fill and random remove");
937 }
938
939 //========== Test 2: removeRandom 2 from 3
940 {
941 DummyEntry[] queueEntries = {
942 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
943 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
944 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
945 };
946 queue.put(queueEntries, false);
947
948 I_QueueEntry[] testEntryArr = { queueEntries[0],
949 queueEntries[2]
950 };
951 long numRemoved = 0L;
952 boolean[] tmpArr = queue.removeRandom(testEntryArr);
953 for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
954
955 assertEquals("number of entries incremented on last invocation", -2, this.queueSizeListener.getLastIncrementEntries());
956 assertEquals("number of bytes incremented on last invocation", -2*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
957
958 assertEquals(ME+": Wrong number removed", 2, numRemoved);
959 assertEquals(ME+": Wrong size", 1, queue.getNumOfEntries());
960 I_QueueEntry result = queue.peek();
961 assertEquals(ME+": Wrong timestamp", queueEntries[1].getUniqueId(), result.getUniqueId());
962 queue.clear();
963 log.info("#2 Success, fill and random remove");
964 }
965
966 //========== Test 3: removeRandom 5 from 3
967 {
968 DummyEntry[] queueEntries = {
969 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
970 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
971 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true)
972 };
973 queue.put(queueEntries, false);
974
975 I_QueueEntry[] dataIdArr = {
976 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
977 queueEntries[0],
978 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
979 new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true),
980 queueEntries[2],
981 };
982 long numRemoved = 0L;
983 boolean[] tmpArr = queue.removeRandom(dataIdArr);
984 for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
985
986 assertEquals("number of entries incremented on last invocation", -2, this.queueSizeListener.getLastIncrementEntries());
987 assertEquals("number of bytes incremented on last invocation", -2*queueEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
988
989 assertEquals(ME+": Wrong number removed", 2, numRemoved);
990 assertEquals(ME+": Wrong size", 1, queue.getNumOfEntries());
991
992 I_QueueEntry entry = queue.peek();
993 assertTrue("Missing entry", (I_QueueEntry)null != entry);
994 assertEquals(ME+": Wrong entry removed", queueEntries[1].getUniqueId(), entry.getUniqueId());
995
996 queue.clear();
997 log.info("#3 Success, fill and random remove");
998 }
999
1000 //========== Test 4: removeRandom 0 from 0
1001 {
1002 DummyEntry[] queueEntries = new DummyEntry[0];
1003 queue.put(queueEntries, false);
1004
1005 I_QueueEntry[] dataIdArr = new I_QueueEntry[0];
1006
1007 long numRemoved = 0L;
1008 boolean[] tmpArr = queue.removeRandom(dataIdArr);
1009 for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
1010
1011 assertEquals(ME+": Wrong number removed", 0, numRemoved);
1012 assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
1013 queue.clear();
1014 log.info("#4 Success, fill and random remove");
1015 }
1016
1017 //========== Test 5: removeRandom null from null
1018 {
1019 queue.put((DummyEntry[])null, false);
1020
1021 // long numRemoved = queue.removeRandom((I_QueueEntry[])null);
1022 long numRemoved = 0L;
1023 boolean[] tmpArr = queue.removeRandom((I_QueueEntry[])null);
1024 for (int i=0; i < tmpArr.length; i++) if(tmpArr[i]) numRemoved++;
1025
1026 assertEquals(ME+": Wrong number removed", 0, numRemoved);
1027 assertEquals(ME+": Wrong size", 0, queue.getNumOfEntries());
1028 queue.clear();
1029 log.info("#5 Success, fill and random remove");
1030 }
1031
1032 queue.shutdown();
1033 System.out.println("***" + ME + " [SUCCESS]");
1034 }
1035 catch(XmlBlasterException e) {
1036 fail(ME + ": Exception thrown: " + e.getMessage());
1037 }
1038 }
1039
1040
1041
1042 //------------------------------------
1043 public void testTakeLowest() {
1044 String queueType = "unknown";
1045 try {
1046 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1047 queueType = this.queue.toString();
1048 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/takeLowest");
1049 this.queue.initialize(queueId, prop);
1050 queue.clear();
1051 assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
1052 takeLowest(this.queue);
1053 }
1054 catch (XmlBlasterException ex) {
1055 log.severe("Exception when testing removeRandomLong probably due to failed initialization of the queue " + queueType);
1056 }
1057
1058 }
1059
1060
1061
1062 /**
1063 * returns the number of entries left in the queue after this processing operation
1064 * @param queue the queue to use for this test
1065 * @param numEntries the number of entries to pass to the takeLowest operation
1066 * @param numBytes the num of bytes to pass to the takeLowest operation
1067 * @param leaveOne the flag to pass to the takeLowest operation
1068 * @param origEntries the array of the original entries put into the queue
1069 * @param entriesLeft number of entries left in the queue before this operation
1070 * @param currentEntries the number of entries which should have been processed by this operation
1071 */
1072 private final int assertCheckForTakeLowest(I_Queue queue, int numEntries, long numBytes,
1073 I_QueueEntry refEntry, boolean leaveOne, I_QueueEntry[] origEntries, int entriesLeft,
1074 int currentEntries, long size) throws XmlBlasterException {
1075 String me = ME + "/" + numEntries + "/" + numBytes + "/" + leaveOne + "/" + entriesLeft + "/" + currentEntries;
1076 if (log.isLoggable(Level.FINE)) log.fine("");
1077 assertEquals(me+": Wrong size of entry ", size, origEntries[0].getSizeInBytes());
1078 assertEquals(me+": Wrong amount of entries in queue before takeLowest invocation ", entriesLeft, queue.getNumOfEntries());
1079 assertEquals(me+": Wrong size of entries in queue before takeLowest invocation ", size*entriesLeft, queue.getNumOfBytes());
1080 assertEquals(me+": Wrong amount of persistent entries in queue before takeLowest invocation ", entriesLeft, queue.getNumOfPersistentEntries());
1081 assertEquals(me+": Wrong size of persistent entries in queue before takeLowest invocation ", size*entriesLeft, queue.getNumOfPersistentBytes());
1082
1083 List<I_Entry> list = null;
1084 try {
1085 list = queue.peekLowest(numEntries, numBytes, refEntry, leaveOne); // gives back all minus one
1086 assertEquals(me+": Wrong number of entries in peekLowest return ", currentEntries, list.size());
1087 assertEquals(me+": Wrong number of entries in queue after peekLowest invocation ", entriesLeft, queue.getNumOfEntries());
1088 assertEquals(me+": Wrong number of bytes in queue after peekLowest invocation ", size*(entriesLeft), queue.getNumOfBytes());
1089 assertEquals(me+": Wrong number of persistent bytes in queue after takeLowest invocation ", size*(entriesLeft), queue.getNumOfPersistentBytes());
1090 }
1091 catch (XmlBlasterException e) {
1092 if (e.getErrorCode()!=ErrorCode.INTERNAL_NOTIMPLEMENTED) throw e;
1093 }
1094
1095 list = queue.takeLowest(numEntries, numBytes, refEntry, leaveOne); // gives back all minus one
1096
1097 assertEquals("number of entries incremented on last invocation", -currentEntries, this.queueSizeListener.getLastIncrementEntries());
1098 assertEquals("number of bytes incremented on last invocation", -currentEntries*origEntries[0].getSizeInBytes(), this.queueSizeListener.getLastIncrementBytes());
1099
1100 assertEquals(me+": Wrong number of entries in takeLowest return ", currentEntries, list.size());
1101 assertEquals(me+": Wrong number of entries in queue after takeLowest invocation ", entriesLeft-currentEntries, queue.getNumOfEntries());
1102 assertEquals(me+": Wrong number of bytes in queue after takeLowest invocation ", size*(entriesLeft-currentEntries), queue.getNumOfBytes());
1103 assertEquals(me+": Wrong number of persistent bytes in queue after takeLowest invocation ", size*(entriesLeft-currentEntries), queue.getNumOfPersistentBytes());
1104
1105 for (int i=entriesLeft-currentEntries; i < entriesLeft; i++) {
1106 int j = entriesLeft - 1 - i;
1107 long ref = ((I_QueueEntry)list.get(j)).getUniqueId();
1108 assertEquals(me+": Wrong sequence in takeLowest", origEntries[i].getUniqueId(), ref);
1109 }
1110 return entriesLeft - currentEntries;
1111 }
1112
1113
1114 /**
1115 * Test takeLowest(I_Queue)
1116 */
1117 private void takeLowest(I_Queue queue) {
1118
1119 if (queue instanceof CacheQueueInterceptorPlugin) return;
1120
1121 ME = "I_QueueTest.takeLowest(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1122 System.out.println("***" + ME);
1123 try {
1124 //========== Test 1: takeLowest without restrictions
1125 {
1126 log.fine("takeLowest test 1");
1127 int imax = 50;
1128 long size = 0L;
1129 long msgSize = 100L; // every msg is 100 bytes long
1130 int entriesLeft = imax;
1131
1132
1133 this.queue.removeStorageSizeListener(null);
1134 this.queue.addStorageSizeListener(this.queueSizeListener);
1135 this.queueSizeListener.clear();
1136
1137 DummyEntry[] entries = new DummyEntry[imax];
1138 for (int i=0; i < imax; i++) {
1139 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), msgSize, true);
1140 size += entries[i].getSizeInBytes();
1141 queue.put(entries[i], false);
1142 }
1143
1144 assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1145 assertEquals(ME+": Wrong expected size in bytes of entries", msgSize*imax, size);
1146 assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1147
1148 entriesLeft = assertCheckForTakeLowest(queue, 0, -1L, null, true, entries, entriesLeft, 0, msgSize);
1149 entriesLeft = assertCheckForTakeLowest(queue, 1, -1L, null, true, entries, entriesLeft, 1, msgSize);
1150 entriesLeft = assertCheckForTakeLowest(queue, 2, -1L, null, true, entries, entriesLeft, 2, msgSize);
1151 entriesLeft = assertCheckForTakeLowest(queue, -1, 0L, null, true, entries, entriesLeft, 0, msgSize);
1152 entriesLeft = assertCheckForTakeLowest(queue, 0, 0L, null, true, entries, entriesLeft, 0, msgSize);
1153 entriesLeft = assertCheckForTakeLowest(queue, 1, 0L, null, true, entries, entriesLeft, 1, msgSize);
1154 entriesLeft = assertCheckForTakeLowest(queue, 2, 0L, null, true, entries, entriesLeft, 2, msgSize);
1155 entriesLeft = assertCheckForTakeLowest(queue, -1, 50L, null, true, entries, entriesLeft, 1, msgSize);
1156 entriesLeft = assertCheckForTakeLowest(queue, 0, 50L, null, true, entries, entriesLeft, 1, msgSize);
1157 entriesLeft = assertCheckForTakeLowest(queue, 1, 50L, null, true, entries, entriesLeft, 1, msgSize);
1158 entriesLeft = assertCheckForTakeLowest(queue, 2, 50L, null, true, entries, entriesLeft, 2, msgSize);
1159 entriesLeft = assertCheckForTakeLowest(queue, -1, 100L, null, true, entries, entriesLeft, 1, msgSize);
1160 entriesLeft = assertCheckForTakeLowest(queue, 0, 100L, null, true, entries, entriesLeft, 1, msgSize);
1161 entriesLeft = assertCheckForTakeLowest(queue, 1, 100L, null, true, entries, entriesLeft, 1, msgSize);
1162 entriesLeft = assertCheckForTakeLowest(queue, 2, 100L, null, true, entries, entriesLeft, 2, msgSize);
1163 entriesLeft = assertCheckForTakeLowest(queue, -1, 150L, null, true, entries, entriesLeft, 2, msgSize);
1164 entriesLeft = assertCheckForTakeLowest(queue, 0, 150L, null, true, entries, entriesLeft, 2, msgSize);
1165 entriesLeft = assertCheckForTakeLowest(queue, 1, 150L, null, true, entries, entriesLeft, 2, msgSize);
1166 entriesLeft = assertCheckForTakeLowest(queue, 2, 150L, null, true, entries, entriesLeft, 2, msgSize);
1167 entriesLeft = assertCheckForTakeLowest(queue, -1, 200L, null, true, entries, entriesLeft, 2, msgSize);
1168 entriesLeft = assertCheckForTakeLowest(queue, 0, 200L, null, true, entries, entriesLeft, 2, msgSize);
1169 entriesLeft = assertCheckForTakeLowest(queue, 1, 200L, null, true, entries, entriesLeft, 2, msgSize);
1170 entriesLeft = assertCheckForTakeLowest(queue, 2, 200L, null, true, entries, entriesLeft, 2, msgSize);
1171 entriesLeft = assertCheckForTakeLowest(queue, -1, -1L, null, true, entries, entriesLeft, entriesLeft-1, msgSize);
1172 entriesLeft = assertCheckForTakeLowest(queue, -1, -1L, null, false, entries, entriesLeft, 1, msgSize);
1173 assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1174 queue.clear();
1175 }
1176
1177 //========== Test 2: takeLowest which should return an empty array
1178 {
1179 log.fine("takeLowest test 2");
1180 int imax = 20;
1181 long size = 0L;
1182
1183 DummyEntry[] entries = new DummyEntry[imax];
1184 for (int i=0; i < imax; i++) {
1185 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1186 size += entries[i].getSizeInBytes();
1187 queue.put(entries[i], false);
1188 }
1189
1190 DummyEntry queueEntry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1191
1192 assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1193 assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1194
1195 // should return an empty array since the timestamp is the last
1196 List<I_Entry> list = queue.takeLowest(-1, -1, queueEntry, true);
1197
1198 assertEquals(ME+": Wrong size in takeLowest return ", 0, list.size());
1199 queue.clear();
1200 assertEquals(ME+": Wrong size in takeLowest after cleaning ", 0, queue.getNumOfEntries());
1201 }
1202
1203
1204 //========== Test 3: takeLowest should return 13 entries
1205 {
1206 log.fine("takeLowest test 3");
1207 int imax = 20;
1208 long size = 0L;
1209
1210 DummyEntry[] entries = new DummyEntry[imax];
1211 for (int i=0; i < imax; i++) {
1212 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1213 size += entries[i].getSizeInBytes();
1214 queue.put(entries[i], false);
1215 }
1216 DummyEntry queueEntry = entries[6];
1217
1218 assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1219 assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1220
1221 // should return an empty array since the timestamp is the last
1222 List<I_Entry> list = queue.takeLowest(-1, -1, queueEntry, true);
1223
1224 assertEquals(ME+": Wrong size in takeLowest return ", list.size(), imax-6-1);
1225 queue.clear();
1226 assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1227 }
1228
1229
1230 //========== Test 4: takeLowest without restrictions
1231 {
1232 log.fine("takeLowest test 4 (with entry null)");
1233 int imax = 20;
1234 long size = 0L;
1235
1236 DummyEntry[] entries = new DummyEntry[imax];
1237 for (int i=0; i < imax; i++) {
1238 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1239 size += entries[i].getSizeInBytes();
1240 queue.put(entries[i], false);
1241 }
1242
1243 assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1244 assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1245
1246 List<I_Entry> list = queue.takeLowest(-1, -1, null, true);
1247
1248 assertEquals(ME+": Wrong size in takeLowest return ", list.size(), entries.length-1);
1249 for (int i=1; i < imax; i++) {
1250 int j = imax - 1 - i;
1251 long ref = ((I_QueueEntry)list.get(j)).getUniqueId();
1252 assertEquals(ME+": Wrong unique ID", entries[i].getUniqueId(), ref);
1253 }
1254 queue.clear();
1255 assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1256 }
1257
1258
1259 }
1260 catch(XmlBlasterException e) {
1261 fail(ME + ": Exception thrown: " + e.getMessage());
1262 }
1263 }
1264
1265
1266 public void testWrongOrder() {
1267 String queueType = "unknown";
1268 try {
1269 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1270 queueType = this.queue.toString();
1271 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/takeLowest");
1272 this.queue.initialize(queueId, prop);
1273 queue.clear();
1274 assertEquals(ME + "wrong size before starting ", 0, queue.getNumOfEntries());
1275 wrongOrder(this.queue);
1276 }
1277 catch (XmlBlasterException ex) {
1278 log.severe("Exception when testing removeRandomLong probably due to failed initialization of the queue " + queueType);
1279 }
1280
1281 }
1282
1283 /**
1284 * Test wrongOrder(I_Queue)
1285 */
1286 private void wrongOrder(I_Queue queue) {
1287 ME = "I_QueueTest.wrongOrder(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1288 System.out.println("***" + ME);
1289 try {
1290 //========== Test 1: checks if entries are returned in the correct
1291 // order even if they are inserted in the wrong order
1292 {
1293 log.fine("wrongOrder test 1");
1294 int imax = 5;
1295 long size = 0L;
1296
1297 DummyEntry[] entries = new DummyEntry[imax];
1298 for (int i=0; i < imax; i++) {
1299 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1300 size += entries[i].getSizeInBytes();
1301 }
1302
1303 DummyEntry[] putEntries = new DummyEntry[imax];
1304 putEntries[0] = entries[3];
1305 putEntries[1] = entries[4];
1306 putEntries[2] = entries[2];
1307 putEntries[3] = entries[0];
1308 putEntries[4] = entries[1];
1309
1310 queue.put(putEntries, false);
1311
1312 assertEquals(ME+": Wrong number put", imax, queue.getNumOfEntries());
1313 assertEquals(ME+": Wrong size in bytes put", size, queue.getNumOfBytes());
1314
1315 List<I_Entry> listPeekSamePrio = queue.peekSamePriority(-1, -1L);
1316 List<I_Entry> listPeekWithPrio = queue.peekWithPriority(-1, -1L, 0, 10);
1317 List<I_Entry> listPeek = queue.peek(-1, -1L);
1318
1319 //they all should give the same result ...
1320 for (int i=0; i<imax; i++) {
1321 long id = entries[i].getUniqueId();
1322 long idPeekSamePrio = ((I_QueueEntry)listPeekSamePrio.get(i)).getUniqueId();
1323 long idPeekWithPrio = ((I_QueueEntry)listPeekWithPrio.get(i)).getUniqueId();
1324 long idPeek = ((I_QueueEntry)listPeek.get(i)).getUniqueId();
1325 assertEquals(ME+": Wrong entry for peekSamePrio ", id, idPeekSamePrio);
1326 assertEquals(ME+": Wrong entry for peekWithPrio ", id, idPeekWithPrio);
1327 assertEquals(ME+": Wrong entry for peek ", id, idPeek);
1328 }
1329 queue.clear();
1330 assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1331 }
1332
1333 }
1334 catch(XmlBlasterException e) {
1335 fail(ME + ": Exception thrown: " + e.getMessage());
1336 }
1337 }
1338
1339
1340 public void testPutEntriesTwice() {
1341 String queueType = "unknown";
1342 try {
1343 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1344 queueType = this.queue.toString();
1345 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/putEntriesTwice");
1346 this.queue.initialize(queueId, prop);
1347 queue.clear();
1348 assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1349 putEntriesTwice(this.queue);
1350 }
1351 catch (XmlBlasterException ex) {
1352 log.severe("Exception when testing putEntriesTwice probably due to failed initialization of the queue " + queueType);
1353 }
1354 }
1355
1356
1357 /**
1358 * Test wrongOrder(I_Queue)
1359 */
1360 private void putEntriesTwice(I_Queue queue) {
1361 ME = "I_QueueTest.putEntriesTwice(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1362 System.out.println("***" + ME);
1363 try {
1364 //========== Test 1: checks if entries are returned in the correct
1365 // order even if they are inserted in the wrong order
1366 {
1367 log.fine("putEntriesTwice test 1");
1368 int imax = 5;
1369 long size = 0L;
1370
1371 DummyEntry[] entries = new DummyEntry[imax];
1372 for (int i=0; i < imax; i++) {
1373 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1374 size += entries[i].getSizeInBytes();
1375 }
1376
1377 queue.put(entries, false);
1378 queue.put(entries, false);
1379
1380 assertEquals(ME+": Wrong number of entries after putting same entries twice ", imax, queue.getNumOfEntries());
1381 queue.removeRandom(entries);
1382
1383 assertEquals(ME+": Wrong size in takeLowest after cleaning ", queue.getNumOfEntries(), 0);
1384 }
1385 }
1386 catch(XmlBlasterException e) {
1387 fail(ME + ": Exception thrown: " + e.getMessage());
1388 }
1389 }
1390
1391
1392
1393 public void testPeekWithLimitEntry() {
1394 String queueType = "unknown";
1395 try {
1396 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1397 queueType = this.queue.toString();
1398 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/peekWithLimitEntry");
1399 this.queue.initialize(queueId, prop);
1400 queue.clear();
1401 assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1402 peekWithLimitEntry(this.queue);
1403 }
1404 catch (XmlBlasterException ex) {
1405 log.severe("Exception when testing peekWithLimitEntry probably due to failed initialization of the queue " + queueType);
1406 }
1407 }
1408
1409
1410 /**
1411 * Test testPeekWithLimitEntry(I_Queue)
1412 */
1413 private void peekWithLimitEntry(I_Queue queue) {
1414 ME = "I_QueueTest.peekWithLimitEntry(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1415 System.out.println("***" + ME);
1416 try {
1417 //========== Test 1: normal case where limitEntry is contained in the queue
1418 {
1419 log.fine("peekWithLimitEntry test 1");
1420 int imax = 5;
1421
1422 DummyEntry[] entries = new DummyEntry[imax];
1423 entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1424 entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1425 entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1426 entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1427 entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1428
1429 queue.put(entries, false);
1430 assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1431
1432 List<I_Entry> list = queue.peekWithLimitEntry(entries[3]);
1433 assertEquals(ME+": Wrong number of peeked entries (with limit) ", 3, list.size());
1434 for (int i=0; i < list.size(); i++) {
1435 assertEquals(ME + ": Wrong order in peeked entries (with limit): ", entries[i].getUniqueId(), ((I_QueueEntry)list.get(i)).getUniqueId());
1436 }
1437
1438 queue.removeRandom(entries);
1439 assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1440 }
1441
1442 //========== Test 2: normal case where limitEntry is NOT contained in the queue (should not return anything)
1443 {
1444 log.fine("peekWithLimitEntry test 2");
1445 int imax = 5;
1446
1447 DummyEntry[] entries = new DummyEntry[imax];
1448 entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1449 entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1450 entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1451 entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1452 entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1453
1454 DummyEntry limitEntry = new DummyEntry(glob, PriorityEnum.HIGH8_PRIORITY, queue.getStorageId(), true);
1455
1456 queue.put(entries, false);
1457 assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1458
1459 List<I_Entry> list = queue.peekWithLimitEntry(limitEntry);
1460 assertEquals(ME+": Wrong number of peeked entries (with limit) ", 0, list.size());
1461 queue.removeRandom(entries);
1462 assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1463 }
1464
1465 //========== Test 3: normal case where limitEntry is NOT contained in the queue
1466 {
1467 log.fine("peekWithLimitEntry test 3");
1468 int imax = 5;
1469
1470 DummyEntry[] entries = new DummyEntry[imax];
1471 entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1472 entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1473 entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1474 entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1475 entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1476
1477 DummyEntry limitEntry = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1478
1479 queue.put(entries, false);
1480 assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1481
1482 List<I_Entry> list = queue.peekWithLimitEntry(limitEntry);
1483 assertEquals(ME+": Wrong number of peeked entries (with limit) ", imax, list.size());
1484 for (int i=0; i < list.size(); i++) {
1485 assertEquals(ME + ": Wrong order in peeked entries (with limit): ", entries[i].getUniqueId(), ((I_QueueEntry)list.get(i)).getUniqueId());
1486 }
1487
1488 queue.removeRandom(entries);
1489 assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1490 }
1491
1492 //========== Test 4: normal case where limitEntry is NOT contained in the queue
1493 {
1494 log.fine("peekWithLimitEntry test 4");
1495 int imax = 5;
1496
1497 DummyEntry[] entries = new DummyEntry[imax];
1498 entries[0] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1499 entries[3] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1500 entries[1] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1501 entries[4] = new DummyEntry(glob, PriorityEnum.LOW_PRIORITY, queue.getStorageId(), true);
1502 entries[2] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1503
1504 queue.put(entries, false);
1505 assertEquals(ME+": Wrong number of entries after putting same entries ", imax, queue.getNumOfEntries());
1506
1507 List<I_Entry> list = queue.peekWithLimitEntry(null);
1508 assertEquals(ME+": Wrong number of peeked entries (with limit) ", 0, list.size());
1509
1510 queue.removeRandom(entries);
1511 assertEquals(ME+": Wrong size in peekWithLimitEntry after cleaning ", queue.getNumOfEntries(), 0);
1512 }
1513 }
1514 catch(XmlBlasterException e) {
1515 fail(ME + ": Exception thrown: " + e.getMessage());
1516 }
1517 }
1518
1519
1520
1521 public void testSizesCheck() {
1522 String queueType = "unknown";
1523 try {
1524 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1525 queueType = this.queue.toString();
1526 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/testSizes");
1527 this.queue.initialize(queueId, prop);
1528 queue.clear();
1529 assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1530 sizesCheck(this.queue);
1531 }
1532 catch (XmlBlasterException ex) {
1533 log.severe("Exception when testing sizesCheck probably due to failed initialization of the queue " + queueType);
1534 }
1535 }
1536
1537
1538 /**
1539 * Test sizesCheck(I_Queue)
1540 */
1541 private void sizesCheck(I_Queue queue) {
1542 ME = "I_QueueTest.sizesCheck(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1543 System.out.println("***" + ME);
1544 try {
1545 //========== Test 1: normal case where limitEntry is contained in the queue
1546 {
1547 log.fine("sizesCheck test 1");
1548 int imax = 20;
1549
1550 DummyEntry[] entries = new DummyEntry[imax];
1551 List<I_Entry> list = new ArrayList<I_Entry>();
1552
1553 for (int i=0; i < imax; i++) {
1554 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), true);
1555 list.add(entries[i]);
1556 }
1557
1558 queue.put(entries, false);
1559 this.checkSizeAndEntries("sizesCheck test 1: ", list, queue);
1560
1561 if (queue instanceof CacheQueueInterceptorPlugin) return;
1562 log.info("size of list before: " + list.size());
1563 queue.takeLowest(2, 100L, null, true);
1564 list.remove(list.size()-1);
1565 list.remove(list.size()-1);
1566 log.info("size of list after: " + list.size());
1567
1568 this.checkSizeAndEntries("sizesCheck test 1 (after takeLowest): ", list, queue);
1569
1570 queue.removeRandom(entries);
1571 list.removeAll(list);
1572 this.checkSizeAndEntries("sizesCheck test 1 (after removing): ", list, queue);
1573
1574
1575 }
1576 }
1577 catch(XmlBlasterException e) {
1578 fail(ME + ": Exception thrown: " + e.getMessage());
1579 }
1580 }
1581
1582 public void testBigEntries() {
1583 String queueType = "unknown";
1584 try {
1585 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1586 queueType = this.queue.toString();
1587 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/testSizes");
1588 this.queue.initialize(queueId, prop);
1589 queue.clear();
1590 assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1591 bigEntries(this.queue);
1592 }
1593 catch (XmlBlasterException ex) {
1594 log.severe("Exception when testing sizesCheck probably due to failed initialization of the queue " + queueType);
1595 }
1596 }
1597
1598 /*
1599 public void testPublishMsgBigEntry() {
1600 String queueType = "unknown";
1601 try {
1602 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1603 queueType = this.queue.toString();
1604 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "QueuePlugin/testSizes");
1605 this.queue.initialize(queueId, prop);
1606 queue.clear();
1607 assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1608 publishMsgBigEntry(this.queue);
1609 }
1610 catch (XmlBlasterException ex) {
1611 log.severe("Exception when testing sizesCheck probably due to failed initialization of the queue " + queueType);
1612 }
1613 }
1614 */
1615
1616 /**
1617 * Test bigEngtries(I_Queue)
1618 * It tests the insertion and removal of entries which contain a large blob (2.1MB)
1619 */
1620 private void publishMsgBigEntry(I_Queue queue) {
1621 ME = "I_QueueTest.publishMsgBigEntry(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1622 System.out.println("***" + ME);
1623 try {
1624 {
1625 log.fine("start test");
1626 int msgSize = 1000000;
1627
1628 StorageId storageId = new StorageId(glob, "mystore", "test");
1629 byte[] content = new byte[msgSize];
1630 MsgUnit msgUnit = new MsgUnit(this.glob, "<key oid='aaa'/>", content, "<qos/>");
1631 MsgQueuePublishEntry pubEntry = new MsgQueuePublishEntry(this.glob, msgUnit, storageId);
1632 queue.put(pubEntry, false);
1633 I_Entry entry = queue.peek();
1634 queue.removeRandom(entry);
1635 }
1636 }
1637 catch(XmlBlasterException e) {
1638 fail(ME + ": Exception thrown: " + e.getMessage());
1639 }
1640 }
1641
1642 /**
1643 * Test bigEngtries(I_Queue)
1644 * It tests the insertion and removal of entries which contain a large blob (2.1MB)
1645 */
1646 private void bigEntries(I_Queue queue) {
1647 ME = "I_QueueTest.bigEntries(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1648 System.out.println("***" + ME);
1649 try {
1650 {
1651 log.fine("start test");
1652 int imax = 3;
1653 long msgSize = 202010L;
1654 DummyEntry[] entries = new DummyEntry[imax];
1655 List<I_Entry> list = new ArrayList<I_Entry>();
1656
1657 for (int i=0; i < imax; i++) {
1658 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), msgSize, true);
1659 list.add(entries[i]);
1660 }
1661
1662 queue.put(entries, false);
1663 this.checkSizeAndEntries("sizesCheck test 1: ", list, queue);
1664 List<I_Entry> entriesArray = queue.peek(imax, -1L);
1665 assertEquals("wrong number of big entries retrieved", imax, entriesArray.size());
1666 queue.removeRandom(entries);
1667 list.removeAll(list);
1668 this.checkSizeAndEntries("sizesCheck test 1 (after removing): ", list, queue);
1669
1670
1671 }
1672 }
1673 catch(XmlBlasterException e) {
1674 fail(ME + ": Exception thrown: " + e.getMessage());
1675 }
1676 }
1677
1678 // ---------------------------------------------------------------------
1679
1680 public void testOverflow() {
1681 String queueType = "unknown";
1682 try {
1683 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
1684 prop.setMaxEntries(1L);
1685 prop.setMaxEntriesCache(1L);
1686
1687 queueType = this.queue.toString();
1688 StorageId queueId = new StorageId(glob, Constants.RELATING_CALLBACK, "QueuePlugin/testOverflow");
1689 this.queue.initialize(queueId, prop);
1690 queue.clear();
1691 assertEquals(ME + " wrong size before starting ", 0, queue.getNumOfEntries());
1692 overflow(this.queue);
1693 }
1694 catch (XmlBlasterException ex) {
1695 log.severe("Exception when testing overflowCheck probably due to failed initialization of the queue " + queueType);
1696 }
1697 }
1698
1699
1700 /**
1701 * Test overflow(I_Queue)
1702 * It tests if the overflow mechanism works OK
1703 */
1704 private void overflow(I_Queue queue) {
1705 ME = "I_QueueTest.overflow(" + queue.getStorageId() + ")[" + queue.getClass().getName() + "]";
1706 System.out.println("***" + ME);
1707 try {
1708 log.fine("start test");
1709 int imax = 4;
1710 long msgSize = 100L;
1711 boolean isPersistent = true;
1712 DummyEntry[] entries = new DummyEntry[imax];
1713
1714 for (int i=0; i < imax; i++) {
1715 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), msgSize, isPersistent);
1716 }
1717
1718 queue.put(entries[0], false); // <-- OK
1719 queue.put(entries[1], false); // <-- OK
1720 try {
1721 queue.put(entries[2], false); // <-- overflow
1722 assertTrue("here we expect an overflow exception", false);
1723 }
1724 catch (XmlBlasterException ex) {
1725 log.info("overflow: an exception here is OK since it was expected due to overflow of the queue");
1726 }
1727 try {
1728 queue.put(entries[3], false); // <-- overflow
1729 assertTrue("here we expect an overflow exception", false);
1730 }
1731 catch (XmlBlasterException ex) {
1732 log.info("overflow: an exception here is OK since it was expected due to overflow of the queue");
1733 }
1734
1735 List<I_Entry> ret = queue.peek(4, -1L);
1736 assertEquals("the number of entries in the queue", 2, ret.size());
1737 for (int i=0; i < 2; i++) {
1738 assertEquals(ME + ".overflow: entry '" + i + "' in sequence is wrong", entries[i].getUniqueId(), ((I_QueueEntry)ret.get(i)).getUniqueId());
1739 }
1740 }
1741 catch(XmlBlasterException e) {
1742 fail(ME + ": Exception thrown: " + e.getMessage());
1743 }
1744 }
1745
1746 // ---------------------------------------------------------------------
1747
1748 public void tearDown() {
1749 try {
1750 this.queue.clear();
1751 this.queue.shutdown();
1752 }
1753 catch (Exception ex) {
1754 log.severe("error when tearing down " + ex.getMessage());
1755 }
1756 }
1757
1758
1759 /**
1760 * Method is used by TestRunner to load these tests
1761 */
1762 public static Test suite()
1763 {
1764 TestSuite suite= new TestSuite();
1765 Global glob = new Global();
1766 for (int i=0; i<PLUGIN_TYPES.length; i++) {
1767 suite.addTest(new I_QueueTest("testConfig", i, glob));
1768 suite.addTest(new I_QueueTest("testSize1", i, glob));
1769 suite.addTest(new I_QueueTest("testPutMsg", i, glob));
1770 suite.addTest(new I_QueueTest("testPeekMsg", i, glob));
1771 suite.addTest(new I_QueueTest("testRemoveRandom", i, glob));
1772 suite.addTest(new I_QueueTest("testRemoveWithPriority", i, glob));
1773 suite.addTest(new I_QueueTest("testTakeLowest", i, glob));
1774 suite.addTest(new I_QueueTest("testPutEntriesTwice", i, glob));
1775 suite.addTest(new I_QueueTest("testPeekWithLimitEntry", i, glob));
1776 suite.addTest(new I_QueueTest("testSizesCheck", i, glob));
1777 suite.addTest(new I_QueueTest("testBigEntries", i, glob));
1778 suite.addTest(new I_QueueTest("testOverflow", i, glob));
1779 }
1780 return suite;
1781 }
1782
1783 /**
1784 * <pre>
1785 * java org.xmlBlaster.test.classtest.queue.I_QueueTest
1786 * </pre>
1787 */
1788 public static void main(String args[]) {
1789
1790 Global glob = new Global(args);
1791
1792 for (int i=0; i < PLUGIN_TYPES.length; i++) {
1793 I_QueueTest testSub = new I_QueueTest("I_QueueTest", i, glob);
1794
1795 long startTime = System.currentTimeMillis();
1796
1797 testSub.setUp();
1798 testSub.testSizesCheck();
1799 testSub.tearDown();
1800
1801 /*
1802 testSub.setUp();
1803 testSub.testPublishMsgBigEntry();
1804 testSub.tearDown();
1805 */
1806
1807 testSub.setUp();
1808 testSub.testPeekMsgBlocking();
1809 testSub.tearDown();
1810
1811 testSub.setUp();
1812 testSub.testConfig();
1813 testSub.tearDown();
1814
1815 testSub.setUp();
1816 testSub.testSize1();
1817 testSub.tearDown();
1818
1819 testSub.setUp();
1820 testSub.testPutMsg();
1821 testSub.tearDown();
1822
1823 testSub.setUp();
1824 testSub.testPeekMsg();
1825 testSub.tearDown();
1826
1827 testSub.setUp();
1828 testSub.testRemoveRandom();
1829 testSub.tearDown();
1830
1831 testSub.setUp();
1832 testSub.testRemoveWithPriority();
1833 testSub.tearDown();
1834
1835 testSub.setUp();
1836 testSub.testTakeLowest();
1837 testSub.tearDown();
1838
1839 testSub.setUp();
1840 testSub.testPutEntriesTwice();
1841 testSub.tearDown();
1842
1843 testSub.setUp();
1844 testSub.testPeekWithLimitEntry();
1845 testSub.tearDown();
1846
1847 testSub.setUp();
1848 testSub.testBigEntries();
1849 testSub.tearDown();
1850
1851 testSub.setUp();
1852 testSub.testOverflow();
1853 testSub.tearDown();
1854
1855 long usedTime = System.currentTimeMillis() - startTime;
1856 I_QueueTest.log.info("time used for tests: " + usedTime/1000 + " seconds");
1857 }
1858 }
1859 }
syntax highlighted by Code2HTML, v. 0.9.1