util/queue/RamQueuePlugin.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      RamQueuePlugin.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 ------------------------------------------------------------------------------*/
00006 
00007 #include <util/queue/RamQueuePlugin.h>
00008 #include <util/XmlBlasterException.h>
00009 #include <util/Global.h>
00010 
00011 using namespace std;
00012 using namespace org::xmlBlaster::util;
00013 using namespace org::xmlBlaster::util::thread;
00014 using namespace org::xmlBlaster::util::qos::storage;
00015 
00016 namespace org { namespace xmlBlaster { namespace util { namespace queue {
00017 
00018 RamQueuePlugin::RamQueuePlugin(Global& global, const ClientQueueProperty& property)
00019    : ME("RamQueuePlugin"), 
00020      global_(global), 
00021      log_(global.getLog("org.xmlBlaster.util.queue")), 
00022      property_(property), 
00023      storage_(), 
00024      accessMutex_()
00025 {
00026    numOfBytes_ = 0;
00027    log_.info(ME, "Created queue [" + getType() + "][" + getVersion() + "]");
00028 }
00029 
00030 RamQueuePlugin::RamQueuePlugin(const RamQueuePlugin& queue)
00031    : ME("RamQueuePlugin"), 
00032      global_(queue.global_), 
00033      log_(queue.log_), 
00034      property_(queue.property_), 
00035      storage_(queue.storage_), 
00036      accessMutex_()
00037 {
00038    numOfBytes_ = queue.numOfBytes_;
00039 }
00040 
00041 RamQueuePlugin& RamQueuePlugin::operator =(const RamQueuePlugin& queue)
00042 {
00043    Lock lock(queue.accessMutex_);
00044    property_   = queue.property_;
00045    storage_    = queue.storage_;
00046    numOfBytes_ = queue.numOfBytes_;
00047    return *this;
00048 
00049 }
00050 
00051 RamQueuePlugin::~RamQueuePlugin()
00052 {
00053    if (log_.call()) log_.call(ME, "destructor");
00054    if (!storage_.empty()) {
00055       Lock lock(accessMutex_);
00056       storage_.erase(storage_.begin(), storage_.end());
00057    }
00058 } 
00059 
00060 void RamQueuePlugin::put(const MsgQueueEntry &entry)
00061 {
00062    if (log_.call()) log_.call(ME, "::put");
00063    if (log_.dump()) log_.dump(ME, string("::put, the entry is: ")  + entry.toXml());
00064 
00065    Lock lock(accessMutex_);
00066    if (numOfBytes_+entry.getSizeInBytes() > ((size_t)property_.getMaxBytes()) ) {
00067       throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_BYTES, ME + "::put", "client queue");
00068    }
00069 
00070    if (storage_.size() >= (size_t)property_.getMaxEntries() ) {
00071       throw XmlBlasterException(RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME + "::put", "client queue");
00072    }
00073    try {
00074       const EntryType help(*entry.getClone());
00075       storage_.insert(help);
00076       numOfBytes_ += entry.getSizeInBytes();
00077       // add the sizeInBytes_ here ...
00078    }
00079    catch (exception& ex) {
00080       throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", ex.what());
00081    }      
00082    catch (...) {
00083       throw XmlBlasterException(INTERNAL_UNKNOWN, ME + "::put", "the original type of this exception is unknown");
00084    }
00085 }
00086 
00087 const vector<EntryType> RamQueuePlugin::peekWithSamePriority(long maxNumOfEntries, long maxNumOfBytes) const
00088 {
00089    Lock lock(accessMutex_);
00090    vector<EntryType> ret;
00091    if (storage_.empty()) return ret;
00092    StorageType::const_iterator iter = storage_.begin();
00093    long numOfEntries = 0;
00094    long numOfBytes = 0;
00095    int referencePriority = (**iter).getPriority();
00096    while (iter != storage_.end()) {
00097       numOfBytes += (**iter).getSizeInBytes();
00098       numOfEntries++;
00099       if (numOfBytes > maxNumOfBytes && maxNumOfBytes > -1) break;
00100       if (numOfEntries > maxNumOfEntries && maxNumOfEntries > -1) break;
00101       if ((**iter).getPriority() != referencePriority ) break;
00102       EntryType entry = (*iter);
00103       ret.insert(ret.end(), entry); 
00104       iter++;
00105    }
00106    return ret;
00107 }
00108 
00109 
00110 long RamQueuePlugin::randomRemove(const vector<EntryType>::const_iterator &start, const vector<EntryType>::const_iterator &end) 
00111 {
00112    Lock lock(accessMutex_);
00113    if (start == end || storage_.empty()) return 0;
00114    vector<EntryType>::const_iterator iter = start;
00115    long count = 0;
00116    while (iter != end) {
00117       long entrySize = (*iter)->getSizeInBytes();
00118       if (storage_.empty()) return 0;
00119       string::size_type help = storage_.erase(*iter);
00120       if (help > 0) {
00121          count += help;
00122          numOfBytes_ -= help * entrySize;
00123       }
00124       iter++;
00125    }
00126    return count;
00127 }
00128 
00129 long RamQueuePlugin::getNumOfEntries() const
00130 {
00131    return storage_.size();
00132 }
00133 
00134 long RamQueuePlugin::getMaxNumOfEntries() const
00135 {
00136    return property_.getMaxEntries();
00137 }
00138 
00139 int64_t RamQueuePlugin::getNumOfBytes() const
00140 {
00141    return numOfBytes_;
00142 }
00143 
00144 int64_t RamQueuePlugin::getMaxNumOfBytes() const
00145 {
00146    return property_.getMaxBytes();
00147 }
00148 
00149 void RamQueuePlugin::clear()
00150 {
00151    Lock lock(accessMutex_);
00152    storage_.erase(storage_.begin(), storage_.end());
00153    numOfBytes_ = 0;
00154 }
00155 
00156 
00157 bool RamQueuePlugin::empty() const
00158 {
00159    return storage_.empty();
00160 }
00161 
00162 
00163 }}}} // namespace
00164 
00165 
00166