util/qos/address/CallbackAddress.cpp

Go to the documentation of this file.
00001 /*------------------------------------------------------------------------------
00002 Name:      CallbackAddress.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Holding callback address string and protocol string
00006 ------------------------------------------------------------------------------*/
00007 
00008 #include <util/qos/address/CallbackAddress.h>
00009 #include <util/lexical_cast.h>
00010 #include <util/Global.h>
00011 
00012 namespace org { namespace xmlBlaster { namespace util { namespace qos { namespace address {
00013 
00014 using namespace std;
00015 using namespace org::xmlBlaster::util;
00016 
00017 
00018 inline void CallbackAddress::initialize()
00019 {
00020    initHostname(global_.getCbHostname()); // don't use setHostname() as it would set isCardcodedHostname=true
00021    setPort(global_.getProperty().getIntProperty("dispatch/callback/port", getPort()));
00022    setType(global_.getProperty().getStringProperty("protocol", getType()));
00023    setType(global_.getProperty().getStringProperty("dispatch/callback/protocol", getType()));
00024    setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime", DEFAULT_collectTime)); // sync update()
00025    setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries", DEFAULT_burstModeMaxEntries));
00026    setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes", DEFAULT_burstModeMaxBytes));
00027    setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval", defaultPingInterval_));
00028    setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries", defaultRetries_));
00029    setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay", defaultDelay_));
00030    useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue", DEFAULT_useForSubjectQueue));
00031    setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway", DEFAULT_oneway));
00032    setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive", DEFAULT_dispatcherActive));
00033    setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type", DEFAULT_compressType));
00034    setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize", DEFAULT_minSize));
00035    setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed", DEFAULT_ptpAllowed));
00036    setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId", DEFAULT_sessionId));
00037    setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin", DEFAULT_dispatchPlugin));
00038    if (nodeId_ != "") {
00039       setPort(global_.getProperty().getIntProperty("dispatch/callback/port["+nodeId_+"]", getPort()));
00040       setType(global_.getProperty().getStringProperty("dispatch/callback/protocol["+nodeId_+"]", getType()));
00041       setCollectTime(global_.getProperty().getLongProperty("dispatch/callback/burstMode/collectTime["+nodeId_+"]", collectTime_));
00042       setBurstModeMaxEntries(global_.getProperty().getIntProperty("dispatch/callback/burstMode/maxEntries["+nodeId_+"]", getBurstModeMaxEntries()));
00043       setBurstModeMaxBytes(global_.getProperty().getLongProperty("dispatch/callback/burstMode/maxBytes["+nodeId_+"]", getBurstModeMaxBytes()));
00044       setPingInterval(global_.getProperty().getLongProperty("dispatch/callback/pingInterval["+nodeId_+"]", pingInterval_));
00045       setRetries(global_.getProperty().getIntProperty("dispatch/callback/retries["+nodeId_+"]", retries_));
00046       setDelay(global_.getProperty().getLongProperty("dispatch/callback/delay["+nodeId_+"]", delay_));
00047       useForSubjectQueue(global_.getProperty().getBoolProperty("dispatch/callback/useForSubjectQueue["+nodeId_+"]", useForSubjectQueue_));
00048       setOneway(global_.getProperty().getBoolProperty("dispatch/callback/oneway["+nodeId_+"]", oneway_));
00049       setDispatcherActive(global_.getProperty().getBoolProperty("dispatch/callback/dispatcherActive["+nodeId_+"]", dispatcherActive_));
00050       setCompressType(global_.getProperty().getStringProperty("dispatch/callback/compress.type["+nodeId_+"]", compressType_));
00051       setMinSize(global_.getProperty().getLongProperty("dispatch/callback/compress.minSize["+nodeId_+"]", minSize_));
00052       setPtpAllowed(global_.getProperty().getBoolProperty("dispatch/callback/ptpAllowed["+nodeId_+"]", ptpAllowed_));
00053       setSecretSessionId(global_.getProperty().getStringProperty("dispatch/callback/sessionId["+nodeId_+"]", sessionId_));
00054       setDispatchPlugin(global_.getProperty().getStringProperty("dispatch/callback/DispatchPlugin/defaultPlugin["+nodeId_+"]", dispatchPlugin_));
00055    }
00056 }
00057 
00058 
00059 
00060 CallbackAddress::CallbackAddress(Global& global, const string& type, const string nodeId)
00061    : AddressBase(global, "callback")
00062 {
00063    defaultRetries_      = 0;
00064    defaultDelay_        = Constants::MINUTE_IN_MILLIS;
00065    defaultPingInterval_ = Constants::MINUTE_IN_MILLIS;
00066    ME = "CallbackAddress";
00067    if (nodeId != "") nodeId_ = nodeId;
00068    pingInterval_ = defaultPingInterval_;
00069    retries_      = defaultRetries_;
00070    delay_        = defaultDelay_;
00071    initialize();
00072    if (type != "") setType(type);
00073 }
00074 
00075 CallbackAddress::CallbackAddress(const AddressBase& addr) : AddressBase(addr)
00076 {
00077 }
00078 
00079 CallbackAddress& CallbackAddress::operator =(const AddressBase& addr)
00080 {
00081    AddressBase::copy(addr);
00082    return *this;
00083 }
00084 
00089 bool CallbackAddress::useForSubjectQueue()
00090 {
00091    return useForSubjectQueue_;
00092 }
00093 
00098 void CallbackAddress::useForSubjectQueue(bool useForSubjectQueue)
00099 {
00100    useForSubjectQueue_ = useForSubjectQueue;
00101 }
00102 
00103 string CallbackAddress::toString()
00104 {
00105    return getRawAddress();
00106 }
00107 
00111 string CallbackAddress::usage()
00112 {
00113    string text;
00114    text += string("Control xmlBlaster server side callback (if we install a local callback server):\n");
00115    text += string("   -dispatch/callback/sessionId []\n");
00116    text += string("                       The session ID which is passed to our callback server update() method.\n");
00117    text += string("   -dispatch/callback/burstMode/collectTime [") + lexical_cast<std::string>(DEFAULT_collectTime) + string("]\n");
00118    text += string("                       Number of milliseconds xmlBlaster shall collect callback messages.\n");
00119    text += string("                       The burst mode allows performance tuning, try set it to 200.\n");
00120    text += string("   -dispatch/callback/burstMode/maxEntries [" + lexical_cast<std::string>(DEFAULT_burstModeMaxEntries) + "]\n");
00121    text += string("                       The maximum number of callback queue entries to send in a bulk.\n");
00122    text += string("                       -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n");
00123    text += string("   -dispatch/callback/burstMode/maxBytes [" + lexical_cast<std::string>(DEFAULT_burstModeMaxBytes) + "]\n");
00124    text += string("                       The maximum bulk size of callback messages.\n");
00125    text += string("                       -1L takes all entries of highest priority available in the callback ram queue in a bulk.\n");
00126 
00127    text += string("   -dispatch/callback/oneway [") + lexical_cast<std::string>(DEFAULT_oneway) + string("]\n");
00128    text += string("                       Shall the update() messages be send oneway (no application level ACK).\n");
00129 
00130    text += string("   -dispatch/callback/dispatcherActive [") + lexical_cast<string>(DEFAULT_dispatcherActive) + string("]\n");
00131    text += string("                       If false inhibit delivery of callback messages.\n");
00132 
00133    text += string("   -dispatch/callback/pingInterval [") + lexical_cast<std::string>(defaultPingInterval_) + string("]\n");
00134    text += string("                       Pinging every given milliseconds.\n");
00135    text += string("   -dispatch/callback/retries [") + lexical_cast<std::string>(defaultRetries_) + string("]\n");
00136    text += string("                       How often to retry if callback fails.\n");
00137    text += string("                       -1 forever, 0 no retry, > 0 number of retries.\n");
00138    text += string("   -dispatch/callback/delay [") + lexical_cast<std::string>(defaultDelay_) + string("]\n");
00139    text += string("                       Delay between callback retries in millisecond.\n");
00140    //text += string("   -dispatch/callback/compress.type   With which format message be compressed on callback [") + DEFAULT_compressType + string("]\n");
00141    //text += string("   -dispatch/callback/compress.minSize Messages bigger this size in bytes are compressed [") + lexical_cast<std::string>(DEFAULT_minSize) + string("]\n");
00142 
00143    //text += string("   -cb.ptpAllowed      PtP messages wanted? false prevents spamming [") + lexical_cast<std::string>(DEFAULT_ptpAllowed) + string("]\n");
00144    //text += "   -cb.DispatchPlugin/defaultPlugin  Specify your specific dispatcher plugin [" + CallbackAddress.DEFAULT_dispatchPlugin + "]\n";
00145    return text;
00146 }
00147 
00148 }}}}} // namespace
00149 
00150 #ifdef _XMLBLASTER_CLASSTEST
00151 
00152 using namespace std;
00153 using namespace org::xmlBlaster::util::qos::address;
00154 
00156 int main(int args, char* argv[])
00157 {
00158    try {
00159       {
00160          Global& glob = Global::getInstance();
00161          glob.initialize(args, argv);
00162 
00163          CallbackAddress a(glob);
00164          a.setType("SOCKET");
00165          a.setAddress("127.0.0.1:7600");
00166          a.setCollectTime(12345L);
00167          a.setPingInterval(54321L);
00168          a.setRetries(17);
00169          a.setDelay(7890L);
00170          a.setOneway(true);
00171          a.setSecretSessionId("0x4546hwi89");
00172          cout << a.toXml() << endl;
00173       }
00174       {
00175          string nodeId = "heron";
00176 
00177          int                nmax = 8;
00178          const char** argc = new const char*[nmax];
00179          string help = string("-dispatch/callback/sessionId[") +nodeId + string("]");
00180          argc[0] = help.c_str();
00181          argc[1] = "OK";
00182          argc[2] = "dispatch/callback/sessionId";
00183          argc[3] = "ERROR";
00184          argc[4] = "-cb.pingInterval";
00185          argc[5] = "8888";
00186          help = string("-cb.delay[") +nodeId + string("]");
00187          argc[6] = help.c_str();
00188          argc[7] = "8888";
00189 
00190          Global& glob = Global::getInstance();
00191          glob.initialize(nmax, argc);
00192          CallbackAddress a(glob, "RMI", nodeId);
00193          cout << a.toXml() << endl;
00194       }
00195    }
00196    catch(...) {
00197       cout << "Exception in main method" << endl;
00198    }
00199 }
00200 
00201 #endif