client/XmlBlasterAccess.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      XmlBlasterAccess.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 ------------------------------------------------------------------------------*/
00006 
00007 #include <client/XmlBlasterAccess.h>
00008 #include <util/Global.h>
00009 #include <util/lexical_cast.h>
00010 #include <util/Timestamp.h>
00011 #include <util/dispatch/DispatchManager.h>
00012 #include <util/parser/ParserFactory.h>
00013 
00014 namespace org { namespace xmlBlaster { namespace client {
00015 
00016 using namespace std;
00017 using namespace org::xmlBlaster::util;
00018 using namespace org::xmlBlaster::util::qos;
00019 using namespace org::xmlBlaster::util::dispatch;
00020 using namespace org::xmlBlaster::util::dispatch;
00021 using namespace org::xmlBlaster::util::qos::storage;
00022 using namespace org::xmlBlaster::util::qos::address;
00023 using namespace org::xmlBlaster::authentication;
00024 using namespace org::xmlBlaster::client::protocol;
00025 using namespace org::xmlBlaster::client::key;
00026 using namespace org::xmlBlaster::client::qos;
00027 
00028 XmlBlasterAccess::XmlBlasterAccess(Global& global)
00029    : ME(string("XmlBlasterAccess-UNCONNECTED")),
00030      global_(global), 
00031      globalRef_(NULL), 
00032      log_(global.getLog("org.xmlBlaster.client")),
00033      serverNodeId_("xmlBlaster"), 
00034      connectQos_(new ConnectQos(global)), 
00035      connectReturnQos_((ConnectReturnQos*)0),
00036      subscriptionCallbackMap_(),
00037      updateMutex_(),
00038      invocationMutex_(global.getProperty().get("xmlBlaster/invocationMutex/recursive", true)),
00039      postSendListener_(0)
00040 {
00041    log_.call(ME, "::constructor");
00042    cbServer_           = NULL;
00043    updateClient_       = NULL;
00044    connection_         = NULL;
00045    dispatchManager_    = NULL;
00046    connectionProblems_ = NULL;
00047    instanceName_       = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp());
00048 
00049    // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces)
00050    org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global);
00051 }
00052 
00053 XmlBlasterAccess::XmlBlasterAccess(GlobalRef globalRef)
00054    : ME(string("XmlBlasterAccess-UNCONNECTED")),
00055      global_(*globalRef), 
00056      globalRef_(globalRef), 
00057      log_(global_.getLog("org.xmlBlaster.client")),
00058      serverNodeId_("xmlBlaster"), 
00059      connectQos_(new ConnectQos(global_)), 
00060      connectReturnQos_((ConnectReturnQos*)0),
00061      subscriptionCallbackMap_(),
00062      updateMutex_(),
00063      invocationMutex_(globalRef->getProperty().get("xmlBlaster/invocationMutex/recursive", true)),
00064      postSendListener_(0)
00065 {
00066    log_.call(ME, "::constructor");
00067    cbServer_           = NULL;
00068    updateClient_       = NULL;
00069    connection_         = NULL;
00070    dispatchManager_    = NULL;
00071    connectionProblems_ = NULL;
00072    instanceName_       = lexical_cast<std::string>(TimestampFactory::getInstance().getTimestamp());
00073 
00074    // Hack for Windows: Initialize it from main thread, using the callback thread fails undeterminable (with xerces)
00075    org::xmlBlaster::util::parser::ParserFactory::getFactory().initialize(global_);
00076 }
00077 
00078 XmlBlasterAccess::~XmlBlasterAccess()
00079 {
00080    if (log_.call()) log_.call(ME, "destructor");
00081    cleanup(true);
00082    dispatchManager_    = NULL;
00083    updateClient_       = NULL;
00084    connectionProblems_ = NULL;
00085    if (log_.trace()) log_.trace(ME, "destructor ended");
00086 }
00087 
00088 void XmlBlasterAccess::cleanup(bool doLock)
00089 {
00090    if (log_.call()) log_.call(ME, "cleanup");
00091    if (doLock) {
00092       // synchronization
00093       org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00094       org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
00095       subscriptionCallbackMap_.clear();
00096    }
00097    else {
00098       org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
00099       subscriptionCallbackMap_.clear();
00100    }
00101 
00102    if (cbServer_) {
00103       CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
00104       const AddressBaseRef& addr = prop.getCurrentCallbackAddress(); // c++ may not return null
00105       global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
00106       cbServer_ = NULL;
00107    }
00108    if (connection_) {
00109       if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection");
00110       connection_->shutdown();
00111       delete connection_;
00112       connection_ = NULL;
00113    }
00114 }
00115 
00116 
00117 ConnectReturnQos XmlBlasterAccess::connect(const ConnectQos& qos, I_Callback *clientCb)
00118 {
00119    ME = string("XmlBlasterAccess-") + qos.getSessionQos().getAbsoluteName();
00120    if (log_.call()) log_.call(ME, "::connect");
00121    if (log_.dump()) log_.dump(ME, string("::connect: qos: ") + qos.toXml());
00122 
00123    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00124 
00125    cleanup(false);
00126 
00127    global_.setId(qos.getSessionQos().getAbsoluteName()); // global_.setId(loginName + currentTimeMillis());
00128    connectQos_ = new ConnectQos(qos);
00129    connectQos_->setInstanceId(global_.getInstanceId());
00130 
00131    SecurityQos securityQos = connectQos_->getSecurityQos();
00132 
00133    ME = string("XmlBlasterAccess-") + getId();
00134    string typeVersion = global_.getProperty().getStringProperty("queue/defaultPlugin", "CACHE,1.0");
00135    typeVersion = global_.getProperty().getStringProperty("queue/connection/defaultPlugin", "typeVersion");
00136    updateClient_ = clientCb;
00137 
00138    if (updateClient_) createDefaultCbServer();
00139 
00140    if (log_.trace()) log_.trace(ME, string("::connect. CbServer done"));
00141    // currently the simple version will do it ...
00142    if (!dispatchManager_) dispatchManager_ = &(global_.getDispatchManager());
00143 
00144    if (!connection_) {
00145       connection_ = dispatchManager_->getConnectionsHandler(instanceName_);
00146       connection_->registerPostSendListener(this);
00147    }
00148 
00149    if (connectionProblems_) {
00150       if (log_.trace()) log_.trace(ME, "::connect. Registering initFailsafe");
00151       connection_->initFailsafe(connectionProblems_);
00152       connectionProblems_ = NULL;
00153    }
00154    if (log_.trace()) log_.trace(ME, string("::connect. connectQos: ") + connectQos_->toXml());
00155    // do connect() now:
00156    connectReturnQos_ = connection_->connect(connectQos_);
00157 
00158    ME = string("XmlBlasterAccess-") + connectReturnQos_->getSessionQos().getAbsoluteName();
00159 
00160    setServerNodeId(connectReturnQos_->getSessionQos().getClusterNodeId());
00161    
00162    // Is done in ConnectionsHandler.cpp
00163    //global_.setId(connectReturnQos_->getSessionQos().getAbsoluteName());
00164 
00165    return *connectReturnQos_;
00166 }
00167 
00168 org::xmlBlaster::util::Global& XmlBlasterAccess::getGlobal()
00169 {
00170    return this->global_;
00171 }
00172 
00173 org::xmlBlaster::util::queue::I_Queue* XmlBlasterAccess::getQueue()
00174 {
00175    if (connection_) {
00176       return connection_->getQueue();
00177    }
00178    return 0;
00179 }
00180 
00181 
00182 org::xmlBlaster::client::I_Callback* XmlBlasterAccess::getCallback()
00183 {
00184    return this->updateClient_;
00185 }
00186 
00187 void XmlBlasterAccess::setCallbackDispatcherActive(bool isActive)
00188 {
00189    string command = getSessionName() + "/?dispatcherActive=" + lexical_cast<string>(isActive);
00190    sendAdministrativeCommand(command);
00191    connectQos_->getCbAddress()->setDispatcherActive(isActive);
00192 }
00193 
00194 string XmlBlasterAccess::sendAdministrativeCommand(const string &command)
00195 {
00196    bool isGet = command.find("get ") == 0 || command.find("GET ") == 0;
00197    bool isSet = command.find("set ") == 0 || command.find("SET ") == 0;
00198    const string cmd = ((isGet || isSet)) ? command.substr(4) : command;
00199    
00200    if (isSet || (!isGet && cmd.find("=") != string::npos)) {
00201       string oid = string("__cmd:") + cmd;
00202       PublishKey  key(global_, oid); // oid="__cmd:/client/joe/1/?dispatcherActive=false"
00203       PublishQos  qos(global_);
00204       MessageUnit msgUnit(key, "", qos);
00205       try {
00206          PublishReturnQos ret = publish(msgUnit);
00207          if (log_.trace()) log_.trace(ME, "Send '" + cmd + " '");
00208          return ret.getState();
00209       }
00210       catch (XmlBlasterException &e) {
00211          if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage());
00212          throw e;
00213       }
00214    }
00215    else {
00216       string oid = string("__cmd:") + cmd;
00217       GetKey getKey(global_);
00218       getKey.setOid(oid);
00219       GetQos getQos(global_);
00220       try {
00221          vector<MessageUnit> msgVec = get(getKey, getQos);
00222          if (log_.trace()) log_.trace(ME, "Send '" + cmd + " ', got array of size " + lexical_cast<string>(msgVec.size()));
00223          if (msgVec.size() == 0)
00224             return "";
00225          return msgVec[0].getContentStr();
00226       }
00227       catch (XmlBlasterException &e) {
00228          if (log_.trace()) log_.trace(ME, "Sending of '" + cmd + " ' failed: " + e.getMessage());
00229          throw e;
00230       }
00231    }
00232 }
00233 
00234 
00235 void XmlBlasterAccess::createDefaultCbServer()
00236 {
00237    log_.call(ME, "::createDefaultCbServer");
00238 
00239    CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
00240    const AddressBaseRef &addr = prop.getCurrentCallbackAddress();
00241 
00242    if(!cbServer_)
00243      cbServer_ = initCbServer(getLoginName(), addr->getType(), addr->getVersion());
00244 
00245    addr->setAddress(cbServer_->getCbAddress());
00246    addr->setType(cbServer_->getCbProtocol());
00247    // !!!!! prop.setCallbackAddress(addr);
00248    connectQos_->setSessionCbQueueProperty(prop);
00249    if (log_.trace()) log_.trace(ME, string("::createDefaultCbServer: connectQos: ") + connectQos_->toXml());
00250    log_.info(ME, "Callback settings: " + prop.getSettings());
00251 }
00252 
00253 I_CallbackServer*
00254 XmlBlasterAccess::initCbServer(const string& loginName, const string& type, const string& version)
00255 {
00256    if (log_.call()) log_.call(ME, string("::initCbServer: loginName='") + loginName + "' type='" + type + "' version='" + version +"'");
00257    if (log_.trace()) log_.trace(ME, string("Using 'client.cbProtocol=") + type + string("' to be used by ") + getServerNodeId() + string(", trying to create the callback server ..."));
00258    I_CallbackServer* server = &(global_.getCbServerPluginManager().getPlugin(instanceName_, type, version));
00259    if (log_.trace()) log_.trace(ME, "After callback plugin creation");
00260    server->initialize(loginName, *this);
00261    if (log_.trace()) log_.trace(ME, "After callback plugin initialize");
00262    return server;
00263 }
00264 
00265 org::xmlBlaster::util::dispatch::I_PostSendListener* XmlBlasterAccess::registerPostSendListener(org::xmlBlaster::util::dispatch::I_PostSendListener *listener) {
00266    I_PostSendListener* old = this->postSendListener_;
00267    this->postSendListener_ = listener;
00268    //if (connection_)
00269    //   return connection_->registerPostSendListener(this);
00270    return old;
00271 }
00272 
00273 // I_PostSendListener
00274 void XmlBlasterAccess::postSend(const org::xmlBlaster::util::queue::MsgQueueEntry &msgQueueEntry)
00275 {
00276    I_PostSendListener* l = this->postSendListener_;
00277    if (l)
00278       l->postSend(msgQueueEntry);
00279 }
00280 
00281 org::xmlBlaster::client::protocol::I_ProgressListener* XmlBlasterAccess::registerProgressListener(org::xmlBlaster::client::protocol::I_ProgressListener *listener)
00282 {
00283    return (this->cbServer_) ? this->cbServer_->registerProgressListener(listener) : 0;
00284 }
00285 
00286 org::xmlBlaster::util::qos::ConnectQosRef XmlBlasterAccess::getConnectQos() {
00287    return connectQos_;
00288 }
00289 
00290 //org::xmlBlaster::util::qos::ConnectReturnQosRef XmlBlasterAccess::getConnectReturnQos() {
00291 //}
00292 
00293 void
00294 XmlBlasterAccess::initSecuritySettings(const string& /*secMechanism*/, const string& /*secVersion*/)
00295 {
00296    log_.error(ME, "initSecuritySettings not implemented yet");
00297 }
00298 
00299 void XmlBlasterAccess::leaveServer(const StringMap &/*map*/)
00300 {
00301    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00302    if (!isConnected()) {
00303       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::leaveServer", "You are not connected to the xmlBlaster");
00304    }
00305    
00306    if (cbServer_) {
00307       if (log_.trace()) log_.trace(ME, "destructor: going to delete the callback connection");
00308       cbServer_->shutdownCb();
00309    }
00310    
00311    if (connection_) {
00312       if (log_.trace()) log_.trace(ME, "destructor: going to delete the connection");
00313       connection_->shutdown();
00314    }
00315    
00316    if (cbServer_) {
00317       CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
00318       AddressBaseRef addr = prop.getCurrentCallbackAddress(); // c++ may not return null
00319       global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
00320       cbServer_ = NULL;
00321    }
00322    
00323    if (connection_) {
00324       delete connection_;
00325       connection_ = NULL;
00326    }
00327    log_.info(ME, "leaveServer() done");
00328 }
00329 
00330 
00331 bool
00332 XmlBlasterAccess::disconnect(const DisconnectQos& qos, bool flush, bool shutdown, bool shutdownCb)
00333 {
00334    // locking until finished 
00335    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00336    bool ret1 = true;
00337    bool ret3 = true;
00338    if (log_.call()) {
00339       log_.call(ME, string("disconnect called with flush='") + Global::getBoolAsString(flush) + 
00340                               "' shutdown='" + Global::getBoolAsString(shutdown) + 
00341                     "' shutdownCb='" + Global::getBoolAsString(shutdownCb) + "'");
00342    }
00343 
00344    if (log_.trace()) log_.trace(ME, "disconnecting the client connection");
00345    if (log_.dump()) log_.dump(ME, string("disconnect: the qos is:\n") + qos.toXml());
00346    if (connection_ != NULL) {
00347       ret1  = connection_->disconnect(qos);
00348       if (shutdown) connection_->shutdown();
00349    }
00350    else {
00351       ret1 = false;
00352    }
00353    if (shutdownCb) {
00354       if (cbServer_) {
00355          ret3 = cbServer_->shutdownCb();
00356 
00357          CbQueueProperty prop = connectQos_->getSessionCbQueueProperty(); // Creates a default property for us if none is available
00358          const AddressBaseRef &addr = prop.getCurrentCallbackAddress();
00359          global_.getCbServerPluginManager().releasePlugin( instanceName_, addr->getType(), addr->getVersion() );
00360          cbServer_ = NULL;
00361       }
00362       else ret3 = false;
00363    }
00364    return ret1 && ret3;
00365 }
00366 
00367 string XmlBlasterAccess::getId()
00368 {
00369    return getSessionName();
00370 }
00371 
00372 SessionNameRef XmlBlasterAccess::getSessionNameRef()
00373 {
00374    if (!connectReturnQos_.isNull()) return connectReturnQos_->getSessionQos().getSessionName();
00375    return connectQos_->getSessionQos().getSessionName();
00376 }
00377 
00378 string XmlBlasterAccess::getSessionName()
00379 {
00380    string ret;
00381    if (!connectReturnQos_.isNull()) ret = connectReturnQos_->getSessionQos().getAbsoluteName();
00382    if (ret == "") ret = connectQos_->getSessionQos().getAbsoluteName();
00383    return ret;
00384 }
00385 
00386 string XmlBlasterAccess::getLoginName()
00387 {
00388    try {
00389       string nm = connectQos_->getSecurityQos().getUserId();
00390       if (nm != "") return nm;
00391    }
00392    catch (XmlBlasterException e) {
00393       log_.warn(ME, e.toString());
00394    }
00395    return string("client?");
00396 }
00397 
00398 void XmlBlasterAccess::setServerNodeId(const string& nodeId)
00399 {
00400    serverNodeId_ = nodeId;
00401 }
00402 
00403 string XmlBlasterAccess::getServerNodeId() const
00404 {
00405    return serverNodeId_;
00406 }
00407 
00408 /*
00409 MsgQueueEntry
00410 XmlBlasterAccess::queueMessage(const MsgQueueEntry& entry)
00411 {
00412  return entry;
00413 }
00414 
00415 vector<MsgQueueEntry*>
00416 XmlBlasterAccess::queueMessage(const vector<MsgQueueEntry*>& entries)
00417 {
00418    return entries;
00419 }
00420 */
00421 
00422 SubscribeReturnQos XmlBlasterAccess::subscribe(const SubscribeKey& key, const SubscribeQos& qos, I_Callback *callback)
00423 {
00424    // locking until finished 
00425    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00426    
00427    if (!isConnected()) {
00428       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::subscribe", "you are not connected to the xmlBlaster");
00429    }
00430    if (log_.call()) log_.call(ME, "subscribe");
00431    if (log_.dump()) {
00432       log_.dump(ME, string("subscribe. The key:\n") + key.toXml());
00433       log_.dump(ME, string("subscribe. The Qos:\n") + qos.toXml());
00434    }
00435 
00436    SessionNameRef sessionName = getSessionNameRef();
00437    if (sessionName->getPubSessionId() > 0 &&
00438       qos.getMultiSubscribe()==false &&
00439       !qos.hasSubscriptionId()) {
00440       // For failsave clients we generate on client side the subscriptionId
00441       // In case of offline/clientSideQueued operation we guarantee like this a not changing
00442       // subscriptionId and the client code can reliably use the subscriptionId for further dispatching
00443       // of update() messages.
00444       SubscribeQos& q = const_cast<SubscribeQos&>(qos);
00445       q.generateSubscriptionId(sessionName, key);
00446       if (log_.trace()) log_.trace(ME, "subscribe: generated client side subscriptionId=" + q.getData().getSubscriptionId());
00447    }
00448 
00449    if (callback != 0) { // using a subscribe specific callback?
00450       if (log_.trace()) log_.trace(ME, "subscribe: inserting individual callback in callback map");
00451       org::xmlBlaster::util::thread::Lock lockUpdate(updateMutex_);
00452       SubscribeReturnQos retQos = connection_->subscribe(key, qos);
00453       std::string subId = retQos.getSubscriptionId();
00454       subscriptionCallbackMap_.insert(std::map<std::string, I_Callback*>::value_type(subId, callback));
00455       return retQos;
00456    }
00457    else {
00458       if (log_.trace()) log_.trace(ME, "subscribe: no specific callback");
00459       return connection_->subscribe(key, qos);
00460    }
00461 }
00462 
00463 vector<MessageUnit> XmlBlasterAccess::get(const GetKey& key, const GetQos& qos)
00464 {
00465    // locking until finished 
00466    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00467    if (!isConnected()) {
00468       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::get", "you are not connected to the xmlBlaster");
00469    }
00470    if (log_.call()) log_.call(ME, "get");
00471    if (log_.dump()) {
00472       log_.dump(ME, string("get. The key:\n") + key.toXml());
00473       log_.dump(ME, string("get. The Qos:\n") + qos.toXml());
00474    }
00475    return connection_->get(key, qos);
00476 }
00477 
00478 vector<MessageUnit> XmlBlasterAccess::receive(string oid, int maxEntries, long timeout, bool consumable) {
00479    if (!isConnected()) {
00480       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "you are not connected to the xmlBlaster");
00481    }
00482    if (log_.call()) log_.call(ME, "receive");
00483    
00484     //topic/hello          to access a history queue,
00485     //client/joe           to access a subject queue or
00486     //client/joe/session/1 
00487    if (oid.find("topic") != string::npos)
00488       oid = "__cmd:"+oid+"/?historyQueueEntries"; // "__cmd:topic/hello/?historyQueueEntries"
00489    else if (oid.find("session") != string::npos)
00490       oid = "__cmd:"+oid+"/?callbackQueueEntries"; // "__cmd:client/joe/session/1/?callbackQueueEntries";
00491    else if (oid.find("subject") != string::npos)
00492       oid = "__cmd:"+oid+"/?subjectQueueEntries"; // "__cmd:client/joe/?subjectQueueEntries"
00493    else
00494       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::receive", "Can't parse '" + oid + "'");
00495 
00496    GetKey getKey(global_, oid);
00497    QueryQosData data(global_);
00498    data.setQueryQos(maxEntries, timeout, consumable);
00499    GetQos getQos(global_, data);
00500    vector<MessageUnit> msgs = get(getKey, getQos);
00501    if (log_.trace()) log_.trace(ME, string("receive - got '") + lexical_cast<std::string>(msgs.size()) + "'");
00502    return msgs;
00503 }
00504 
00505 
00506 vector<UnSubscribeReturnQos>
00507 XmlBlasterAccess::unSubscribe(const UnSubscribeKey& key, const UnSubscribeQos& qos)
00508 {
00509    // locking until finished 
00510    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00511    if (!isConnected()) {
00512       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::unsSubscribe", "you are not connected to the xmlBlaster");
00513    }
00514    if (log_.call()) log_.call(ME, "unSubscribe");
00515    if (log_.dump()) {
00516       log_.dump(ME, string("unSubscribe. The key:\n") + key.toXml());
00517       log_.dump(ME, string("unSubscribe. The Qos:\n") + qos.toXml());
00518    }
00519    // synchronization
00520    org::xmlBlaster::util::thread::Lock lock1(updateMutex_);
00521    vector<UnSubscribeReturnQos> ret = connection_->unSubscribe(key, qos);
00522    vector<UnSubscribeReturnQos>::iterator iter = ret.begin();
00523    while (iter != ret.end()) {
00524       if (log_.trace()) log_.trace(ME, std::string("unSubscribe: removing callback for '") + (*iter).getSubscriptionId() + "'");
00525       subscriptionCallbackMap_.erase((*iter).getSubscriptionId());
00526       iter++;
00527    }
00528    return ret;
00529 }
00530 
00531 PublishReturnQos XmlBlasterAccess::publish(const MessageUnit& msgUnit)
00532 {
00533    // locking until finished 
00534    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00535    if (!isConnected()) {
00536       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publish", "you are not connected to the xmlBlaster");
00537    }
00538    if (log_.call()) log_.call(ME, "publish");
00539    if (log_.dump()) {
00540       log_.dump(ME, string("publish. The msgUnit:\n") + msgUnit.toXml());
00541    }
00542    return connection_->publish(msgUnit);
00543 }
00544 
00545 void XmlBlasterAccess::publishOneway(const vector<MessageUnit>& msgUnitArr)
00546 {
00547    // locking until finished 
00548    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00549    if (!isConnected()) {
00550       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishOneway", "you are not connected to the xmlBlaster");
00551    }
00552    if (log_.call()) log_.call(ME, "publishOneway");
00553    if (log_.dump()) {
00554       for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) {
00555          log_.dump(ME, string("publishOneway. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml());
00556       }
00557    }
00558    connection_->publishOneway(msgUnitArr);
00559 }
00560 
00561 vector<PublishReturnQos> XmlBlasterAccess::publishArr(const vector<MessageUnit> &msgUnitArr)
00562 {
00563    // locking until finished 
00564    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00565    if (!isConnected()) {
00566       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::publishArr", "you are not connected to the xmlBlaster");
00567    }
00568    if (log_.call()) log_.call(ME, "publishArr");
00569    if (log_.dump()) {
00570       for (vector<MessageUnit>::size_type i=0; i < msgUnitArr.size(); i++) {
00571          log_.dump(ME, string("publishArr. The msgUnit[") + lexical_cast<std::string>(i) + "]:\n" + msgUnitArr[i].toXml());
00572       }
00573    }
00574    return connection_->publishArr(msgUnitArr);
00575 }
00576 
00577 vector<EraseReturnQos> XmlBlasterAccess::erase(const EraseKey& key, const EraseQos& qos)
00578 {
00579    // locking until finished 
00580    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00581    if (!isConnected()) {
00582       throw XmlBlasterException(USER_NOT_CONNECTED, ME + "::erase", "you are not connected to the xmlBlaster");
00583    }
00584    if (log_.call()) log_.call(ME, "erase");
00585    if (log_.dump()) {
00586       log_.dump(ME, string("erase. The key:\n") + key.toXml());
00587       log_.dump(ME, string("erase. The Qos:\n") + qos.toXml());
00588    }
00589    return connection_->erase(key, qos);
00590 }
00591 
00592 string
00593 XmlBlasterAccess::update(const string &sessionId, UpdateKey &updateKey, const unsigned char *content, long contentSize, UpdateQos &updateQos)
00594 {
00595    if (log_.call()) log_.call(ME, "::update");
00596    if (log_.trace()) log_.trace(ME, string("update. The sessionId is '") + sessionId + "'");
00597    if (log_.dump()) {
00598       log_.dump(ME, string("update. The key:\n") + updateKey.toXml());
00599       log_.dump(ME, string("update. The Qos:\n") + updateQos.toXml());
00600    }
00601 
00602    if (!subscriptionCallbackMap_.empty()) {
00603       // This is synchronized but you must ensure the callback is still in scope when the update method is 
00604       // invoked. This could be more robust with a reference counted I_Callback.
00605       I_Callback* subscriptionCallback = 0;
00606       {
00607          org::xmlBlaster::util::thread::Lock lock(updateMutex_);
00608          CallbackMapType::iterator iter = subscriptionCallbackMap_.end();
00609          iter = subscriptionCallbackMap_.find(updateQos.getSubscriptionId());
00610          if (iter != subscriptionCallbackMap_.end()) subscriptionCallback = (*iter).second;
00611       }
00612 
00613       if (subscriptionCallback != 0) {
00614          if (log_.trace()) log_.trace(ME, std::string("update: invoking specific subscription callback"));
00615          return subscriptionCallback->update(sessionId, updateKey, content, contentSize, updateQos);
00616       }
00617    }
00618 
00619    if (updateClient_)
00620       return updateClient_->update(sessionId, updateKey, content, contentSize, updateQos);
00621    else {
00622       // See similar behavior in XmlBlasterAccess.java
00623       log_.error(ME, string("Ignoring unexpected update message as client has not registered a callback: ") + updateKey.toXml() + "" + updateQos.toXml());
00624    }
00625 
00626    return Constants::RET_OK; // "<qos><state id='OK'/></qos>";
00627 }
00628 
00629 std::string XmlBlasterAccess::usage()
00630 {
00631    string text = string("\n");
00632    text += string("Choose a connection protocol:\n");
00633    text += string("   -protocol           Specify a protocol to talk with xmlBlaster, choose 'SOCKET' or 'IOR' depending on your compilation.\n");
00634    text += string("                       Current setting is '") + Global::getInstance().getProperty().getStringProperty("protocol", Global::getDefaultProtocol());
00635    text += string("\n\n");
00636    text += string("Security features:\n");
00637    text += string("   -Security.Client.DefaultPlugin \"gui,1.0\"\n");
00638    text += string("                       Force the given authentication schema, here the GUI is enforced\n");
00639    text += string("                       Clients can overwrite this with ConnectQos.java\n");
00640 
00641    return text; // std::cout << text << std::endl;
00642 }
00643 
00644 void XmlBlasterAccess::initFailsafe(I_ConnectionProblems* connectionProblems)
00645 {
00646    if (connection_) connection_->initFailsafe(connectionProblems);
00647    else connectionProblems_ = connectionProblems;   
00648 }
00649 
00650 string XmlBlasterAccess::ping()
00651 {
00652    // locking until finished 
00653    org::xmlBlaster::util::thread::Lock lock(invocationMutex_);
00654    return connection_->ping("<qos/>");
00655 }
00656 
00657 long XmlBlasterAccess::flushQueue()
00658 {
00659    if (!connection_) {
00660       throw XmlBlasterException(INTERNAL_NULLPOINTER, ME + "::flushQueue", "no connection exists when trying to flush the queue: try to connect to xmlBlaster first");
00661    }
00662    return connection_->flushQueue();
00663 }
00664 
00665 
00666 bool XmlBlasterAccess::isConnected() const
00667 {
00668    if (!connection_) return false;
00669    return connection_->isConnected();
00670 }
00671 
00672 bool XmlBlasterAccess::isAlive() const
00673 {
00674    if (!connection_) return false;
00675    return connection_->isAlive();
00676 }
00677 
00678 bool XmlBlasterAccess::isPolling() const
00679 {
00680    if (!connection_) return false;
00681    return connection_->isPolling();
00682 }
00683 
00684 bool XmlBlasterAccess::isDead() const
00685 {
00686    if (!connection_) return false;
00687    return connection_->isDead();
00688 }
00689  
00690 
00691 std::string XmlBlasterAccess::getStatusString() const
00692 {
00693    if (!connection_) return "DEAD";
00694    return connection_->getStatusString();
00695 }
00696 
00697 
00698 }}} // namespaces
00699 
00700 
00701 #ifdef _XMLBLASTER_CLASSTEST
00702 
00703 #include <util/Timestamp.h>
00704 #include <util/thread/ThreadImpl.h>
00705 
00706 using namespace std;
00707 using namespace org::xmlBlaster::client;
00708 using namespace org::xmlBlaster::util::thread;
00709 
00710 int main(int args, char* argv[])
00711 {
00712     // Init the XML platform
00713     try
00714     {
00715        Global& glob = Global::getInstance();
00716        glob.initialize(args, argv);
00717        Log& log = glob.getLog("org.xmlBlaster.client");
00718 
00719        XmlBlasterAccess xmlBlasterAccess(glob);
00720        ConnectQos connectQos(glob);
00721 
00722        log.info("main", string("the connect qos is: ") + connectQos.toXml());
00723 
00724        ConnectReturnQosRef retQos = xmlBlasterAccess.connect(connectQos, NULL);
00725        log.info("", "Successfully connect to xmlBlaster");
00726 
00727        if (log.trace()) log.trace("main", "Subscribing using XPath syntax ...");
00728        SubscribeKey subKey(glob,"//test","XPATH");
00729        log.info("main", string("subscribe key: ") + subKey.toXml());
00730        SubscribeQos subQos(glob);
00731        log.info("main", string("subscribe qos: ")  + subQos.toXml());
00732        try {
00733           SubscribeReturnQos subReturnQos = xmlBlasterAccess.subscribe(subKey, subQos);
00734           log.info("main", string("Success: Subscribe return qos=") +
00735                    subReturnQos.toXml() + " done");
00736        }
00737        catch (XmlBlasterException &ex) {
00738           log.error("main", ex.toXml());
00739        }
00740 
00741        PublishKey pubKey(glob);
00742        pubKey.setOid("HelloWorld");
00743        pubKey.setClientTags("<test></test>");
00744        PublishQos pubQos(glob);
00745        MessageUnit msgUnit(pubKey, string("Hi"), pubQos);
00746 
00747        PublishReturnQos pubRetQos = xmlBlasterAccess.publish(msgUnit);
00748        log.info("main", string("successfully published, publish return qos: ") + pubRetQos.toXml());
00749 
00750        log.info("", "Successfully published a message to xmlBlaster");
00751        log.info("", "Sleeping");
00752        Timestamp delay = 10000000000ll; // 10 seconds
00753        Thread::sleep(delay);
00754    }
00755    catch (XmlBlasterException &ex) {
00756       std::cout << ex.toXml() << std::endl;
00757    }
00758    return 0;
00759 }
00760 
00761 #endif