demo/c++/PublishDemo.cpp

Go to the documentation of this file.
00001 /*-----------------------------------------------------------------------------
00002 Name:      PublishDemo.cpp
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Little demo to show how a publish is done
00006 -----------------------------------------------------------------------------*/
00007 
00008 #include <client/XmlBlasterAccess.h>
00009 #include <util/Global.h>
00010 #include <util/lexical_cast.h>
00011 #include <util/qos/ClientProperty.h>
00012 #include <util/queue/PublishQueueEntry.h>
00013 #include <authentication/SecurityQos.h>
00014 #include <iostream>
00015 #include <fstream>
00016 #include <map>
00017 
00018 using namespace std;
00019 using namespace org::xmlBlaster::client;
00020 using namespace org::xmlBlaster::util;
00021 using namespace org::xmlBlaster::util::qos;
00022 using namespace org::xmlBlaster::util::qos::storage;
00023 using namespace org::xmlBlaster::util::queue;
00024 using namespace org::xmlBlaster::client::qos;
00025 using namespace org::xmlBlaster::client::key;
00026 
00027 static unsigned long filesize(ifstream &ins)
00028 {
00029    unsigned long s,e,c;
00030    c = ins.tellg();        // save current file position
00031    ins.seekg(0, ios::end); // position at end
00032    e = ins.tellg();
00033    ins.seekg(0, ios::beg); // position at beginning
00034    s = ins.tellg();
00035    ins.seekg(c);           // restore file position
00036    return e-s;
00037 }
00038 
00039 static int fileRead(string &fn, string &content)
00040 {
00041    unsigned char *buf;
00042    ifstream ins(fn.c_str(), ios_base::binary);
00043    if (!ins.is_open()) return -1;
00044    int   fs   = filesize(ins);
00045    buf  = new unsigned char [fs+1];
00046    buf[fs]    = 0; // so we can assign to string
00047    ins.read((char *)buf,fs);
00048    ins.close();
00049    content = (char *)buf;
00050    delete [] buf;
00051    return fs;
00052 }
00053 
00054 
00055 class PublishDemo : public org::xmlBlaster::util::dispatch::I_PostSendListener
00056 {
00057 private:
00058    string ME;
00059    Global& global_;
00060    I_Log& log_;
00061    char ptr[2];
00062    XmlBlasterAccess connection_;
00063    bool interactive;
00064    bool oneway;
00065    long sleep;
00066    int numPublish;
00067    string oid;
00068    string domain;
00069    string clientTags;
00070    string contentStr;
00071    string contentFile;
00072    PriorityEnum priority;
00073    bool persistent;
00074    long lifeTime;
00075    bool forceUpdate;
00076    bool forceDestroy;
00077    bool readonly;
00078    long destroyDelay;
00079    bool createDomEntry;
00080    long historyMaxMsg;
00081    bool forceQueuing;
00082    bool subscribable;
00083    string destination;
00084    bool doErase;
00085    bool disconnect;
00086    bool eraseTailback;
00087    int contentSize;
00088    bool eraseForceDestroy;
00089    QosData::ClientPropertyMap clientPropertyMap;
00090 
00091 public:
00092    PublishDemo(Global& glob) 
00093       : ME("PublishDemo"), 
00094         global_(glob), 
00095         log_(glob.getLog("demo")),
00096         connection_(global_)
00097    {
00098       initEnvironment();
00099       run();
00100    }
00101 
00102    void run() 
00103    {
00104       connect();
00105       publish();
00106       erase();
00107       connection_.disconnect(DisconnectQos(global_));
00108    }
00109 
00110    void initEnvironment();
00111 
00112    void connect();
00113 
00114    void publish();
00115 
00116    void erase()
00117    {
00118       if (doErase) {
00119          if (interactive) {
00120             string outStr = "Hit 'e' to erase topic '" + oid + "' ('q' to exit without erase) >> "; 
00121             string ret = org::xmlBlaster::util::waitOnKeyboardHit(outStr);
00122             if (ret == "q") return;
00123          }
00124          log_.info(ME, "Erasing topic '" + oid + "'");
00125          EraseKey key(global_);
00126          key.setOid(oid);
00127          EraseQos eq(global_);
00128          eq.setForceDestroy(eraseForceDestroy);
00129          connection_.erase(key, eq);
00130       }
00131    }
00132    
00138    void postSend(const org::xmlBlaster::util::queue::MsgQueueEntry &msgQueueEntry)
00139    {
00140      if (msgQueueEntry.isPublish()) {
00141           const PublishQueueEntry* entry = dynamic_cast<const PublishQueueEntry*>(&msgQueueEntry);
00142         const PublishReturnQos* qos = entry->getPublishReturnQos();
00143         log_.info(ME, "Tailback message is send from client queue, state=" + qos->getState() + ": " + msgQueueEntry.getMsgUnit().getContentStr());
00144      }
00145      else {
00146         log_.info(ME, "Tailback message is send from client queue");
00147      }
00148    }
00149 
00150 };
00151 
00152 void PublishDemo::initEnvironment()
00153 {
00154    interactive = global_.getProperty().get("interactive", true);
00155    oneway = global_.getProperty().get("oneway", false);
00156    sleep = global_.getProperty().get("sleep", 1000L);
00157    numPublish = global_.getProperty().get("numPublish", 1);
00158    oid = global_.getProperty().get("oid", string("Hello"));
00159    domain = global_.getProperty().get("domain", string(""));
00160    clientTags = global_.getProperty().get("clientTags", ""); // "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
00161    contentStr = global_.getProperty().get("content", "Hi-%counter");
00162    contentFile = global_.getProperty().get("contentFile", "");
00163    priority = int2Priority(global_.getProperty().get("priority", NORM_PRIORITY));
00164    persistent = global_.getProperty().get("persistent", true);
00165    lifeTime = global_.getProperty().get("lifeTime", -1L);
00166    forceUpdate = global_.getProperty().get("forceUpdate", true);
00167    forceDestroy = global_.getProperty().get("forceDestroy", false);
00168    readonly = global_.getProperty().get("readonly", false);
00169    destroyDelay = global_.getProperty().get("destroyDelay", 60000L);
00170    createDomEntry = global_.getProperty().get("createDomEntry", true);
00171    historyMaxMsg = global_.getProperty().get("queue/history/maxEntries", -1L);
00172    forceQueuing = global_.getProperty().get("forceQueuing", true);
00173    subscribable = global_.getProperty().get("subscribable", true);
00174    destination = global_.getProperty().get("destination", "");
00175    doErase = global_.getProperty().get("doErase", true);
00176    disconnect = global_.getProperty().get("disconnect", true);
00177    eraseTailback = global_.getProperty().get("eraseTailback", false);
00178    contentSize = global_.getProperty().get("contentSize", -1); // 2000000);
00179    eraseForceDestroy = global_.getProperty().get("erase.forceDestroy", false);
00180 
00181    //TODO: Needs to be ported similar to Java
00182    //map<std::string,std::string> clientPropertyMap = global_.getProperty().get("clientProperty", map<std::string,std::string>());
00183    string clientPropertyKey = global_.getProperty().get("clientProperty.key", string(""));
00184    string clientPropertyValue = global_.getProperty().get("clientProperty.value", string(""));
00185    string clientPropertyEncoding = global_.getProperty().get("clientProperty.encoding", ""); // Force to Constants::ENCODING_BASE64="base64"
00186    string clientPropertyCharset = global_.getProperty().get("clientProperty.charset", ""); // Force to e.g. "windows-1252"
00187    string clientPropertyType = global_.getProperty().get("clientProperty.type", ""); // Date type, see Constants::TYPE_DOUBLE, Constants::TYPE_STRING etc
00188    if (clientPropertyKey != "") {
00189       ClientProperty cp(clientPropertyKey, clientPropertyValue, clientPropertyType, clientPropertyEncoding);
00190       if (clientPropertyCharset != "") cp.setCharset(clientPropertyCharset);
00191       //
00192       // Returns "en_US.UTF-8" on Linux and "English_United States.1252" on WinXP
00193       //char *p = setlocale(LC_CTYPE, "");
00194       //log_.info(ME, "setlocale CTYPE returns: " + string(p));
00195       // But java (server on Linux or Windows) can't handle "English_United States.1252" or "1252": java.io.UnsupportedEncodingException: 1252
00196       // but it can handle conversion from "windows-1252" to "UTF-8"
00197       // Further, java does: UnsupportedEncodingException: en_US.UTF-8
00198       // but likes "UTF-8"
00199       //What else instead of setlocal() could we use for automatic charset detection of this C++ client (which is compatible to Java used names)?
00200       clientPropertyMap.insert(QosData::ClientPropertyMap::value_type(clientPropertyKey, cp));
00201    }
00202 
00203    if (historyMaxMsg < 1 && !global_.getProperty().propertyExists("destroyDelay"))
00204       destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used
00205 
00206    log_.info(ME, "You can use for example '-session.name publisher/1 -passwd secret' to pass your credentials");
00207    log_.info(ME, "Used settings are:");
00208    log_.info(ME, "   -interactive    " + lexical_cast<string>(interactive));
00209    log_.info(ME, "   -sleep          " + lexical_cast<string>(sleep)); // org.jutils.time.TimeHelper.millisToNice(sleep));
00210    log_.info(ME, "   -oneway         " + lexical_cast<string>(oneway));
00211    log_.info(ME, "   -doErase        " + lexical_cast<string>(doErase));
00212    log_.info(ME, "   -disconnect     " + lexical_cast<string>(disconnect));
00213    log_.info(ME, "   -eraseTailback  " + lexical_cast<string>(eraseTailback));
00214    log_.info(ME, " Pub/Sub settings");
00215    log_.info(ME, "   -numPublish     " + lexical_cast<string>(numPublish));
00216    log_.info(ME, "   -oid            " + lexical_cast<string>(oid));
00217    log_.info(ME, "   -domain         " + lexical_cast<string>(domain));
00218    log_.info(ME, "   -clientTags     " + clientTags);
00219    if (contentSize >= 0) {
00220       log_.info(ME, "   -content        [generated]");
00221       log_.info(ME, "   -contentSize    " + lexical_cast<string>(contentSize));
00222    }
00223    else if (contentFile.size() > 0) {
00224       log_.info(ME, "   -contentFile    " + contentFile);
00225    }
00226    else {
00227       log_.info(ME, "   -content        " + contentStr);
00228       log_.info(ME, "   -contentSize    " + lexical_cast<string>(contentStr.length()));
00229    }
00230    log_.info(ME, "   -priority       " + lexical_cast<string>(priority));
00231    log_.info(ME, "   -persistent     " + lexical_cast<string>(persistent));
00232    log_.info(ME, "   -lifeTime       " + lexical_cast<string>(lifeTime)); // org.jutils.time.TimeHelper.millisToNice(lifeTime));
00233    log_.info(ME, "   -forceUpdate    " + lexical_cast<string>(forceUpdate));
00234    log_.info(ME, "   -forceDestroy   " + lexical_cast<string>(forceDestroy));
00235    if (clientPropertyMap.size() > 0) {
00236       QosData::ClientPropertyMap::const_iterator mi;
00237       for (mi=clientPropertyMap.begin(); mi!=clientPropertyMap.end(); ++mi) {
00238          log_.info(ME, "   -clientProperty["+mi->first+"]   " + mi->second.getStringValue());
00239       }
00240    }
00241    else {
00242       log_.info(ME, "   -clientProperty[]   ");
00243    }
00244    log_.info(ME, " Topic settings");
00245    log_.info(ME, "   -readonly       " + lexical_cast<string>(readonly));
00246    log_.info(ME, "   -destroyDelay   " + lexical_cast<string>(destroyDelay)); // org.jutils.time.TimeHelper.millisToNice(destroyDelay));
00247    log_.info(ME, "   -createDomEntry " + lexical_cast<string>(createDomEntry));
00248    log_.info(ME, "   -queue/history/maxEntries " + lexical_cast<string>(historyMaxMsg));
00249    log_.info(ME, " PtP settings");
00250    log_.info(ME, "   -subscribable  " + lexical_cast<string>(subscribable));
00251    log_.info(ME, "   -forceQueuing   " + lexical_cast<string>(forceQueuing));
00252    log_.info(ME, "   -destination    " + destination);
00253    log_.info(ME, " Erase settings");
00254    log_.info(ME, "   -erase.forceDestroy " + lexical_cast<string>(eraseForceDestroy));
00255    log_.info(ME, "For more info please read:");
00256    log_.info(ME, "   http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html");
00257 }
00258 
00259 void PublishDemo::connect()
00260 {
00261    ConnectQos connQos(global_);
00262    //org::xmlBlaster::authentication::SecurityQos sec(global_, "jack", "secret", "htpasswd,1.0");
00263    //connQos.setSecurityQos(sec);
00264    
00265    connection_.registerPostSendListener(this);
00266 
00267    log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml());
00268    ConnectReturnQos retQos = connection_.connect(connQos, NULL); // no callback
00269    log_.trace(ME, "successfully connected to " + connection_.getServerNodeId() + ". Return qos: " + retQos.toXml());
00270 }
00271 
00272 void PublishDemo::publish()
00273 {
00274    for(int i=0; i<numPublish; i++) {
00275    
00276       if (interactive) {
00277          std::cout << "Hit a key to publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish) + " ('b' to break) >> ";
00278          std::cin.read(ptr,1);
00279          if (*ptr == 'b') break;
00280       }
00281       else {
00282          if (sleep > 0) {
00283             try {
00284                org::xmlBlaster::util::thread::Thread::sleep(sleep);
00285             }
00286             catch(XmlBlasterException e) {
00287                log_.error(ME, e.toXml());
00288             }
00289          }
00290          log_.info(ME, "Publish '" + oid + "' #" + lexical_cast<string>(i+1) + "/" + lexical_cast<string>(numPublish));
00291       }
00292 
00293       PublishKey key(global_, oid, "text/xml", "1.0");
00294       key.setClientTags(clientTags);
00295       if (domain != "")  key.setDomain(domain);
00296       if (i==0) log_.info(ME, "PublishKey: " + key.toXml());
00297 
00298       PublishQos pq(global_);
00299       pq.setPriority(priority);
00300       pq.setPersistent(persistent);
00301       pq.setLifeTime(lifeTime);
00302       pq.setForceUpdate(forceUpdate);
00303       pq.setForceDestroy(forceDestroy);
00304       pq.setSubscribable(subscribable);
00305       if (clientPropertyMap.size() > 0) {
00306          pq.setClientProperties(clientPropertyMap);
00307          //This is the correct way for a typed property:
00308          pq.addClientProperty("ALONG", long(12L));
00309       }
00310       
00311       if (i == 0) {
00312          TopicProperty topicProperty(global_);
00313          topicProperty.setDestroyDelay(destroyDelay);
00314          topicProperty.setCreateDomEntry(createDomEntry);
00315          topicProperty.setReadonly(readonly);
00316          if (historyMaxMsg >= 0L) {
00317             HistoryQueueProperty prop(global_, "");
00318             prop.setMaxEntries(historyMaxMsg);
00319             topicProperty.setHistoryQueueProperty(prop);
00320          }
00321          pq.setTopicProperty(topicProperty);
00322          log_.info(ME, "Added TopicProperty on first publish: " + topicProperty.toXml());
00323       }
00324       
00325       if (destination != "") {
00326          SessionName sessionName(global_, destination);
00327          Destination dest(global_, sessionName);
00328          dest.forceQueuing(forceQueuing);
00329          pq.addDestination(dest);
00330       }
00331 
00332       log_.info(ME, "mapSize=" + lexical_cast<string>(clientPropertyMap.size()) + " PublishQos: " + pq.toXml());
00333 
00334       string contentTmp = contentStr;
00335       if (contentSize >= 0) {
00336          contentTmp = "";
00337          for (int j=0; j<contentSize; j++)
00338             contentTmp += "X";
00339       }
00340       else if (contentFile.size() > 0) {
00341          fileRead(contentFile, contentTmp);
00342       }
00343       else {
00344          contentTmp = StringTrim::replaceAll(contentTmp, "%counter", lexical_cast<string>(i+1));
00345       }
00346 
00347       MessageUnit msgUnit(key, contentTmp, pq);
00348       if (oneway) {
00349          log_.trace(ME, string("publishOneway() message unit: ") + msgUnit.toXml());
00350          vector<MessageUnit> msgUnitArr;
00351          msgUnitArr.push_back(msgUnit);
00352          connection_.publishOneway(msgUnitArr);
00353          log_.trace(ME, "publishOneway() done");
00354       }
00355       else {
00356          log_.trace(ME, string("publish() message unit: ") + msgUnit.toXml());
00357          PublishReturnQos tmp = connection_.publish(msgUnit);
00358          log_.trace(ME, string("publish return qos: ") + tmp.toXml());
00359       }
00360    }
00361 }
00362 
00363 
00364 static void usage(I_Log& log) 
00365 {
00366    log.plain("PublishDemo usage:", Global::usage());
00367    string str = "\nPlus many more additional command line arguments:";
00368    str += "\n -numPublish (int): the number of publishes which have to be done";
00369    str += "\n -sleep (ms): the delay to wait between each publish. If negative (default) it does not wait";
00370    str += "\n ...";
00371    str += "\nExample:\n";
00372    str += "   PublishDemo -trace true -numPublish 1000\n";
00373    str += "   PublishDemo -destination joe -oid Hello -content 'Hi joe'\n";
00374    log.plain("PublishDemo", str);
00375    exit(0);
00376 }
00377 
00378 
00388 int main(int args, char ** argv)
00389 {
00390    try {
00391       org::xmlBlaster::util::Object_Lifetime_Manager::init();
00392       Global& glob = Global::getInstance();
00393       glob.initialize(args, argv);
00394       I_Log& log  = glob.getLog("demo");
00395 
00396       if (glob.wantsHelp()) {
00397          usage(log);
00398       }
00399 
00400       PublishDemo demo(glob);
00401    }
00402    catch (XmlBlasterException& ex) {
00403       std::cout << ex.toXml() << std::endl;
00404    }
00405    catch (bad_exception& ex) {
00406       cout << "bad_exception: " << ex.what() << endl;
00407    }
00408    catch (exception& ex) {
00409       cout << " exception: " << ex.what() << endl;
00410    }
00411    catch (string& ex) {
00412       cout << "string: " << ex << endl;
00413    }
00414    catch (char* ex) {
00415       cout << "char* :  " << ex << endl;
00416    }
00417    catch (...) {
00418       cout << "unknown exception occured" << endl;
00419       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
00420       cout << e.toXml() << endl;
00421    }
00422 
00423    try {
00424       org::xmlBlaster::util::Object_Lifetime_Manager::fini();
00425    }
00426    catch (...) {
00427       cout << "unknown exception occured in fini()" << endl;
00428       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
00429       cout << e.toXml() << endl;
00430    }
00431 
00432    return 0;
00433 }