testsuite/src/c++/TestQueue.cpp

Go to the documentation of this file.
00001 /*-----------------------------------------------------------------------------
00002 Name:      TestQueue.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Testing the client queue features
00006 -----------------------------------------------------------------------------*/
00007 #include "TestSuite.h"
00008 #include <vector>
00009 #include <iostream>
00010 /*#include "tut.h"*/
00011 #include <util/queue/QueueFactory.h>
00012 #include <util/queue/I_Queue.h>
00013 #include <util/queue/PublishQueueEntry.h>
00014 #include <util/queue/ConnectQueueEntry.h>
00015 #include <util/queue/SubscribeQueueEntry.h>
00016 #include <util/queue/UnSubscribeQueueEntry.h>
00017 
00018 namespace org { namespace xmlBlaster { namespace test {
00019 
00020 using namespace std;
00021 using namespace org::xmlBlaster::util;
00022 using namespace org::xmlBlaster::util::qos;
00023 using namespace org::xmlBlaster::util::qos::storage;
00024 using namespace org::xmlBlaster::util::queue;
00025 using namespace org::xmlBlaster::client;
00026 using namespace org::xmlBlaster::client::qos;
00027 using namespace org::xmlBlaster::client::key;
00028 
00037 class TestQueue
00038 {
00039    
00040 private:
00041    string    ME;
00042    Global&   global_;
00043    I_Log&    log_;
00044    I_Queue* queue_;
00045 
00046 public:
00048    std::vector<string> types;
00049 
00050 public:
00051    TestQueue(Global& global, string name) : ME(name), global_(global), log_(global.getLog("test"))
00052    {
00053       queue_ = NULL;
00054       types.push_back("RAM");
00055       types.push_back("SQLite");
00056       types.push_back("CACHE");
00057    }
00058 
00059    virtual ~TestQueue() { }
00060 
00061    void destroyQueue() {
00062       ClientQueueProperty prop(global_, "");
00063       I_Queue *queue = &QueueFactory::getFactory().getPlugin(global_, prop);
00064       queue->destroy();
00065       QueueFactory::getFactory().releasePlugin(queue);
00066    }
00067 
00068    void testPublishCompare() 
00069    {
00070       string me = ME + "::testPublishCompare";
00071       log_.info(me, "");
00072       log_.info(me, "comparison test between PublishQueueEntry objects.");
00073 
00074       PublishKey pubKey(global_);
00075       PublishQos pubQos(global_);
00076       MessageUnit msgUnit(pubKey, string("comparison test"), pubQos);
00077       PublishQueueEntry entry1(global_, msgUnit);
00078       PublishQueueEntry entry2(global_, msgUnit);
00079       PublishQueueEntry entry3(global_, msgUnit, 2);
00080       PublishQueueEntry entry4(global_, msgUnit, 3);
00081       PublishQueueEntry entry5(global_, msgUnit, 1);
00082 
00083       assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1");
00084       assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4");
00085       assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4");
00086 
00087       log_.info(me, "test ended successfully");
00088    }
00089 
00090 
00091    void testConnectCompare() 
00092    {
00093       string me = ME + "::testConnectCompare";
00094       log_.info(me, "");
00095       log_.info(me, "comparison test between ConnectQueueEntry objects.");
00096 
00097       ConnectQos *connectQos = new ConnectQos(global_);
00098       ConnectQueueEntry entry1(global_, connectQos);
00099       ConnectQueueEntry entry2(global_, connectQos);
00100       ConnectQueueEntry entry3(global_, connectQos, 2);
00101       ConnectQueueEntry entry4(global_, connectQos, 3);
00102       ConnectQueueEntry entry5(global_, connectQos, 1);
00103 
00104       assertEquals(log_, me, true, entry2 < entry1, "1. PublishQos compare 2 with 1");
00105       assertEquals(log_, me, true, entry3 < entry4, "2. PublishQos compare 3 with 4");
00106       assertEquals(log_, me, true, entry5 < entry4, "3. PublishQos compare 5 with 4");
00107 
00108       log_.info(me, "test ended successfully");
00109    }
00110 
00111    void testMixedCompare() 
00112    {
00113       string me = ME + "::testMixedCompare";
00114       log_.info(me, "");
00115       log_.info(me, "comparison test between PublishQueueEntry and ConnectQueueEntry objects.");
00116 
00117       PublishKey pubKey(global_);
00118       PublishQos pubQos(global_);
00119       MessageUnit msgUnit(pubKey, string("comparison test"), pubQos);
00120       ConnectQos *connectQos = new ConnectQos(global_);
00121 
00122       PublishQueueEntry entry1(global_, msgUnit, 2);
00123       ConnectQueueEntry entry2(global_, connectQos, 3);
00124       PublishQueueEntry entry3(global_, msgUnit, 1);
00125 
00126       ConnectQueueEntry entry4(global_, connectQos, 2);
00127       PublishQueueEntry entry5(global_, msgUnit, 3);
00128       ConnectQueueEntry entry6(global_, connectQos, 1);
00129 
00130       assertEquals(log_, me, true, entry1 < entry2, "1. Mixed compare 1 with 2");
00131       assertEquals(log_, me, true, entry3 < entry2, "2. Mixed compare 3 with 2");
00132 
00133       assertEquals(log_, me, true, entry4 < entry5, "3. Mixed compare 4 with 5");
00134       assertEquals(log_, me, true, entry6 < entry5, "4. Mixed compare 6 with 5");
00135 
00136       log_.info(me, "test completed successfully");
00137    }
00138 
00139 
00140    void testWithOnePublishEntry()
00141    {
00142       string me = ME + "::testWithOnePublishEntry";
00143       log_.info(me, "");
00144       log_.info(me, "this test creates a queue. The following checks are done:");
00145       ClientQueueProperty prop(global_, "");
00146       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
00147       assertEquals(log_, me, true, queue_->empty(), "The queue must be empty after creation");
00148       assertEquals(log_, me, 0, queue_->getNumOfEntries(), "The queue must be empty after creation");
00149       PublishQos qos(global_);
00150       PublishKey key(global_);
00151       const string contentStr = "BlaBla";
00152       MessageUnit messageUnit(key, contentStr, qos);
00153       PublishQueueEntry entry(global_, messageUnit, messageUnit.getQos().getPriority());
00154       std::cout << "Putting " << entry.getUniqueId() << std::endl;
00155 
00156       queue_->put(entry);
00157       assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time");
00158       assertEquals(log_, me, 1, queue_->getNumOfEntries(), " 2b. the queue must contain one entry after invoking put one time");
00159       
00160       vector<EntryType> ret = queue_->peekWithSamePriority();
00161       assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1");
00162       {
00163          const MsgQueueEntry &e = *ret[0];
00164          std::cout << "Peeking " << e.getUniqueId() << std::endl;
00165          assertEquals(log_, me, entry.getUniqueId(),  e.getUniqueId(), " 3. the uniqueId must be same");
00166          assertEquals(log_, me, entry.getPriority(),  e.getPriority(), " 3. the priority must be same");
00167       }
00168       long numDel = queue_->randomRemove(ret.begin(), ret.end());
00169       assertEquals(log_, me, (long)1, numDel, " 4. randomRemove must return 1 entry deleted");
00170       assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue  must be empty");
00171       log_.info(me, "ends here. Test was successful.");
00172    }
00173 
00174 
00175    void testWithOneConnectEntry()
00176    {
00177       string me = ME + "::testWithOneEntry";
00178       log_.info(me, "");
00179       log_.info(me, "this test creates a queue. The following checks are done:");
00180       ClientQueueProperty prop(global_, "");
00181       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
00182       assertEquals(log_, me, true, queue_->empty(), " 1. the queue must be empty after creation");
00183       ConnectQos *connQos = new ConnectQos(global_);
00184       ConnectQueueEntry entry(global_, connQos);
00185       queue_->put(entry);
00186       assertEquals(log_, me, false, queue_->empty(), " 2. the queue must contain entries after invoking put one time");
00187       vector<EntryType> ret = queue_->peekWithSamePriority();
00188       assertEquals(log_, me, (size_t)1, ret.size(), " 3. the number of entries peeked after one put must be 1");
00189       assertEquals(log_, me, (long)1, queue_->randomRemove(ret.begin(), ret.end()), " 4. randomRemove must return 1 entry deleted");
00190       assertEquals(log_, me, true, queue_->empty(), " 5. after removing all entries (it was only 1 entry) the queue  must be empty");
00191       log_.info(me, "ends here. Test was successful.");
00192    }
00193 
00194 
00195    void testOrder()
00196    {
00197       string me = ME + "::testOrder";
00198       log_.info(me, "");
00199       log_.info(me, "this test checks the order in which entries are returned from the queue");
00200       ClientQueueProperty prop(global_, "");
00201       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
00202       ConnectQos *connQos = new ConnectQos(global_);
00203 
00204       ConnectQueueEntry e1(global_, ConnectQosRef(new ConnectQos(global_)), 1);
00205       e1.getConnectQos()->addClientProperty("X", 7);
00206       queue_->put(e1);
00207 
00208       ConnectQueueEntry e2(global_, ConnectQosRef(new ConnectQos(global_)), 5);  // NORM_PRIORITY
00209       e2.getConnectQos()->addClientProperty("X", 4);
00210       queue_->put(e2);
00211 
00212       ConnectQueueEntry e3(global_, ConnectQosRef(new ConnectQos(global_)), 7);
00213       e3.getConnectQos()->addClientProperty("X", 1);
00214       queue_->put(e3);
00215 
00216       ConnectQueueEntry e4(global_, ConnectQosRef(new ConnectQos(global_)), 7);
00217       e4.getConnectQos()->addClientProperty("X", 2);
00218       queue_->put(e4);
00219 
00220       ConnectQueueEntry e5(global_, ConnectQosRef(new ConnectQos(global_)), 1);  // MIN1_PRIORITY
00221       e5.getConnectQos()->addClientProperty("X", 8);
00222       queue_->put(e5);
00223 
00224       ConnectQueueEntry e6(global_, ConnectQosRef(new ConnectQos(global_)), 5);
00225       e6.getConnectQos()->addClientProperty("X", 5);
00226       queue_->put(e6);
00227 
00228       ConnectQueueEntry e7(global_, ConnectQosRef(new ConnectQos(global_)), 5);
00229       e7.getConnectQos()->addClientProperty("X", 6);
00230       queue_->put(e7);
00231 
00232       ConnectQueueEntry e8(global_, ConnectQosRef(new ConnectQos(global_)), 7);
00233       e8.getConnectQos()->addClientProperty("X", 3);
00234       queue_->put(e8);
00235 
00236       ConnectQueueEntry e9(global_, connQos, 1);
00237       e9.getConnectQos()->addClientProperty("X", 9);    // MAX_PRIORITY
00238       queue_->put(e9);
00239 
00240       vector<EntryType> ret = queue_->peekWithSamePriority();
00241       // should be 3 entries with priority 7 
00242       assertEquals(log_, me, (size_t)3, ret.size(), "1. number of priority 7 msg peeked must be correct.");
00243 
00244       const MsgQueueEntry &entry = *ret[0];
00245       // TODO:
00246       // [cc] \xmlBlaster\testsuite\src\c++\TestQueue.cpp(245) : warning C4541:
00247       // 'dynamic_cast' used on polymorphic type 'org::xmlBlaster::util::queue::MsgQueueEntry' with /GR-;
00248       // unpredictable behavior may result
00249       //cout << "Trying dynamic cast" << endl;   // On _WINDOWS: /GR  to enable C++ RTTI didn't help (see build.xml)
00250       const ConnectQueueEntry *connectQueueEntry = dynamic_cast<const ConnectQueueEntry*>(&entry);
00251       assertEquals(log_, me, 1, connectQueueEntry->getConnectQos()->getClientProperty("X", -1), "2. checking the first entry.");
00252       assertEquals(log_, me, 2, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "3. checking the second entry.");
00253       assertEquals(log_, me, 3, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "4. checking the third entry.");
00254 
00255       assertEquals(log_, me, false, queue_->empty(), "5. there should still be entries in the queue.");
00256       queue_->randomRemove(ret.begin(), ret.end());
00257       ret = queue_->peekWithSamePriority();
00258       assertEquals(log_, me, (size_t)3, ret.size(), "6. number of priority 7 msg peeked must be correct.");
00259       assertEquals(log_, me, 4, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "7. checking the first entry.");
00260       assertEquals(log_, me, 5, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "8. checking the second entry.");
00261       assertEquals(log_, me, 6, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "9. checking the third entry.");
00262             
00263       queue_->randomRemove(ret.begin(), ret.end());
00264       ret = queue_->peekWithSamePriority();
00265       assertEquals(log_, me, (size_t)3, ret.size(), "10. number of priority 7 msg peeked must be correct.");
00266       assertEquals(log_, me, 7, dynamic_cast<const ConnectQueueEntry*>(&(*ret[0]))->getConnectQos()->getClientProperty("X", -1), "11. checking the first entry.");
00267       assertEquals(log_, me, 8, dynamic_cast<const ConnectQueueEntry*>(&(*ret[1]))->getConnectQos()->getClientProperty("X", -1), "12. checking the second entry.");
00268       assertEquals(log_, me, 9, dynamic_cast<const ConnectQueueEntry*>(&(*ret[2]))->getConnectQos()->getClientProperty("X", -1), "13. checking the third entry.");
00269       queue_->randomRemove(ret.begin(), ret.end());
00270       assertEquals(log_, me, true, queue_->empty(), "14. the queue should be empty now.");
00271       log_.info(me, "test ended successfully");
00272    }
00273 
00274 
00275    void testMaxNumOfEntries()
00276    {
00277       string me = ME + "::testMaxNumOfEntries";
00278       log_.info(me, "");
00279       log_.info(me, "this test checks that an excess of entries really throws an exception");
00280       ClientQueueProperty prop(global_, "");
00281       prop.setMaxEntries(10);
00282       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
00283       ConnectQosRef connQos = new ConnectQos(global_);
00284       connQos->setPersistent(false);
00285       int i=0;
00286       try {
00287          for (i=0; i < 10; i++) {
00288             if (i == 5) connQos->setPersistent(true);
00289             ConnectQueueEntry entry(global_, connQos);
00290             queue_->put(entry);
00291          }
00292          log_.info(me, "1. putting entries inside the queue: OK");      
00293       }
00294       catch (const XmlBlasterException &/*ex*/) {
00295          log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry nr. " + lexical_cast<string>(i));      
00296          assert(0);
00297       }
00298       try {
00299          ConnectQueueEntry entry(global_, connQos);
00300          queue_->put(entry);
00301          log_.error(me, "2. putting entries inside the queue: FAILED should have thrown an exception");      
00302          assert(0);
00303       }
00304       catch (const XmlBlasterException &ex) {
00305          assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.entries"), "3. checking that exceeding number of entries throws the correct exception.");
00306          queue_->clear();
00307       }
00308       log_.info(me, "test ended successfully");
00309    }
00310 
00311 
00312    void testMaxNumOfBytes()
00313    {
00314       string me = ME + "::testMaxNumOfBytes";
00315       log_.info(me, "");
00316       log_.info(me, "this test checks that an excess of size in bytes really throws an exception");
00317       ClientQueueProperty prop(global_, "");
00318       ConnectQos *connQos = new ConnectQos(global_);
00319       ConnectQueueEntry entry(global_, connQos);
00320       size_t maxBytes = 10 * entry.getSizeInBytes();
00321       prop.setMaxBytes(maxBytes);
00322       queue_ = &QueueFactory::getFactory().getPlugin(global_, prop);
00323 
00324       assertEquals(log_, me, maxBytes, (int)queue_->getMaxNumOfBytes(), "Setting maxNumOfBytes");
00325 
00326       int i=0;
00327       try {
00328          for (i=0; i < 10; i++) {
00329             ConnectQueueEntry ent(global_, connQos);
00330             log_.trace(me, "Putting entry " + lexical_cast<string>(i) + " to queue, size=" + lexical_cast<string>(ent.getSizeInBytes()));
00331             queue_->put(ent);
00332          }
00333          log_.info(me, "1. putting entries inside the queue: OK");      
00334       }
00335       catch (const XmlBlasterException &/*ex*/) {
00336          log_.error(me, "1. putting entries inside the queue: FAILED could not put inside the queue the entry no. " + lexical_cast<string>(i) +
00337                         /*", entryBytes=" + lexical_cast<string>(entry->getNumOfBytes()) +*/
00338                         ", numOfEntries=" + lexical_cast<string>(queue_->getNumOfEntries()) +
00339                         ", numOfBytes=" + lexical_cast<string>(queue_->getNumOfBytes()) +
00340                       " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
00341          assert(0);
00342       }
00343       try {
00344          ConnectQueueEntry ent(global_, connQos);
00345          queue_->put(ent);
00346          log_.error(me, string("2. putting entries inside the queue: FAILED should have thrown an exception currQueueByte=") + 
00347                       lexical_cast<string>(queue_->getNumOfBytes()) +
00348                       " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
00349          assert(0);
00350       }
00351       catch (const XmlBlasterException &ex) {
00352          assertEquals(log_, me, ex.getErrorCodeStr(), string("resource.overflow.queue.bytes"),
00353                       string("3. checking that exceeding number of entries throws the correct exception. numOfBytes=") + 
00354                       lexical_cast<string>(queue_->getNumOfBytes()) +
00355                       " maxNumOfBytes=" + lexical_cast<string>(queue_->getMaxNumOfBytes()));
00356       }
00357       log_.info(me, "test ended successfully");
00358    }
00359 
00360    void setUp() 
00361    {
00362       destroyQueue(); // Destroy old queue
00363    }
00364 
00365    void tearDown() {
00366       if (queue_) {
00367          QueueFactory::getFactory().releasePlugin(queue_);
00368          queue_ = NULL;
00369       }
00370    }
00371 };
00372    
00373 }}} // namespace
00374 
00375 
00376 using namespace org::xmlBlaster::test;
00377 
00379 int main(int args, char *argc[]) 
00380 {
00381    org::xmlBlaster::util::Object_Lifetime_Manager::init();
00382 
00383    try {
00384       Global& glob = Global::getInstance();
00385       glob.initialize(args, argc);
00386 
00387       TestQueue testObj = TestQueue(glob, "TestQueue");
00388 
00389       for (std::vector<string>::size_type i=0; i < testObj.types.size(); i++) {
00390          glob.getProperty().setProperty("queue/connection/type", testObj.types[i], true);
00391          std::cout << "Testing queue type '" << glob.getProperty().get("queue/connection/type", string("eRRoR")) << "'" << std::endl;
00392 
00393          testObj.setUp();
00394          testObj.testPublishCompare();
00395          testObj.tearDown();
00396 
00397          testObj.setUp();
00398          testObj.testConnectCompare();
00399          testObj.setUp();
00400          testObj.tearDown();
00401 
00402          testObj.setUp();
00403          testObj.testMixedCompare();
00404          testObj.tearDown();
00405 
00406          testObj.setUp();
00407          testObj.testWithOnePublishEntry();
00408          testObj.tearDown();
00409 
00410          testObj.setUp();
00411          testObj.testWithOneConnectEntry();
00412          testObj.tearDown();
00413 
00414          testObj.setUp();
00415          testObj.testOrder();
00416          testObj.tearDown();
00417 
00418          testObj.setUp();
00419          testObj.testMaxNumOfEntries();
00420          testObj.tearDown();
00421 
00422          testObj.setUp();
00423          testObj.testMaxNumOfBytes();
00424          testObj.tearDown();
00425       }
00426    }
00427    catch (const XmlBlasterException &e) {
00428       std::cerr << "TestQueue FAILED: " << e.getMessage() << std::endl;
00429       assert(0);
00430 
00431    }
00432 
00433    org::xmlBlaster::util::Object_Lifetime_Manager::fini();
00434    return 0;
00435 }
00436 
00437