client/protocol/corba/DefaultCallback.cpp

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      DefaultCallback.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Default implementation of the POA_serverIdl::BlasterCallback.
00006 -----------------------------------------------------------------------------*/
00007 
00008 #ifndef _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C
00009 #define _CLIENT_PROTOCOL_CORBA_DEFAULTCALLBACK_C
00010 
00011 #include <client/protocol/corba/DefaultCallback.h>
00012 #include <util/Global.h>
00013 
00014 using namespace std;
00015 using namespace org::xmlBlaster::util;
00016 using namespace org::xmlBlaster::client;
00017 using namespace org::xmlBlaster::client::protocol::corba;
00018 using namespace org::xmlBlaster::client::qos;
00019 using namespace org::xmlBlaster::client::key;
00020 
00021 
00022 DefaultCallback::DefaultCallback(Global& global, const string &name, I_Callback *boss,
00023                 /*BlasterCache*/ void* /*cache*/) 
00024 :global_(global), log_(global.getLog("org.xmlBlaster.client.protocol.corba")), msgKeyFactory_(global), msgQosFactory_(global)
00025 {
00026    boss_         = boss;
00027    loginName_    = name;
00028    // cache_ = cache;
00029    if (log_.call()) log_.call(me(),"Entering constructor with argument");
00030 }
00031 
00048 serverIdl::XmlTypeArr* DefaultCallback::update(const char* sessionId,
00049                        const serverIdl::MessageUnitArr& msgUnitArr) UPDATE_THROW_SPECIFIER
00050 {
00051    serverIdl::XmlTypeArr *res = new serverIdl::XmlTypeArr(msgUnitArr.length());
00052    res->length(msgUnitArr.length());
00053 
00054    if (log_.call()) { log_.call(me(), "Receiving update of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); }
00055    
00056    if (msgUnitArr.length() == 0) {
00057       log_.warn(me(), "Entering update() with 0 messages");
00058       return res;
00059    }
00060    for (string::size_type i=0; i < msgUnitArr.length(); i++) {
00061       const serverIdl::MessageUnit &msgUnit = msgUnitArr[i];
00062       UpdateKey *updateKey = 0;
00063       UpdateQos *updateQos = 0;
00064       try {
00065          if (log_.dump()) {
00066             log_.dump(me(), string("update: the key: ") + corbaWStringToString(msgUnit.xmlKey));
00067             log_.dump(me(), string("update: the qos: ") + corbaWStringToString(msgUnit.qos));
00068          }
00069          updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey)));
00070          updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos)));
00071          // Now we know all about the received msg, dump it or do 
00072          // some checks
00073          if (log_.dump()) log_.dump("UpdateKey", string("\n") + updateKey->toXml());
00074          if (log_.dump()) {
00075             string msg = "\n";
00076             for (string::size_type j=0; j < msgUnit.content.length(); j++) 
00077                msg += (char)msgUnit.content[j];
00078             log_.dump("content", "Message received '" + msg + "' with size=" + lexical_cast<std::string>(msgUnit.content.length()));
00079          }
00080          if (log_.dump()) log_.dump("UpdateQos", "\n" + updateQos->toXml());
00081          if (log_.trace()) log_.trace(me(), "Received message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName());
00082 
00083          //Checking whether the Update is for the Cache or for the boss
00084          //The boss should not be interested in cache updates
00085          bool forCache = false;
00086          //          if( cache_ != null ) {
00087          //             forCache = cache_.update(updateQos.getSubscriptionId(), 
00088          //                                      updateKey.toXml(), msgUnit.content);
00089          //          }
00090          string oneRes = "<qos><state id='OK'/></qos>";
00091          if (!forCache) {
00092             if (boss_) {
00093                int size = 0;
00094                size = msgUnit.content.length();
00095                const unsigned char *content = NULL;
00096                if (size > 0) content = (const unsigned char*)&msgUnit.content[0];
00097                if (log_.trace()) log_.trace(me(), "going to invoke client specific update");
00098                oneRes = boss_->update(sessionId, *updateKey, content, size, *updateQos); 
00099                // Call my boss
00100             }
00101             else log_.warn(me(), "can not update: no callback defined");
00102          }
00103          (*res)[i] = toCorbaWString(oneRes);
00104       } 
00105       catch (serverIdl::XmlBlasterException &e) {
00106          log_.error(me(), string(e.message) + " message is on error state: " + updateKey->toXml());
00107          string oneRes = "<qos><state id='ERROR'/></qos>";
00108          (*res)[i] = toCorbaWString(oneRes);
00109       }
00110       catch(...) {
00111          string tmp = "Exception caught in update() " + lexical_cast<std::string>(msgUnitArr.length()) + " messages are handled as not delivered";
00112          log_.error(me(), tmp);
00113          throw serverIdl::XmlBlasterException("user.update.error", "org.xmlBlaster.client", 
00114                                               "client update failed", "en",
00115                                               tmp.c_str(), "", "", "", "", 
00116                                               "", "");
00117       }
00118 
00119       delete updateKey;
00120       delete updateQos;
00121    } // for every message
00122    return res;
00123 }
00124 
00129 void DefaultCallback::updateOneway(const char* sessionId,
00130                       const serverIdl::MessageUnitArr& msgUnitArr) PING_THROW_SPECIFIER
00131 {
00132    if (log_.call()) { log_.call(me(), "Receiving updateOneway of " + lexical_cast<std::string>(msgUnitArr.length()) + " message ..."); }
00133    
00134    if (msgUnitArr.length() == 0) {
00135       log_.warn(me(), "Entering updateOneway() with 0 messages");
00136       return;
00137    }
00138 
00139    for (string::size_type i=0; i < msgUnitArr.length(); i++) {
00140       UpdateKey *updateKey = 0;
00141       UpdateQos *updateQos = 0;
00142       try {
00143          const serverIdl::MessageUnit &msgUnit = msgUnitArr[i];
00144          try {
00145             updateKey = new UpdateKey(global_, msgKeyFactory_.readObject(corbaWStringToString(msgUnit.xmlKey)));
00146             updateQos = new UpdateQos(global_, msgQosFactory_.readObject(corbaWStringToString(msgUnit.qos)));
00147          } 
00148          catch (serverIdl::XmlBlasterException &e) {
00149             log_.error(me(), string(e.message) );
00150          }
00151 
00152          if (log_.trace()) log_.trace(me(), "Received oneway message [" + updateKey->getOid() + "] from publisher " + updateQos->getSender()->getAbsoluteName());
00153 
00154          if (boss_) {
00155             boss_->update(sessionId, *updateKey,
00156                            (const unsigned char*)&msgUnit.content[0], 
00157                            msgUnit.content.length(), *updateQos); 
00158          }
00159          else
00160             log_.warn(me(), "can not update: no callback defined");
00161       }
00162       catch (const exception& e) {
00163          log_.error(me(), string("Exception caught in updateOneway(), it is not transferred to server: ") + e.what());
00164       }
00165       catch(...) {
00166          log_.error(me(), "Exception caught in updateOneway(), it is not transferred to server");
00167       }
00168 
00169       delete updateKey;
00170       delete updateQos;
00171    } // for each message
00172 
00173 } // updateOneway
00174 
00179 char* DefaultCallback::ping(const char *qos) PING_THROW_SPECIFIER
00180 {
00181    if (log_.call()) log_.call(me(), "ping(" + string(qos) + ") ...");
00182    return CORBA::string_dup("");
00183 } // ping
00184 
00185 
00186 #endif