util/queue/CacheQueuePlugin.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      CacheQueuePlugin.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 ------------------------------------------------------------------------------*/
00006 #include <util/queue/CacheQueuePlugin.h>
00007 #include <util/queue/QueueFactory.h>
00008 #include <util/XmlBlasterException.h>
00009 #include <util/Global.h>
00010 #ifdef XMLBLASTER_PERSISTENT_QUEUE // to compile on Windows
00011 #  include <util/queue/SQLiteQueuePlugin.h> // temporary for usage -> remove again
00012 #endif
00013 
00014 using namespace std;
00015 using namespace org::xmlBlaster::util;
00016 using namespace org::xmlBlaster::util::thread;
00017 using namespace org::xmlBlaster::util::qos::storage;
00018 
00019 namespace org { namespace xmlBlaster { namespace util { namespace queue {
00020 
00021 CacheQueuePlugin::CacheQueuePlugin(org::xmlBlaster::util::Global& global, const org::xmlBlaster::util::qos::storage::ClientQueueProperty& property)
00022    : ME("CacheQueuePlugin"), 
00023      global_(global), 
00024      log_(global.getLog("org.xmlBlaster.util.queue")), 
00025      property_(property), 
00026      transientQueueP_(0), 
00027      persistentQueueP_(0), 
00028      accessMutex_()
00029 {
00030    // TODO: type/version should be set from outside!!!
00031 
00032    transientQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "RAM", "1.0");
00033 
00034    try {
00035       persistentQueueP_ = &QueueFactory::getFactory().getPlugin(global_, property, "SQLite", "1.0");
00036 
00037       // Note: On startup we can only load the highest priority in a bulk, peekWithSamePriority() does not support to get all!
00038       reloadFromPersistentStore();
00039    }
00040    catch (const XmlBlasterException &e) {
00041       log_.warn(ME, "No persistent queue is available, we continue RAM based. Reason: " + e.getMessage());
00042    }
00043    log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");
00044 }
00045 
00046 /*
00047 CacheQueuePlugin::CacheQueuePlugin(const CacheQueuePlugin& queue)
00048    : ME("CacheQueuePlugin"), 
00049      global_(queue.global_), 
00050      log_(queue.log_), 
00051      property_(queue.property_), 
00052      storage_(queue.storage_), 
00053      accessMutex_()
00054 {
00055    numOfBytes_ = queue.numOfBytes_;
00056 }
00057 
00058 CacheQueuePlugin& CacheQueuePlugin::operator =(const CacheQueuePlugin& queue)
00059 {
00060    Lock lock(queue.accessMutex_);
00061    property_   = queue.property_;
00062    storage_    = queue.storage_;
00063    numOfBytes_ = queue.numOfBytes_;
00064    return *this;
00065 
00066 }
00067 */
00068 
00069 CacheQueuePlugin::~CacheQueuePlugin()
00070 {
00071    if (log_.call()) log_.call(ME, "destructor");
00072    QueueFactory::getFactory().releasePlugin(transientQueueP_);
00073    if (persistentQueueP_) QueueFactory::getFactory().releasePlugin(persistentQueueP_);
00074 } 
00075 
00076 void CacheQueuePlugin::put(const MsgQueueEntry &entry)
00077 {
00078    if (log_.call()) log_.call(ME, "::put");
00079 
00080    Lock lock(accessMutex_);
00081    transientQueueP_->put(entry);
00082    if (persistentQueueP_) {
00083       if (entry.isPersistent()) {
00084          try {
00085            persistentQueueP_->put(entry);
00086          }
00087          catch (const XmlBlasterException &e) {
00088             log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());
00089          }
00090       }
00091    }
00092 }
00093 
00094 long CacheQueuePlugin::reloadFromPersistentStore() const
00095 {
00096    if (persistentQueueP_ && transientQueueP_->getNumOfEntries() == 0 && persistentQueueP_->getNumOfEntries() > 0) {
00097       // On startup shuffle them to the transient queue (only the highest priority is accessible with our I_Queue API)
00098       const vector<EntryType> vec = persistentQueueP_->peekWithSamePriority(-1, -1);
00099       long count = 0;
00100       vector<EntryType>::const_iterator iter = vec.begin();
00101       for (; iter != vec.end(); ++iter) {
00102          const EntryType &entryType = (*iter);
00103          transientQueueP_->put(*entryType);
00104          count++;
00105       }
00106       return count;
00107    }
00108    return 0;
00109 }
00110 
00111 const vector<EntryType> CacheQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
00112 {
00113    Lock lock(accessMutex_);
00114    vector<EntryType> vec = transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);
00115 
00116    if (vec.size() == 0) {
00117       long count = reloadFromPersistentStore();
00118       if (count > 0) {
00119          return transientQueueP_->peekWithSamePriority(maxNumOfEntries, maxNumOfBytes);
00120       }
00121    }
00122 
00123    return vec;
00124 }
00125 
00126 
00127 long CacheQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) 
00128 {
00129    Lock lock(accessMutex_);
00130    long count = transientQueueP_->randomRemove(start, end);
00131 
00132    if (persistentQueueP_) {
00133       vector<EntryType> persistents;
00134       vector<EntryType>::const_iterator iter = start;
00135       while (iter != end) {
00136          const EntryType &entryType = (*iter);
00137          if (entryType->isPersistent()) {
00138             persistents.push_back(entryType);
00139          }
00140          iter++;
00141       }
00142       try {
00143          persistentQueueP_->randomRemove(persistents.begin(), persistents.end());
00144       }
00145       catch (const XmlBlasterException &e) {
00146          log_.warn(ME, "Ignoring problem to remove entry from persistent queue, we remove it from the transient queue only: " + e.getMessage());
00147       }
00148    }
00149    return count;
00150 }
00151 
00152 long CacheQueuePlugin::getNumOfEntries() const
00153 {
00154    return transientQueueP_->getNumOfEntries();
00155 }
00156 
00157 long CacheQueuePlugin::getMaxNumOfEntries() const
00158 {
00159    return transientQueueP_->getMaxNumOfEntries();
00160 }
00161 
00162 int64_t CacheQueuePlugin::getNumOfBytes() const
00163 {
00164    return transientQueueP_->getNumOfBytes();
00165 }
00166 
00167 int64_t CacheQueuePlugin::getMaxNumOfBytes() const
00168 {
00169    return transientQueueP_->getMaxNumOfBytes();
00170 }
00171 
00172 void CacheQueuePlugin::clear()
00173 {
00174    Lock lock(accessMutex_);
00175    transientQueueP_->clear();
00176    if (persistentQueueP_) {
00177       try {
00178          persistentQueueP_->clear();
00179       }
00180       catch (const XmlBlasterException &e) {
00181          log_.warn(ME, "Ignoring problem to put entry into persistent queue, we are handling it transient: " + e.getMessage());
00182       }
00183    }
00184 }
00185 
00186 bool CacheQueuePlugin::empty() const
00187 {
00188    return transientQueueP_->empty();
00189 }
00190 
00191 void CacheQueuePlugin::destroy()
00192 {
00193    transientQueueP_->destroy();
00194    if (persistentQueueP_) {
00195       try {
00196          persistentQueueP_->destroy();
00197       }
00198       catch (const XmlBlasterException &e) {
00199          log_.warn(ME, "Ignoring problem to destroy the persistent queue: " + e.getMessage());
00200       }
00201    }
00202 }
00203 
00204 string CacheQueuePlugin::usage()
00205 {
00206    std::string text = string("");
00207    text += string("\nThe CACHE queue plugin configuration:");
00208 #ifdef XMLBLASTER_PERSISTENT_QUEUE // to compile on Windows
00209    text += SQLiteQueuePlugin::usage();   // TODO: depending on persistency
00210 #else
00211    text += ClientQueueProperty::usage();
00212 #endif
00213    return text;
00214 }
00215 }}}} // namespace
00216 
00217 
00218