util/Timeout.cpp

Go to the documentation of this file.
00001 /*-----------------------------------------------------------------------------
00002 Name:      Timeout.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Allows you be called back after a given delay.
00006 -----------------------------------------------------------------------------*/
00007 #include <algorithm>
00008 #include <string>
00009 
00010 #include <util/Timeout.h>
00011 #include <util/lexical_cast.h>
00012 #include <util/Constants.h>
00013 #include <util/Global.h>
00014 
00015 namespace org { namespace xmlBlaster { namespace util {
00016 
00017 using namespace std;
00018 using namespace org::xmlBlaster::util::thread;
00019 
00020 Timeout::Timeout(Global& global)
00021    : Thread(), ME("Timeout"), threadName_("Timeout-Thread"),
00022      timeoutMap_(), isRunning_(false), isReady_(false),
00023      mapHasNewEntry_(false), isActive_(true),
00024      isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()),
00025      global_(global), log_(global.getLog("org.xmlBlaster.util")),
00026      invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_()
00027 {
00028    ME += "-Timeout-Thread-" + lexical_cast<std::string>(this);
00029    // the thread will only be instantiated when starting
00030    if (log_.call()) log_.call(ME, " default constructor");
00031    if (log_.trace()) log_.trace(ME, " default constructor: after creating timeout condition");
00032    start(detached_);
00033    if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread");
00034 }
00035 
00036 Timeout::Timeout(Global& global, const string &name)
00037    : Thread(), ME("Timeout"), threadName_(name),
00038      timeoutMap_(), isRunning_(false), isReady_(false),
00039      mapHasNewEntry_(false), isActive_(true),
00040      isDebug_(false), detached_(false), timestampFactory_(TimestampFactory::getInstance()),
00041      global_(global), log_(global.getLog("org.xmlBlaster.util")),
00042      invocationMutex_(), waitForTimeoutMutex_(), waitForTimeoutCondition_()
00043 {
00044    // the thread remains uninitialized ...
00045    ME += "-" + name + "-" + lexical_cast<std::string>(this);
00046    if (log_.call()) log_.call(ME, " alternative constructor");
00047    start(detached_);
00048    if (log_.trace()) log_.trace(ME, " default constructor: after starting the thread");
00049 }
00050 
00051 Timeout::~Timeout() 
00052 {
00053    if (log_.call()) log_.call(ME, " destructor");
00054 
00055    shutdown();
00056 
00057    if (!detached_)
00058       join();
00059 
00060    if (isActive_) { /* Should never happen */
00061       for (int i=0; i<200; i++) {
00062          if (!isActive_) break;
00063          log_.warn(ME, "Waiting for timer thread to finish");
00064          //Thread::yield();
00065          Thread::sleep(10);
00066       }
00067    }
00068 }
00069 
00070 
00071 bool Timeout::start(bool detached) 
00072 {
00073    if (log_.call()) log_.call(ME, " start" + lexical_cast<string>(detached));
00074    isRunning_ = true;
00075    if (log_.trace()) log_.trace(ME, " before creating the running thread");
00076    Thread::start(detached);
00077 
00078    if (log_.trace()) log_.trace(ME, " start: waiting for the thread to be ready (waiting for the first timeout addition)");
00079    while (!isReady_) {
00080       Thread::sleep(5);
00081    }
00082    if (log_.trace()) log_.trace(ME, " start: running thread created and ready");
00083    return true;
00084 }
00085 
00086 void Timeout::join() 
00087 {
00088    Thread::join();
00089    if (log_.trace()) log_.trace(ME, " start: running thread joined (i.e. thread started)");
00090 }
00091 
00092 Timestamp Timeout::addTimeoutListener(I_Timeout *listener, long delay, void *userData) 
00093 {
00094    if (!isRunning_) 
00095       throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".addTimeoutListener", "en", "The timer is not running");
00096 
00097    //if (log_.call()) log_.call(ME, " addTimeoutListener");
00098    Timestamp key = 0;
00099    if (delay < 1) log_.error(ME, ": addTimeoutListener with delay = " + lexical_cast<std::string>(delay));
00100 
00101    {
00102       Lock lock(invocationMutex_);
00103       while (true) {
00104          key = timestampFactory_.getTimestamp() + Constants::MILLION * delay;
00105          TimeoutMap::iterator iter = timeoutMap_.find(key);
00106          if (iter == timeoutMap_.end()) {
00107             if (log_.trace()) log_.trace(ME, "addTimeoutListener, adding key: " + lexical_cast<std::string>(key));
00108             Container cont(listener, userData);
00109             TimeoutMap::value_type el(key, cont);
00110             timeoutMap_.insert(el);
00111             mapHasNewEntry_ = true;
00112             break;
00113          }
00114       }
00115    }
00116 
00117    if (log_.trace()) log_.trace(ME, "addTimeoutListener, going to notify");
00118    Lock waitForTimeoutLock(waitForTimeoutMutex_);
00119    waitForTimeoutCondition_.notify();
00120    //if (log_.trace()) log_.trace(ME, "addTimeoutListener, successfully notified");
00121    return key;
00122 }
00123 
00124 Timestamp Timeout::refreshTimeoutListener(Timestamp key, long delay) 
00125 {
00126    if (log_.call()) log_.call(ME, " refreshTimeoutListener");
00127 
00128    if (!isRunning_) 
00129       throw org::xmlBlaster::util::XmlBlasterException(USER_WRONG_API_USAGE, "", ME + ".refreshTimeoutListener", "en", "The timer is not running");
00130 
00131    if (key < 0)
00132       throw org::xmlBlaster::util::XmlBlasterException(INTERNAL_ILLEGALARGUMENT, "", ME + ".refreshTimeoutListener", "en", "In Timeout.cpp refreshTimeoutListener() is key < 0");
00133 
00134    I_Timeout *callback = 0;
00135    void *userData = 0;
00136    {
00137       Lock lock(invocationMutex_);
00138       TimeoutMap::iterator iter = timeoutMap_.find(key);
00139       if (iter == timeoutMap_.end()) {
00140          if (log_.trace()) log_.trace(ME, "The timeout handle '" + lexical_cast<std::string>(key) + "' is unknown, no timeout refresh done");
00141          return -1;
00142       }
00143       callback = (*iter).second.first;
00144       userData = (*iter).second.second;
00145       timeoutMap_.erase(key);
00146    }
00147    return addTimeoutListener(callback, delay, userData);
00148 }
00149 
00150 Timestamp Timeout::addOrRefreshTimeoutListener(I_Timeout *listener, long delay, void *userData, Timestamp key) 
00151 {
00152    if (log_.call()) log_.call(ME, " addOrRefreshTimeoutListener");
00153    if (key <= 0) return addTimeoutListener(listener, delay, userData);
00154    key = refreshTimeoutListener(key, delay);
00155    if (key <= 0) return addTimeoutListener(listener, delay, userData);
00156    return key;
00157 }
00158 
00159 void Timeout::removeTimeoutListener(Timestamp key) 
00160 {
00161    if (log_.call()) log_.call(ME, " removeTimeoutListener");
00162    Lock lock(invocationMutex_);
00163    timeoutMap_.erase(key);
00164 }
00165 
00166 bool Timeout::isExpired(Timestamp key) 
00167 {
00168    if (log_.call()) log_.call(ME, " isExpired");
00169    Lock lock(invocationMutex_);
00170    return (timeoutMap_.find(key) == timeoutMap_.end());
00171 }
00172 
00173 long Timeout::spanToTimeout(Timestamp key) 
00174 {
00175    if (log_.call()) log_.call(ME, " spanToTimeout");
00176    Lock lock(invocationMutex_);
00177    TimeoutMap::iterator iter = timeoutMap_.find(key);
00178    if (iter == timeoutMap_.end()) return -1;
00179    Timestamp currentTimestamp = timestampFactory_.getTimestamp();
00180    return getTimeout(key) - (long)(currentTimestamp / Constants::MILLION);
00181 }
00182 
00183 long Timeout::getTimeout(Timestamp key) 
00184 {
00185    if (log_.call()) log_.call(ME, " getTimeout");
00186    if (key < 0) return -1;
00187    return (long)(key / Constants::MILLION);
00188 }
00189 
00190 void Timeout::removeAll() 
00191 {
00192    if (log_.call()) log_.call(ME, " removeAll");
00193    Lock lock(invocationMutex_);
00194    timeoutMap_.clear();
00195 }
00196 
00197 void Timeout::shutdown() 
00198 {
00199    if (log_.call()) log_.call(ME, " shutdown");
00200    isRunning_ = false;
00201    removeAll();
00202    Lock waitForTimeoutLock(waitForTimeoutMutex_);
00203    waitForTimeoutCondition_.notify();
00204 }
00205 
00206 
00207 size_t Timeout::getTimeoutMapSize()
00208 {
00209    Lock lock(invocationMutex_);
00210    return timeoutMap_.size();
00211 }
00212 
00213 
00214 void Timeout::run()
00215 {
00216    if (log_.call()) log_.call(ME, " run()");
00217    isActive_ = true;
00218 
00219    Container *container = NULL;
00220    Container tmpContainer;
00221 
00222    try {
00223       while (isRunning_) {
00224 
00225          if (log_.trace()) log_.trace(ME, " run(): is running");
00226          Timestamp delay = 100000 * Constants::MILLION; // sleep veeery long
00227 
00228          {
00229             Lock lock(invocationMutex_);
00230 
00231             TimeoutMap::iterator iter = timeoutMap_.begin();
00232             if (iter == timeoutMap_.end()) {
00233                if (log_.trace()) log_.trace(ME, "No timer is registered, nothing to do");
00234             }
00235             else {
00236                if (log_.trace()) log_.trace(ME, " The timeout is not empty");
00237                Timestamp nextWakeup = (*iter).first;
00238                if (log_.trace()) log_.trace(ME, "run, next event (Timestamp): " + lexical_cast<std::string>(nextWakeup) + " ns");
00239                delay = nextWakeup - timestampFactory_.getTimestamp();
00240 
00241                if (log_.trace()) log_.trace(ME, "run, delay       : " + lexical_cast<std::string>(delay) + " ns");
00242                if ( delay < 0 ) delay = 0;
00243 
00244                if (delay <= 0) {
00245                   tmpContainer = (*iter).second;
00246                   timeoutMap_.erase((*iter).first);
00247                   container = &tmpContainer;
00248                   if (log_.trace()) log_.trace(ME, "Timeout occurred, calling listener with real time error of " + lexical_cast<std::string>(delay) + " nanos");
00249                }
00250             }
00251             mapHasNewEntry_ = false;
00252          }
00253          // must be outside the sync
00254          if (container != NULL) {
00255              (container->first)->timeout(container->second);
00256              container = NULL;
00257          }
00258          Timestamp milliDelay = delay / Constants::MILLION;
00259          if (milliDelay > 0) {
00260             if (log_.trace()) log_.trace(ME, "sleeping ... " + lexical_cast<std::string>(milliDelay) + " milliseconds");
00261             Lock waitForTimeoutLock(waitForTimeoutMutex_);
00262             if (!mapHasNewEntry_) {
00263                isReady_ = true;
00264                if (!isRunning_) break;
00265                waitForTimeoutCondition_.wait(waitForTimeoutLock, (long)milliDelay);
00266                //if (log_.trace()) log_.trace(ME, "waking up ... ");
00267             }
00268          }
00269       }
00270       if (log_.trace()) log_.trace(ME, "The running thread is exiting");
00271    }
00272    catch (const std::exception &e) {
00273       log_.error(ME, string("The running thread is exiting: ") + e.what());
00274    }
00275    catch (...) {
00276       log_.error(ME, "The running thread is exiting with an unknown exception");
00277    }
00278    isActive_ = false;
00279 }
00280 
00281 }}} // namespaces
00282