util/dispatch/ConnectionsHandler.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      ConnectionsHandler.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Handles the I_XmlBlasterConnections 
00006 ------------------------------------------------------------------------------*/
00007 
00008 #include <util/dispatch/ConnectionsHandler.h>
00009 #include <util/Global.h>
00010 #include <util/Timeout.h>
00011 #include <util/Timestamp.h>
00012 #include <util/Constants.h>
00013 #include <util/lexical_cast.h>
00014 #include <util/queue/QueueFactory.h>
00015 #include <util/queue/PublishQueueEntry.h>
00016 #include <util/queue/ConnectQueueEntry.h>
00017 #include <util/queue/SubscribeQueueEntry.h>
00018 
00019 namespace org { namespace xmlBlaster { namespace util { namespace dispatch {
00020 
00021 using namespace std;
00022 using namespace org::xmlBlaster::client::protocol;
00023 using namespace org::xmlBlaster::client;
00024 using namespace org::xmlBlaster::util;
00025 using namespace org::xmlBlaster::util::qos;
00026 using namespace org::xmlBlaster::util::thread;
00027 using namespace org::xmlBlaster::util::qos::storage;
00028 using namespace org::xmlBlaster::util::queue;
00029 using namespace org::xmlBlaster::client::qos;
00030 using namespace org::xmlBlaster::client::key;
00031 
00032 ConnectionsHandler::ConnectionsHandler(org::xmlBlaster::util::Global& global,
00033                                        const string& instanceName)
00034    : ME(string("ConnectionsHandler-") + instanceName), 
00035      connectQos_((ConnectQos*)0),
00036      connectReturnQos_((ConnectReturnQos*)0),
00037      status_(START), 
00038      global_(global), 
00039      log_(global.getLog("org.xmlBlaster.util.dispatch")),
00040      connectMutex_(),
00041      publishMutex_(),
00042      postSendListener_(0),
00043      instanceName_(instanceName)
00044 {
00045    ClientQueueProperty prop(global_, "");
00046    connectionProblemsListener_ = NULL;
00047    connection_         = NULL;
00048    queue_              = NULL;
00049    retries_            = -1;
00050    currentRetry_       = 0;
00051    pingPollTimerKey_   = 0;
00052    doStopPing_         = false;
00053    if (log_.call()) log_.call(ME, "constructor");
00054 }
00055 
00056 ConnectionsHandler::~ConnectionsHandler()
00057 {
00058    if (log_.call()) log_.call(ME, "destructor");
00059    if (pingPollTimerKey_ != 0) {
00060       global_.getPingTimer().removeTimeoutListener(pingPollTimerKey_);
00061       pingPollTimerKey_ = 0;
00062    }
00063    doStopPing_ = true;
00064    /*
00065    while (pingIsStarted_) {
00066       Thread::sleep(200);
00067    }
00068    */
00069    Lock lock(connectMutex_);
00070    string type = (connectQos_.isNull()) ? org::xmlBlaster::util::Global::getDefaultProtocol() : connectQos_->getAddress()->getType(); // "SOCKET"
00071    string version = "1.0"; // currently hardcoded
00072    if (connection_) {
00073       global_.getDispatchManager().releasePlugin(instanceName_, type, version);
00074       connection_ = NULL;
00075    }
00076    if ( queue_ ) {
00077       delete queue_;
00078       queue_ = NULL;
00079    }
00080    if (log_.trace()) log_.trace(ME, "destructor: going to delete the connectQos");
00081    status_ = END;
00082    if (log_.trace()) log_.trace(ME, "destructor ended");
00083 } 
00084 
00085 
00086 ConnectReturnQosRef ConnectionsHandler::connect(const ConnectQosRef& qos)
00087 {
00088    if (log_.call()) log_.call(ME, string("::connect status is '") + lexical_cast<std::string>(status_) + "'");
00089    if (qos.isNull()) {
00090       throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::connect", "your connectQos is null");
00091    }
00092    if (log_.dump()) log_.dump(ME, string("::connect, the qos is: ") + qos->toXml());
00093    Lock lock(connectMutex_);
00094    if (isConnected()) {
00095       log_.warn(ME, "connect: you are already connected");
00096       return connectReturnQos_;
00097    }
00098 
00099    connectQos_ = qos;
00100 
00101    global_.setSessionName(connectQos_->getSessionQos().getSessionName());
00102    global_.setImmutableId(connectQos_->getSessionQos().getRelativeName());
00103    global_.setId(connectQos_->getSessionQos().getAbsoluteName()); // temporary
00104    //log_.info(ME, "BEFORE id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());
00105 
00106    retries_ = connectQos_->getAddress()->getRetries();
00107    long pingInterval = connectQos_->getAddress()->getPingInterval();
00108    if (log_.trace()) {
00109       log_.trace(ME, string("connect: number of retries during communication failure: ") + lexical_cast<std::string>(retries_));
00110       log_.trace(ME, string("connect: Ping Interval: ") + lexical_cast<std::string>(pingInterval));
00111    }
00112 
00113    string type = connectQos_->getAddress()->getType();
00114    string version = "1.0"; // currently hardcoded
00115    if (!connection_) {
00116       connection_ = &(global_.getDispatchManager().getPlugin(instanceName_, type, version));
00117    }
00118 
00119    try {
00120       connectReturnQos_ = connection_->connect(*connectQos_);
00121       global_.setSessionName(connectReturnQos_->getSessionQos().getSessionName());
00122       // For "joe/1" it remains immutable; For "joe" there is added the server side generated sessionId "joe/-33":
00123       global_.setImmutableId(connectReturnQos_->getSessionQos().getRelativeName());
00124       global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName());
00125                 //log_.info(ME, "AFTER id=" + global_.getId() + " immutable=" + global_.getImmutableId() + " sessionName=" + global_.getSessionName()->getAbsoluteName());
00126    }
00127    catch (XmlBlasterException &ex) {
00128       if ((ex.isCommunication() || ex.getErrorCodeStr().find("user.configuration") == 0)) {
00129          log_.warn(ME, "Got exception when connecting, polling now: " + ex.toString());
00130          if (pingPollTimerKey_ == 0)
00131             startPinger(false);
00132          return queueConnect();
00133       }
00134       else {
00135          if (log_.trace()) log_.trace(ME, string("the exception in connect is ") + ex.toXml());
00136          throw ex;
00137       }
00138    }                                                                                                                                                                                                                                                                                    
00139    
00140    log_.info(ME, string("successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");
00141    connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());
00142 
00143    enum States oldState = status_;
00144    status_ = ALIVE;
00145    if (connectionProblemsListener_) connectionProblemsListener_->reachedAlive(oldState, this);
00146    // start the ping if in failsafe, i.e. if delay > 0
00147    startPinger(false);
00148    if (log_.dump()) log_.dump(ME, string("::connect, the return qos is: ") + connectReturnQos_->toXml());
00149 
00150    flushQueue();
00151 
00152    return connectReturnQos_;
00153 }
00154 
00155 bool ConnectionsHandler::disconnect(const DisconnectQos& qos)
00156 {
00157    Lock lock(connectMutex_);
00158    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::DISCONNECT);
00159    if (log_.dump()) log_.dump(ME, string("::disconnect, the qos is: ") + qos.toXml());
00160 
00161    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::DISCONNECT);
00162    if (status_ == DEAD) {
00163       log_.warn(ME, "already disconnected");
00164       return false;
00165    }
00166    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::DISCONNECT);
00167 
00168    if (qos.getClearClientQueue() && queue_ != 0) queue_->clear();
00169 
00170    bool ret = connection_->disconnect(qos);
00171    enum States oldState = status_;
00172    status_ = DEAD;
00173    if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);
00174    return ret;
00175 }
00176 
00177 string ConnectionsHandler::getProtocol()
00178 {
00179    return connection_->getProtocol();
00180 }
00181 
00182 /*
00183 string ConnectionsHandler::loginRaw()
00184 {
00185    return connection_->loginRaw();
00186 }
00187 */
00188 
00189 bool ConnectionsHandler::shutdown()
00190 {
00191    if (connection_) {
00192       return connection_->shutdown();
00193    }
00194    return false;
00195 }
00196 
00197 string ConnectionsHandler::getLoginName() 
00198 {
00199    return connection_->getLoginName();
00200 }
00201 
00202 bool ConnectionsHandler::isLoggedIn()
00203 {
00204    return connection_->isLoggedIn();
00205 }
00206 
00207 string ConnectionsHandler::ping(const string& qos)
00208 {
00209 //   Lock lock(connectionMutex_);
00210    return connection_->ping(qos);
00211 }
00212 
00213 SubscribeReturnQos ConnectionsHandler::subscribe(const SubscribeKey& key, const SubscribeQos& qos)
00214 {
00215    if (log_.call()) log_.call(ME, MethodName::SUBSCRIBE);
00216    if (log_.dump()) log_.dump(ME, string("::subscribe, the key is: ") + key.toXml());
00217    if (log_.dump()) log_.dump(ME, string("::subscribe, the qos is: ") + qos.toXml());
00218 
00219 //   Lock lock(connectionMutex_);
00220 
00221    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, MethodName::SUBSCRIBE);
00222    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, MethodName::SUBSCRIBE);
00223    if (putToQueue()) return queueSubscribe(key, qos);
00224    try {
00225       SubscribeReturnQos ret = connection_->subscribe(key, qos);
00226       return ret;
00227    }   
00228    catch (XmlBlasterException& ex) {
00229       toPollingOrDead(&ex);
00230       if (putToQueue() && isRecoverable(&ex)) {
00231          log_.info(ME, string("::subscribe ") + key.getOid() + " is queued, exception=" + ex.getMessage());
00232          return queueSubscribe(key, qos);
00233       }
00234       else {
00235          log_.warn(ME, string("::subscribe failed throwing now exception: ") + key.toXml() + qos.toXml() + " exception=" + ex.getMessage());
00236          throw ex;
00237       }
00238    }
00239 }
00240 
00241 
00242 vector<MessageUnit> ConnectionsHandler::get(const GetKey& key, const GetQos& qos)
00243 {
00244    if (log_.call()) log_.call(ME, "get");
00245    if (log_.dump()) log_.dump(ME, string("::get, the key is: ") + key.toXml());
00246    if (log_.dump()) log_.dump(ME, string("::get, the qos is: ") + qos.toXml());
00247    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "get");
00248    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "get");
00249    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, "get");
00250    try {
00251       return connection_->get(key, qos);
00252    }   
00253    catch (XmlBlasterException& ex) {
00254       toPollingOrDead(&ex);
00255       throw ex;
00256    }
00257 }
00258 
00259 
00260 vector<UnSubscribeReturnQos> 
00261    ConnectionsHandler::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
00262 {
00263    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
00264    if (log_.dump()) log_.dump(ME, string("::unSubscribe, the key is: ") + key.toXml());
00265    if (log_.dump()) log_.dump(ME, string("::unSubscribe, the qos is: ") + qos.toXml());
00266    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
00267    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
00268    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::UNSUBSCRIBE);
00269    try {
00270       vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos);
00271       return ret;
00272    }   
00273    catch (XmlBlasterException& ex) {
00274       toPollingOrDead(&ex);
00275       throw ex;
00276    }
00277 }
00278 
00279 bool ConnectionsHandler::putToQueue() {
00280    if (status_ == POLLING) return true;
00281    if (queue_ && queue_->getNumOfEntries() > 0) {
00282       return true; // guarantee sequence
00283    }
00284    return false;
00285 }
00286 
00287 PublishReturnQos ConnectionsHandler::publish(const MessageUnit& msgUnit)
00288 {
00289    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::PUBLISH);
00290    if (log_.dump()) log_.dump(ME, string("::publish, the msgUnit is: ") + msgUnit.toXml());
00291    Lock lock(publishMutex_);
00292    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::PUBLISH);
00293    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::PUBLISH);
00294    if (putToQueue()) return queuePublish(msgUnit);
00295    try {
00296       // fill in the sender absolute name
00297       if (!connectReturnQos_.isNull()) {
00298          msgUnit.getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
00299       }
00300       return connection_->publish(msgUnit);
00301    }   
00302    catch (XmlBlasterException& ex) {
00303       toPollingOrDead(&ex);
00304       if (putToQueue() && isRecoverable(&ex)) {
00305          log_.info(ME, string("::publish ") + msgUnit.getKey().getOid() + " is queued, exception=" + ex.getMessage());
00306          return queuePublish(msgUnit);
00307       }
00308       else {
00309          log_.warn(ME, string("::publish failed throwing now exception, the msgUnit is: ") + msgUnit.toXml() + " exception=" + ex.getMessage());
00310          throw ex;
00311       }
00312    }
00313 }
00314 
00315 
00316 void ConnectionsHandler::publishOneway(const vector<MessageUnit> &msgUnitArr)
00317 {
00318    if (log_.call()) log_.call(ME, "publishOneway");
00319    Lock lock(publishMutex_);
00320 
00321    // fill in the sender absolute name
00322    if (!connectReturnQos_.isNull()) {
00323       for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {
00324          msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
00325       }
00326    }
00327 
00328    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishOneway");
00329    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishOneway");
00330    if (putToQueue()) {
00331       for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);
00332    }
00333 
00334    try {
00335       connection_->publishOneway(msgUnitArr);
00336    }   
00337    catch (XmlBlasterException& ex) {
00338       toPollingOrDead(&ex);
00339       if (putToQueue() && isRecoverable(&ex)) {
00340          for (size_t i=0; i < msgUnitArr.size(); i++) queuePublish(msgUnitArr[i]);
00341       }
00342       else
00343          throw ex;
00344    }
00345 }
00346 
00347 
00348 vector<PublishReturnQos> ConnectionsHandler::publishArr(const vector<MessageUnit> &msgUnitArr)
00349 {
00350    if (log_.call()) log_.call(ME, "publishArr");
00351    Lock lock(publishMutex_);
00352 
00353    // fill in the sender absolute name
00354    if (!connectReturnQos_.isNull()) {
00355       for (vector<MessageUnit>::size_type i=0;i<msgUnitArr.size();i++) {
00356          msgUnitArr[i].getQos().setSender(connectReturnQos_->getSessionQos().getSessionName());
00357       }
00358    }
00359 
00360    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, "publishArr");
00361    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, "publishArr");
00362    if (putToQueue()) {
00363       vector<PublishReturnQos> retQos;
00364       for (size_t i=0; i < msgUnitArr.size(); i++) {
00365          retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));
00366       }
00367       return retQos;
00368    }
00369    try {
00370       return connection_->publishArr(msgUnitArr);
00371    }   
00372    catch (XmlBlasterException& ex) {
00373       toPollingOrDead(&ex);
00374       if (putToQueue() && isRecoverable(&ex)) {
00375          vector<PublishReturnQos> retQos;
00376          for (size_t i=0; i < msgUnitArr.size(); i++) {
00377             retQos.insert(retQos.end(), queuePublish(msgUnitArr[i]));
00378          }
00379          return retQos;
00380       }
00381       else throw ex;
00382    }
00383 }
00384 
00385 
00386 vector<EraseReturnQos> ConnectionsHandler::erase(const EraseKey& key, const EraseQos& qos)
00387 {
00388    if (log_.call()) log_.call(ME, org::xmlBlaster::util::MethodName::ERASE);
00389    if (log_.dump()) log_.dump(ME, string("::erase, the key is: ") + key.toXml());
00390    if (log_.dump()) log_.dump(ME, string("::erase, the qos is: ") + qos.toXml());
00391 
00392    if (status_ == START)   throw XmlBlasterException(COMMUNICATION_NOCONNECTION, ME, org::xmlBlaster::util::MethodName::ERASE);
00393    if (status_ == DEAD)    throw XmlBlasterException(COMMUNICATION_NOCONNECTION_DEAD, ME, org::xmlBlaster::util::MethodName::ERASE);
00394    if (putToQueue()) throw XmlBlasterException(COMMUNICATION_NOCONNECTION_POLLING, ME, org::xmlBlaster::util::MethodName::ERASE);
00395 
00396    try {
00397       return connection_->erase(key, qos);
00398    }   
00399    catch (XmlBlasterException& ex) {
00400       toPollingOrDead(&ex);
00401       throw ex;
00402    }
00403 }
00404 
00405 void ConnectionsHandler::initFailsafe(I_ConnectionProblems* connectionProblems)
00406 {
00407 //   Lock lock(connectionMutex_);
00408    if (log_.trace()) log_.trace(ME, "Register initFailsafe " + lexical_cast<string>(connectionProblems!=0));
00409    connectionProblemsListener_ = connectionProblems;
00410 }
00411 
00412 // If recoverable we queue a msgUnit, else we throw an exception
00413 bool ConnectionsHandler::isRecoverable(const org::xmlBlaster::util::XmlBlasterException* reason)
00414 {
00415    // TODO: Authorization could also be recoverable (by a server admin)
00416    //       Such decision must be left to the user (we need a callback to the user here)
00417    // As a default all communication problems are assumed to be recoverable
00418    if (reason == 0)
00419       return true;
00420    bool ret = reason->isCommunication();
00421     if (log_.call()) log_.call(ME, "isRecoverable " + lexical_cast<string>(ret));
00422    return ret;
00423 }
00424 
00425 void ConnectionsHandler::toPollingOrDead(const org::xmlBlaster::util::XmlBlasterException* reason)
00426 {
00427    if (reason == 0)
00428       return;
00429    if (!reason->isCommunication())
00430      return;
00431       
00432    if (log_.call()) log_.call(ME, "toPollingOrDead");
00433    
00434    enum States oldState = status_;
00435    if (!isFailsafe()) {
00436       log_.info(ME, "going into DEAD status since not in failsafe mode. "
00437                     "For failsafe mode set 'delay' to a positive long value, for example on the cmd line: -delay 10000" +
00438                     ((reason != 0) ? (": " + reason->getMessage()) : ""));
00439       status_ = DEAD;
00440       connection_->shutdown();
00441       if (connectionProblemsListener_) connectionProblemsListener_->reachedDead(oldState, this);
00442       return;
00443    }
00444 
00445    log_.info(ME, "going into POLLING status:" + ((reason != 0) ? (": " + reason->getMessage()) : ""));
00446    status_ = POLLING;
00447    currentRetry_ = 0;
00448    /*
00449    try {
00450       DisconnectQos discQos(global_);
00451       connection_->disconnect(discQos);
00452    }
00453    catch (...) {
00454       log_.warn(ME, "exception when trying to disconnect");
00455    }
00456    */
00457    connection_->shutdown();
00458    if (connectionProblemsListener_) connectionProblemsListener_->reachedPolling(oldState, this);
00459    startPinger(true);
00460 }
00461 
00462 
00463 void ConnectionsHandler::timeout(void * /*userData*/)
00464 {
00465                                                     
00466   Lock lock(connectMutex_);
00467    pingPollTimerKey_ = 0;
00468    if (doStopPing_) return; // then it must stop
00469    if ( log_.call() ) log_.call(ME, string("ping timeout occured with status '") + getStatusString() + "'" );
00470    if (status_ == ALIVE) { // then I am pinging
00471       if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'ALIVE'");
00472       try {
00473          if (connection_) {
00474             connection_->ping("<qos/>");
00475             if ( log_.trace() ) log_.trace(ME, "lowlevel ping returned: status is 'ALIVE'");
00476             startPinger(false);
00477          }
00478       }
00479       catch (XmlBlasterException& ex) {
00480          if ( log_.trace() ) log_.trace(ME, "lowlevel ping failed: " + ex.toString());
00481          toPollingOrDead(&ex);
00482       }
00483       return;
00484    }
00485  
00486    if (status_ == POLLING) {
00487       if ( log_.trace() ) log_.trace(ME, "ping timeout: status is 'POLLING'");
00488       try {
00489          if (connection_ && !connectQos_.isNull()) {
00490             if ( log_.trace() ) log_.trace(ME, "ping timeout: going to retry a connection");
00491  
00492             string lastSessionId = connectQos_->getSessionQos().getSecretSessionId();
00493             connectReturnQos_ = connection_->connect(*connectQos_);
00494             if (log_.trace()) log_.trace(ME, string("Successfully reconnected, ConnectRetQos: ") + connectReturnQos_->toXml());
00495             string sessionId = connectReturnQos_->getSessionQos().getSecretSessionId();
00496             log_.info(ME, string("Successfully reconnected as '") + connectReturnQos_->getSessionQos().getAbsoluteName() +
00497                           "' after " + lexical_cast<string>(currentRetry_) + " attempts");
00498             connectQos_->getSessionQos().setSecretSessionId(sessionId);
00499  
00500             if ( log_.trace() ) {
00501                log_.trace(ME, string("ping timeout: re-connection, the new connect returnQos: ") + connectReturnQos_->toXml());
00502             }
00503  
00504             bool doFlush = true;
00505             enum States oldState = status_;
00506             status_ = ALIVE;
00507             if ( connectionProblemsListener_ ) doFlush = connectionProblemsListener_->reachedAlive(oldState, this);
00508  
00509             Lock lockPub(publishMutex_); // lock here to avoid publishing while flushing queue (to ensure sequence)
00510             if (sessionId != lastSessionId) {
00511                log_.trace(ME, string("When reconnecting the sessionId changed from '") + lastSessionId + "' to '" + sessionId + "'");
00512             }
00513  
00514             if (doFlush) {
00515                try {
00516                   flushQueueUnlocked(queue_, true);
00517                }
00518                catch (const XmlBlasterException &ex) {
00519                   log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue:" + ex.getMessage());
00520                }
00521                catch (...) {
00522                   log_.warn(ME, "An exception occured when trying to asynchronously flush the contents of the queue. Probably not all messages have been sent. These unsent messages are still in the queue");
00523                }
00524             }
00525             startPinger(false);
00526          }
00527       }
00528       catch (XmlBlasterException ex) {
00529          if (log_.trace()) log_.trace(ME, "timeout got exception: " + ex.getMessage());
00530          currentRetry_++;
00531          if ( currentRetry_ < retries_ || retries_ < 0) { // continue to poll
00532             startPinger(false);
00533          }
00534          else {
00535             enum States oldState = status_;
00536             status_ = DEAD;
00537             if ( connectionProblemsListener_ ) {
00538                connectionProblemsListener_->reachedDead(oldState, this);
00539                // stopping
00540             }
00541          }
00542       }
00543       return;
00544    }
00545  
00546    // if it comes here it will stop
00547  
00548 }
00549 
00550 SubscribeReturnQos ConnectionsHandler::queueSubscribe(const SubscribeKey& key, const SubscribeQos& qos)
00551 {
00552    if (!queue_) {
00553       if (connectQos_.isNull()) {
00554          throw XmlBlasterException(INTERNAL_SUBSCRIBE, ME + "::queueSubscribe", "need to create a queue but the connectQos is NULL (probably never connected)");
00555       }
00556       if (log_.trace()) log_.trace(ME+":queueSubscribe", "creating a client queue ...");
00557       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
00558       if (log_.trace()) log_.trace(ME+":queueSubscribe", "created a client queue");
00559    }
00560    SubscribeReturnQos retQos(global_);
00561    SubscribeQos& q = const_cast<SubscribeQos&>(qos);
00562    SessionNameRef sessionName = global_.getSessionName();
00563    std::string subscriptionId = q.generateSubscriptionId(sessionName, key);
00564    retQos.getData().setSubscriptionId(subscriptionId);
00565    retQos.getData().setState(Constants::STATE_OK);
00566    retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"
00567    qos.setSubscriptionId(subscriptionId);
00568    SubscribeQueueEntry entry(global_, key, qos, qos.getData().getPriority());
00569    queue_->put(entry);
00570    //if (log_.trace()) 
00571       log_.warn(ME, string("queueSubscribe: entry '") + key.getOid() +
00572                      "' has been queued with client side generated subscriptionId=" + subscriptionId);
00573    return retQos;
00574 }
00575 
00576 PublishReturnQos ConnectionsHandler::queuePublish(const MessageUnit& msgUnit)
00577 {
00578    if (log_.call()) log_.call(ME, "queuePublish");
00579    if (!queue_) {
00580       if (connectQos_.isNull()) {
00581          throw XmlBlasterException(INTERNAL_PUBLISH, ME + "::queuePublish", "need to create a queue but the connectQos is NULL (probably never connected)");
00582       }
00583       if (log_.trace()) log_.trace(ME+":queuePublish", "creating a client queue ...");
00584       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
00585       if (log_.trace()) log_.trace(ME+":queuePublish", "created a client queue");
00586    }
00587    if (log_.trace()) 
00588       log_.trace(ME, string("queuePublish: entry '") + msgUnit.getKey().getOid() + "' has been queued");
00589    PublishReturnQos retQos(global_);
00590    retQos.setKeyOid(msgUnit.getKey().getOid());
00591    retQos.setState(Constants::STATE_OK);
00592    retQos.getData().setStateInfo(Constants::INFO_QUEUED); // "QUEUED"
00593    PublishQueueEntry entry(global_, msgUnit, msgUnit.getQos().getPriority());
00594    queue_->put(entry);
00595    return retQos;
00596 }
00597 
00598 ConnectReturnQosRef& ConnectionsHandler::queueConnect()
00599 {
00600    if (log_.call()) log_.call(ME, string("::queueConnect with sessionQos: '") + connectQos_->getSessionQos().getAbsoluteName() + "'");
00601    long tmp = connectQos_->getSessionQos().getPubSessionId(); 
00602    if ( tmp <= 0) {
00603       if (log_.trace()) log_.trace(ME, string("::queueConnect, the public session id is '") + lexical_cast<std::string>(tmp));
00604       throw XmlBlasterException(USER_CONNECT, ME + "::queueConnect", "queueing connection request not possible because you did not specify a positive public sessionId");
00605    }
00606 
00607    if (!queue_) {
00608       if (log_.trace()) log_.info(ME, "::queueConnect: created a client queue");
00609       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
00610    }
00611    if (log_.trace()) 
00612       log_.trace(ME, string("queueConnect: entry '") + connectQos_->getSessionQos().getAbsoluteName() + "' has been queued");
00613 
00614    connectReturnQos_ = new ConnectReturnQos(*connectQos_);
00615 
00616    /* Michele thinks we should not queue the ConnectQos
00617    ConnectQueueEntry entry(global_, *connectQos_);
00618    queue_->put(entry);
00619    */
00620    enum States oldState = status_;
00621    status_ = POLLING;
00622    if ( connectionProblemsListener_ ) {
00623       connectionProblemsListener_->reachedPolling(oldState, this);
00624       // stopping
00625    }
00626    startPinger(true);
00627    return connectReturnQos_;
00628 }
00629 
00630 I_PostSendListener* ConnectionsHandler::registerPostSendListener(I_PostSendListener *listener) {
00631    I_PostSendListener* old = postSendListener_; 
00632    postSendListener_ = listener;
00633    return old;
00634 }
00635 
00642 long ConnectionsHandler::flushQueue()
00643 {
00644    if (log_.call()) log_.call(ME, "flushQueue");
00645    //   Lock lock(connectionMutex_);
00646 
00647    if (!queue_) {
00648       if (connectQos_.isNull()) {
00649          log_.error(ME+".flusgQueue", "need to create a queue but the connectQos is NULL (probably never connected)");
00650       }
00651       if (log_.trace()) log_.trace(ME+".flushQueue", "creating the client queue ...");
00652       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
00653       if (queue_->getNumOfEntries() < 1) {
00654          if (log_.trace()) log_.trace(ME+".flushQueue", "Created queue [" + queue_->getType() + "][" + queue_->getVersion() +
00655                                                         "], it is empty, nothing to do.");
00656          return 0;
00657       }
00658       log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +
00659                     lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");
00660    }
00661 
00662    return flushQueueUnlocked(queue_, true);
00663 }  
00664 
00665    
00666 long ConnectionsHandler::flushQueueUnlocked(I_Queue *queueToFlush, bool doRemove)
00667 {
00668    if ( log_.call() ) log_.call(ME, "flushQueueUnlocked");
00669            if (!queueToFlush || queueToFlush->empty()) return 0;
00670    if (status_ != ALIVE || connection_ == NULL) return -1;
00671 
00672    long ret = 0;
00673    if (!queueToFlush->empty()) {
00674       log_.info(ME, "Queue [" + queue_->getType() + "][" + queue_->getVersion() + "] contains " +
00675                   lexical_cast<string>(queue_->getNumOfEntries()) + " entries, we send them to the server");
00676    }
00677    while (!queueToFlush->empty()) { 
00678       long maxNumOfEntries= (doRemove) ? 1 : -1; // doRemove==false makes no sense, TODO: remove this arg
00679       if (log_.trace()) log_.trace(ME, "flushQueueUnlocked: flushing one priority sweep maxNumOfEntries=" + lexical_cast<string>(maxNumOfEntries));
00680       const vector<EntryType> entries = queueToFlush->peekWithSamePriority(maxNumOfEntries);
00681       vector<EntryType>::const_iterator iter = entries.begin();
00682       while (iter != entries.end()) {
00683          try {
00684             if (log_.trace()) log_.trace(ME, "sending the content to xmlBlaster: " + (*iter)->toXml());
00685             const EntryType entry = (*iter);
00686             const MsgQueueEntry &entry2 = *entry;
00687             {
00688                MsgQueueEntry &entry3 = const_cast<MsgQueueEntry&>(entry2);
00689                entry3.setSender(connectReturnQos_->getSessionQos().getSessionName());
00690             }
00691             entry2.send(*this); // entry2 contains the PublishReturnQos after calling send
00692             if (log_.trace()) log_.trace(ME, "content to xmlBlaster successfully sent");
00693 
00694             I_PostSendListener *p = postSendListener_;
00695             if (p) {
00696                 p->postSend(entry2);
00697             }
00698          }
00699          catch (XmlBlasterException &ex) {
00700            if (ex.isCommunication()) toPollingOrDead(&ex);
00701            log_.warn(ME, "flushQueueUnlocked: can't send queued message to server: " + ex.getMessage());
00702            //if (doRemove) queueToFlush->randomRemove(entries.begin(), iter);
00703            throw ex;
00704          }
00705          iter++;
00706       }
00707       if (doRemove) {
00708           //log_.trace(ME, "remove send message from client queue");
00709           ret += queueToFlush->randomRemove(entries.begin(), entries.end());
00710       }
00711    }
00712    return ret;
00713 }
00714 
00715 I_Queue* ConnectionsHandler::getQueue()
00716 {
00717    if (!queue_) {
00718       if (log_.trace()) log_.trace(ME+".getQueue", "creating the client queue ...");
00719       queue_ = &QueueFactory::getFactory().getPlugin(global_, connectQos_->getClientQueueProperty());
00720       log_.info(ME, "Created queue [" + queue_->getType() + "][" + queue_->getVersion() + "] which contains " +
00721                     lexical_cast<string>(queue_->getNumOfEntries()) + " entries.");
00722    }
00723    return queue_;
00724 }
00725 
00726 bool ConnectionsHandler::isFailsafe() const
00727 {
00728    if (connectQos_.isNull()) return false;
00729    return connectQos_->getAddress()->getDelay() > 0;
00730 }
00731 
00732 // pinger or poller
00733 bool ConnectionsHandler::startPinger(bool withInitialPing)
00734 {
00735    if (log_.call()) log_.call(ME, "startPinger");
00736    if (doStopPing_) return false;
00737 
00738    if (log_.trace()) log_.trace(ME, "startPinger (no request to stop the pinger is active for the moment)");
00739    if (pingPollTimerKey_ != 0 && !withInitialPing) {
00740       if (log_.trace()) log_.trace(ME, "startPinger: the pinger is already running. I will return without starting a new thread");
00741       return false;  
00742    }
00743 
00744    long delay        = 10000;
00745    long pingInterval = 0;
00746    if (connectQos_.isNull()) {
00747       ConnectQos tmp(global_);
00748       delay        = tmp.getAddress()->getDelay();
00749       pingInterval = tmp.getAddress()->getPingInterval();
00750    }
00751    else {
00752       delay        = connectQos_->getAddress()->getDelay();
00753       pingInterval = connectQos_->getAddress()->getPingInterval();
00754    }
00755    if (log_.trace()) {
00756       log_.trace(ME, string("startPinger(status=") + 
00757                getStatusString() +
00758                "): parameters are: delay '" + lexical_cast<std::string>(delay) +
00759                "' and pingInterval '" + lexical_cast<std::string>(pingInterval) +
00760                " withInitialPing=" + lexical_cast<string>(withInitialPing));
00761    }
00762    if (delay > 0 && pingInterval > 0) {
00763       long delta = delay;
00764       if (status_ == ALIVE) delta = pingInterval;
00765       if (withInitialPing) delta = 400;
00766       pingPollTimerKey_ = global_.getPingTimer().addOrRefreshTimeoutListener(this, delta, NULL, pingPollTimerKey_);
00767    }
00768    return true;
00769 }
00770 
00771 string ConnectionsHandler::getStatusString() const
00772 {
00773    if (status_ == ALIVE) return "ALIVE";
00774    else if (status_ == POLLING) return "POLLING";
00775    else if (status_ == DEAD) return "DEAD";
00776    else if (status_ == START) return "START";
00777    return "END";;
00778 }
00779 
00780 
00781 bool ConnectionsHandler::isConnected() const
00782 {
00783    return status_ == ALIVE || status_ == POLLING;
00784 }
00785 
00786 bool ConnectionsHandler::isAlive() const
00787 {
00788    return status_ == ALIVE;
00789 }
00790 
00791 bool ConnectionsHandler::isPolling() const
00792 {
00793    return status_ == POLLING;
00794 }
00795 
00796 bool ConnectionsHandler::isDead() const
00797 {
00798    return status_ == DEAD;
00799 }
00800 
00801 ConnectReturnQosRef ConnectionsHandler::connectRaw(const ConnectQosRef& connectQos)
00802 {
00803    if (log_.call()) log_.call(ME, "::connectRaw");
00804    connectReturnQos_ = connection_->connect(connectQos);
00805    connectQos_ = connectQos;
00806    log_.info(ME, string("Successfully connected with sessionId = '") + connectReturnQos_->getSessionQos().getSecretSessionId() + "'");
00807    connectQos_->getSessionQos().setSecretSessionId(connectReturnQos_->getSessionQos().getSecretSessionId());
00808    return connectReturnQos_;
00809 }
00810 
00811 
00812 I_XmlBlasterConnection& ConnectionsHandler::getConnection() const
00813 {
00814    if (!connection_) {
00815       throw XmlBlasterException(INTERNAL_ILLEGALARGUMENT, ME + "::getConnection", "the connection is still NULL: it is not assigned yet. You probably called this method before a connection was made");
00816    }
00817    return *connection_;
00818 }
00819 
00820 
00821 ConnectReturnQosRef ConnectionsHandler::getConnectReturnQos()
00822 {
00823    return connectReturnQos_;
00824 }
00825 
00826 ConnectQosRef ConnectionsHandler::getConnectQos()
00827 {
00828    return connectReturnQos_; // contains everything and is typedef on ConnectQos
00829 }
00830 
00831 /*
00832 void ConnectionsHandler::setConnectReturnQos(const connectReturnQos& retQos)
00833 {
00834    if (connectReturnQos_)  {
00835       delete connectReturnQos_;
00836       connectReturnQos_ = NULL;
00837    }
00838    connectReturnQos_ = new ConnectReturnQos(retQos);
00839 }
00840 */
00841 
00842 }}}} // namespaces
00843 
00844