1 /*-----------------------------------------------------------------------------
  2 Name:      TestQueue.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Testing the Timeout Features
  6 -----------------------------------------------------------------------------*/
  7 #include "TestSuite.h"
  8 #include <vector>
  9 #include <iostream>
 10 /*#include "tut.h"*/
 11 #include <util/queue/QueueFactory.h>
 12 #include <util/queue/I_Queue.h>
 13 #include <util/queue/PublishQueueEntry.h>
 14 #include <util/queue/ConnectQueueEntry.h>
 15 #include <util/queue/SubscribeQueueEntry.h>
 16 #include <util/queue/UnSubscribeQueueEntry.h>
 17 
 18 namespace org { namespace xmlBlaster { namespace test {
 19 
 20 using namespace std;
 21 using namespace org::xmlBlaster::util;
 22 using namespace org::xmlBlaster::util::qos;
 23 using namespace org::xmlBlaster::util::qos::storage;
 24 using namespace org::xmlBlaster::util::queue;
 25 using namespace org::xmlBlaster::client;
 26 using namespace org::xmlBlaster::client::qos;
 27 using namespace org::xmlBlaster::client::key;
 28 
 29 /**
 30  * Tests the queue entry and queue functionality. 
 31  * The following is tested here:
 32  * - PublishQueueEntry comparison operators
 33  * - ConnectQueueEntry comparison operators
 34  * - Intermixed comparisons (between PublishQueueEntry and ConnectQueueEntry).
 35  * - Queue access and overflow
 36  */
 37 class TestQueue
 38 {
 39    
 40 private:
 41    string    ME;
 42    Global&   global_;
 43    I_Log&    log_;
 44    I_Queue* queue_;
 45 
 46 public:
 47    /** The values for "-queue/connection/type"; */
 48    std::vector<string> types;
 49 
 50 public:
 51    TestQueue(Global& global, string name) : ME(name), global_(global), log_(global.getLog("test"))
 52    {
 53       queue_ = NULL;
 54       types.push_back("RAM");
 55       types.push_back("SQLite");
 56       types.push_back("CACHE");
 57    }
 58 
 59    virtual ~TestQueue() { }
 60 
 61    void destroyQueue() {
 62       ClientQueueProperty prop(global_, "");
 63       I_Queue *queue = &QueueFactory::getFactory().getPlugin(global_, prop);
 64       queue->destroy();
 65       QueueFactory::getFactory().releasePlugin(queue);
 66    }
 67 
 68    void testPublishCompare() 
 69    {
 70       string me = ME + "::testPublishCompare";
 71       log_.info(me, "");
 72       log_.info(me, "comparison test between PublishQueueEntry objects.");
 73 
 74       PublishKey pubKey(global_);
 75       PublishQos pubQos(global_);
 76       MessageUnit msgUnit(pubKey, string("comparison test"), pubQos);
 77       PublishQueueEntry entry1(global_, msgUnit);
 78       PublishQueueEntry entry2(global_, msgUnit);
 79       PublishQueueEntry entry3(global_, msgUnit, 2);
 80       PublishQueueEntry entry4(global_, msgUnit, 3);
 81       PublishQueueEntry entry5(global_, msgUnit, 1);
 82 
 83       assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1");
 84       assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4");
 85       assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4");
 86 
 87       log_.info(me, "test ended successfully");
 88    }
 89 
 90 
 91    void testConnectCompare() 
 92    {
 93       string me = ME + "::testConnectCompare";
 94       log_.info(me, "");
 95       log_.info(me, "comparison test between ConnectQueueEntry objects.");
 96 
 97       ConnectQos *connectQos = new ConnectQos(global_);
 98       ConnectQueueEntry entry1(global_, connectQos);
 99       ConnectQueueEntry entry2(global_, connectQos);
100       ConnectQueueEntry entry3(global_, connectQos, 2);
101       ConnectQueueEntry entry4(global_, connectQos, 3);
102       ConnectQueueEntry entry5(global_, connectQos, 1);
103 
104       assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1");
105       assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4");
106       assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4");
107 
108       log_.info(me, "test ended successfully");
109    }
110 
111    void testMixedCompare() 
112    {
113       string me = ME + "::testMixedCompare";
114       log_.info(me, "");
115       log_.info(me, "comparison test between PublishQueueEntry and ConnectQueueEntry objects.");
116 
117       PublishKey pubKey(global_);
118       PublishQos pubQos(global_);
119       MessageUnit msgUnit(pubKey, string("comparison test"), pubQos);
120       ConnectQos *connectQos = new ConnectQos(global_);
121 
122       PublishQueueEntry entry1(global_, msgUnit, 2);
123       ConnectQueueEntry entry2(global_, connectQos, 3);
124       PublishQueueEntry entry3(global_, msgUnit, 1);
125 
126       ConnectQueueEntry entry4(global_, connectQos, 2);
127       PublishQueueEntry entry5(global_, msgUnit, 3);
128       ConnectQueueEntry entry6(global_, connectQos, 1);
129 
130       assertEquals(log_, me, true, entry1 < entry2, "1. Mixed compare 1 with 2");
131       assertEquals(log_, me, true, entry3 < entry2, "2. Mixed compare 3 with 2");
132 
133       assertEquals(log_, me, true, entry4 < entry5, "3. Mixed compare 4 with 5");
134       assertEquals(log_, me, true, entry6 < entry5, "4. Mixed compare 6 with 5");
135 
136       log_.info(me, "test completed successfully");
137    }
138 
139 
140    void testWithOnePublishEntry()
141    {
142       string me = ME + "::testWithOnePublishEntry";
143       log_.info(me, "");
144       log_.info(me, "this test creates a queue. The following checks are done:");
145       ClientQueueProperty prop(global_, "");
146       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
147       assertEquals(log_, me, true, queue_->empty(), "The queue must be empty after creation");
148       assertEquals(log_, me, 0, queue_->getNumOfEntries(), "The queue must be empty after creation");
149       PublishQos qos(global_);
150       PublishKey key(global_);
151       const string contentStr = "BlaBla";
152       MessageUnit messageUnit(key, contentStr, qos);
153       PublishQueueEntry entry(global_, messageUnit, messageUnit.getQos().getPriority());
154       std::cout << "Putting " << entry.getUniqueId() << std::endl;
155 
156       queue_->put(entry);
157       assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time");
158       assertEquals(log_, me, 1, queue_->getNumOfEntries(), " 2b. the queue must contain one entry after invoking put one time");
159       
160       vector<EntryType> ret = queue_->peekWithSamePriority();
161       assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1");
162       {
163          const MsgQueueEntry &e = *ret[0];
164          std::cout << "Peeking " << e.getUniqueId() << std::endl;
165          assertEquals(log_, me, entry.getUniqueId(),  e.getUniqueId(), " 3. the uniqueId must be same");
166          assertEquals(log_, me, entry.getPriority(),  e.getPriority(), " 3. the priority must be same");
167       }
168       long numDel = queue_->randomRemove(ret.begin(), ret.end());
169       assertEquals(log_, me, (long)1, numDel, " 4. randomRemove must return 1 entry deleted");
170       assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue  must be empty");
171       log_.info(me, "ends here. Test was successful.");
172    }
173 
174 
175    void testWithOneConnectEntry()
176    {
177       string me = ME + "::testWithOneEntry";
178       log_.info(me, "");
179       log_.info(me, "this test creates a queue. The following checks are done:");
180       ClientQueueProperty prop(global_, "");
181       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
182       assertEquals(log_, me, true, queue_->empty(), " 1. the queue must be empty after creation");
183       ConnectQos *connQos = new ConnectQos(global_);
184       ConnectQueueEntry entry(global_, connQos);
185       queue_->put(entry);
186       assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time");
187       vector<EntryType> ret = queue_->peekWithSamePriority();
188       assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1");
189       assertEquals(log_, me, (long)1, queue_->randomRemove(ret.begin(), ret.end()), " 4. randomRemove must return 1 entry deleted");
190       assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue  must be empty");
191       log_.info(me, "ends here. Test was successful.");
192    }
193 
194 
195    void testOrder()
196    {
197       string me = ME + "::testOrder";
198       log_.info(me, "");
199       log_.info(me, "this test checks the order in which entries are returned from the queue");
200       ClientQueueProperty prop(global_, "");
201       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
202       ConnectQos *connQos = new ConnectQos(global_);
203 
204       ConnectQueueEntry e1(global_, ConnectQosRef(new ConnectQos(global_)), 1);
205       e1.getConnectQos()->addClientProperty("X", 7);
206       queue_->put(e1);
207 
208       ConnectQueueEntry e2(global_, ConnectQosRef(new ConnectQos(global_)), 5);  // NORM_PRIORITY
209       e2.getConnectQos()->addClientProperty("X", 4);
210       queue_->put(e2);
211 
212       ConnectQueueEntry e3(global_, ConnectQosRef(new ConnectQos(global_)), 7);
213       e3.getConnectQos()->addClientProperty("X", 1);
214       queue_->put(e3);
215 
216       ConnectQueueEntry e4(global_, ConnectQosRef(new ConnectQos(global_)), 7);
217       e4.getConnectQos()->addClientProperty("X", 2);
218       queue_->put(e4);
219 
220       ConnectQueueEntry e5(global_, ConnectQosRef(new ConnectQos(global_)), 1);  // MIN1_PRIORITY
221       e5.getConnectQos()->addClientProperty("X", 8);
222       queue_->put(e5);
223 
224       ConnectQueueEntry e6(global_, ConnectQosRef(new ConnectQos(global_)), 5);
225       e6.getConnectQos()->addClientProperty("X", 5);
226       queue_->put(e6);
227 
228       ConnectQueueEntry e7(global_, ConnectQosRef(new ConnectQos(global_)), 5);
229       e7.getConnectQos()->addClientProperty("X", 6);
230       queue_->put(e7);
231 
232       ConnectQueueEntry e8(global_, ConnectQosRef(new ConnectQos(global_)), 7);
233       e8.getConnectQos()->addClientProperty("X", 3);
234       queue_->put(e8);
235 
236       ConnectQueueEntry e9(global_, connQos, 1);
237       e9.getConnectQos()->addClientProperty("X", 9);    // MAX_PRIORITY
238       queue_->put(e9);
239 
240       vector<EntryType> ret = queue_->peekWithSamePriority();
241       // should be 3 entries with priority 7 
242       assertEquals(log_, me, (size_t)3, ret.size(), "1. number of priority 7 msg peeked must be correct.");
243 
244       const MsgQueueEntry &entry = *ret[0];
245       // TODO:
246       // [cc] \xmlBlaster\testsuite\src\c++\TestQueue.cpp(245) : warning C4541:
247       // 'dynamic_cast' used on polymorphic type 'org::xmlBlaster::util::queue::MsgQueueEntry' with /GR-;
248       // unpredictable behavior may result
249       //cout << "Trying dynamic cast" << endl;   // On _WINDOWS: /GR  to enable C++ RTTI didn't help (see build.xml)
250       const ConnectQueueEntry *connectQueueEntry = dynamic_cast<const ConnectQueueEntry*>(&entry);
251       assertEquals(log_, me, 1, connectQueueEntry->getConnectQos()->getClientProperty("X", -1), "2. checking the first entry.");
252       assertEquals(log_, me, 2, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "3. checking the second entry.");
253       assertEquals(log_, me, 3, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "4. checking the third entry.");
254 
255       assertEquals(log_, me, false, queue_->empty(), "5. there should still be entries in the queue.");
256       queue_->randomRemove(ret.begin(), ret.end());
257       ret = queue_->peekWithSamePriority();
258       assertEquals(log_, me, (size_t)3, ret.size(), "6. number of priority 7 msg peeked must be correct.");
259       assertEquals(log_, me, 4, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "7. checking the first entry.");
260       assertEquals(log_, me, 5, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "8. checking the second entry.");
261       assertEquals(log_, me, 6, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "9. checking the third entry.");
262             
263       queue_->randomRemove(ret.begin(), ret.end());
264       ret = queue_->peekWithSamePriority();
265       assertEquals(log_, me, (size_t)3, ret.size(), "10. number of priority 7 msg peeked must be correct.");
266       assertEquals(log_, me, 7, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "11. checking the first entry.");
267       assertEquals(log_, me, 8, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "12. checking the second entry.");
268       assertEquals(log_, me, 9, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "13. checking the third entry.");
269       queue_->randomRemove(ret.begin(), ret.end());
270       assertEquals(log_, me, true, queue_->empty(), "14. the queue should be empty now.");
271       log_.info(me, "test ended successfully");
272    }
273 
274 
275    void testMaxNumOfEntries()
276    {
277       string me = ME + "::testMaxNumOfEntries";
278       log_.info(me, "");
279       log_.info(me, "this test checks that an excess of entries really throws an exception");
280       ClientQueueProperty prop(global_, "");
281       prop.setMaxEntries(10);
282       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
283       ConnectQosRef connQos = new ConnectQos(global_);
284       connQos->setPersistent(false);
285       int i=0;
286       try {
287          for (i=0; i < 10; i++) {
288             if (i == 5) connQos->setPersistent(true);
289             ConnectQueueEntry entry(global_, connQos);
290             queue_->put(entry);
291          }
292          log_.info(me, "1. putting entries inside the queue: OK");      
293       }
294       catch (const XmlBlasterException &/*ex*/) {
295          log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry nr. " + lexical_cast<string>(i));      
296          assert(0);
297       }
298       try {
299          ConnectQueueEntry entry(global_, connQos);
300          queue_->put(entry);
301          log_.error(me, "2. putting entries inside the queue: FAILED should have thrown an exception");      
302          assert(0);
303       }
304       catch (const XmlBlasterException &ex) {
305          assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.entries"), "3. checking that exceeding number of entries throws the correct exception.");
306          queue_->clear();
307       }
308       log_.info(me, "test ended successfully");
309    }
310 
311 
312    void testMaxNumOfBytes()
313    {
314       string me = ME + "::testMaxNumOfBytes";
315       log_.info(me, "");
316       log_.info(me, "this test checks that an excess of size in bytes really throws an exception");
317       ClientQueueProperty prop(global_, "");
318       ConnectQos *connQos = new ConnectQos(global_);
319       ConnectQueueEntry entry(global_, connQos);
320       size_t maxBytes = 10 * entry.getSizeInBytes();
321       prop.setMaxBytes(maxBytes);
322       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
323 
324       assertEquals(log_, me, maxBytes, (int)queue_->getMaxNumOfBytes(), "Setting maxNumOfBytes");
325 
326       int i=0;
327       try {
328          for (i=0; i < 10; i++) {
329             ConnectQueueEntry ent(global_, connQos);
330             log_.trace(me, "Putting entry " + lexical_cast<string>(i) + " to queue, size=" + lexical_cast<string>(ent.getSizeInBytes()));
331             queue_->put(ent);
332          }
333          log_.info(me, "1. putting entries inside the queue: OK");      
334       }
335       catch (const XmlBlasterException &/*ex*/) {
336          log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry no. " + lexical_cast<string>(i) +
337                         /*", entryBytes=" + lexical_cast<string>(entry->getNumOfBytes()) +*/
338                         ", numOfEntries=" + lexical_cast<string>(queue_->getNumOfEntries()) +
339                         ", numOfBytes=" + lexical_cast<string>(queue_->getNumOfBytes()) +
340                       " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
341          assert(0);
342       }
343       try {
344          ConnectQueueEntry ent(global_, connQos);
345          queue_->put(ent);
346          log_.error(me, string("2. putting entries inside the queue: FAILED should have thrown an exception currQueueByte=") + 
347                       lexical_cast<string>(queue_->getNumOfBytes()) +
348                       " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
349          assert(0);
350       }
351       catch (const XmlBlasterException &ex) {
352          assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.bytes"),
353                       string("3. checking that exceeding number of entries throws the correct exception. numOfBytes=") + 
354                       lexical_cast<string>(queue_->getNumOfBytes()) +
355                       " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
356       }
357       log_.info(me, "test ended successfully");
358    }
359 
360    void setUp() 
361    {
362       destroyQueue(); // Destroy old queue
363    }
364 
365    void tearDown() {
366       if (queue_) {
367          QueueFactory::getFactory().releasePlugin(queue_);
368          queue_ = NULL;
369       }
370    }
371 };
372    
373 }}} // namespace
374 
375 
376 using namespace org::xmlBlaster::test;
377 
378 /** Compile:  build -DexeName=TestQueue cpp-test-single */
379 int main(int args, char *argc[]) 
380 {
381    org::xmlBlaster::util::Object_Lifetime_Manager::init();
382 
383    try {
384       Global& glob = Global::getInstance();
385       glob.initialize(args, argc);
386 
387       TestQueue testObj = TestQueue(glob, "TestQueue");
388 
389       for (std::vector<string>::size_type i=0; i < testObj.types.size(); i++) {
390          glob.getProperty().setProperty("queue/connection/type", testObj.types[i], true);
391          std::cout << "Testing queue type '" << glob.getProperty().get("queue/connection/type", string("eRRoR")) << "'" << std::endl;
392 
393          testObj.setUp();
394          testObj.testPublishCompare();
395          testObj.tearDown();
396 
397          testObj.setUp();
398          testObj.testConnectCompare();
399          testObj.setUp();
400          testObj.tearDown();
401 
402          testObj.setUp();
403          testObj.testMixedCompare();
404          testObj.tearDown();
405 
406          testObj.setUp();
407          testObj.testWithOnePublishEntry();
408          testObj.tearDown();
409 
410          testObj.setUp();
411          testObj.testWithOneConnectEntry();
412          testObj.tearDown();
413 
414          testObj.setUp();
415          testObj.testOrder();
416          testObj.tearDown();
417 
418          testObj.setUp();
419          testObj.testMaxNumOfEntries();
420          testObj.tearDown();
421 
422          testObj.setUp();
423          testObj.testMaxNumOfBytes();
424          testObj.tearDown();
425       }
426    }
427    catch (const XmlBlasterException &e) {
428       std::cerr << "TestQueue FAILED: " << e.getMessage() << std::endl;
429       assert(0);
430 
431    }
432 
433    org::xmlBlaster::util::Object_Lifetime_Manager::fini();
434    return 0;
435 }


syntax highlighted by Code2HTML, v. 0.9.1