1 /*-----------------------------------------------------------------------------
  2 Name:      SubscribeDemo.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Little demo to show how a subscribe is done
  6 -----------------------------------------------------------------------------*/
  7 #include <client/XmlBlasterAccess.h>
  8 #include <util/Global.h>
  9 #include <util/lexical_cast.h>
 10 #include <iostream>
 11 #include <sstream>
 12 #include <stdexcept>
 13 
 14 namespace std {
 15   class UpdateException : public exception {
 16   public:
 17     explicit UpdateException(const string&  what_arg );
 18   };
 19 }
 20 
 21 using namespace std;
 22 using namespace org::xmlBlaster::client;
 23 using namespace org::xmlBlaster::util;
 24 using namespace org::xmlBlaster::util::qos;
 25 using namespace org::xmlBlaster::util::dispatch;
 26 using namespace org::xmlBlaster::client::qos;
 27 using namespace org::xmlBlaster::client::key;
 28 
 29 class SubscribeDemo;
 30 class ProgressListener : public org::xmlBlaster::client::protocol::I_ProgressListener
 31 {
 32    SubscribeDemo *demoP;
 33    public:
 34    ProgressListener(SubscribeDemo *p) { this->demoP = p; }
 35    void progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes);
 36 };
 37 
 38 
 39 /**
 40  * This subscriber subscribes on the topic 'Hello' and dumps
 41  * the received messages. 
 42  * Please start the publisher demo
 43  * <code>
 44  * PublishDemo -numPublish 10
 45  * </code>
 46  * to receives some messages
 47  */
 48 class SubscribeDemo : public I_Callback, public I_ConnectionProblems
 49 {
 50 private:
 51    string           ME;
 52    Global&          global_;
 53    I_Log&           log_;
 54    XmlBlasterAccess connection_;
 55    ProgressListener progressListener;
 56    int updateCounter;
 57    char ptr[1];
 58    string subscriptionId;
 59    bool dispatcherActive;
 60    bool disconnect;
 61    bool interactive;
 62    bool interactiveUpdate;
 63    bool firstTime;
 64    long updateSleep;
 65    bool reportUpdateProgress;
 66    string updateExceptionErrorCode;
 67    string updateExceptionMessage;
 68    string updateExceptionRuntime;
 69    string oid;
 70    string domain;
 71    string xpath;
 72    bool multiSubscribe;
 73    bool persistentSubscribe;
 74    bool notifyOnErase;
 75    bool local;
 76    bool initialUpdate;
 77    bool updateOneway;
 78    bool wantContent;
 79    int historyNumUpdates;
 80    bool historyNewestFirst;
 81    string filterType;
 82    string filterVersion;
 83    string filterQuery;
 84    bool unSubscribe;
 85    int maxContentLength;
 86 
 87 public:
 88    bool doContinue_;
 89 
 90    SubscribeDemo(Global& glob) 
 91       : ME("SubscribeDemo"), 
 92         global_(glob), 
 93         log_(glob.getLog("demo")),
 94         connection_(global_),
 95         progressListener(this),
 96         updateCounter(0)
 97    {
 98       initEnvironment();
 99       doContinue_ = true;
100       firstTime = true;
101       execute();
102    }
103 
104    I_Log& getLog() { return log_; }
105 
106    void execute()
107    {
108       connect();
109 
110       subscribe();
111       
112       log_.info(ME, "Please use PublishDemo to publish a message '"+oid+"', i'm waiting for updates ...");
113 
114       if (interactive) {
115          org::xmlBlaster::util::thread::Thread::sleepSecs(1);
116          bool stop = false;
117          while (!stop) {
118             string dd = dispatcherActive ? "'d' to deactivate dispatcher" : "'a' to activate dispatcher";
119             std::cout << "(Enter " << dd << " 'q' to exit) >> ";
120             std::cin.read(ptr,1);
121             if (*ptr == 'q') stop = true;
122             if (*ptr == 'a') connection_.setCallbackDispatcherActive(true), dispatcherActive=true;
123             if (*ptr == 'd') connection_.setCallbackDispatcherActive(false), dispatcherActive=false;
124          }
125       }
126       else {
127          log_.plain(ME, "I will exit when the publisher destroys the topic ...");
128          while (doContinue_) {
129             org::xmlBlaster::util::thread::Thread::sleepSecs(2);
130          }
131       }
132       
133       unSubscribe_();
134       
135       disconnect_();
136    }
137 
138    void initEnvironment()
139    {
140       dispatcherActive = global_.getProperty().get("dispatcherActive", true);
141       disconnect = global_.getProperty().get("disconnect", true);
142       interactive = global_.getProperty().get("interactive", true);
143       interactiveUpdate = global_.getProperty().get("interactiveUpdate", false);
144       updateSleep = global_.getProperty().get("updateSleep", 0L);
145       reportUpdateProgress = global_.getProperty().get("reportUpdateProgress", false);
146       updateExceptionErrorCode = global_.getProperty().get("updateException.errorCode", string(""));
147       updateExceptionMessage = global_.getProperty().get("updateException.message", string(""));
148       updateExceptionRuntime = global_.getProperty().get("updateException.runtime", string(""));
149       oid = global_.getProperty().get("oid", "");
150       domain = global_.getProperty().get("domain", "");
151       xpath = global_.getProperty().get("xpath", "");
152       multiSubscribe = global_.getProperty().get("multiSubscribe", true);
153       persistentSubscribe = global_.getProperty().get("persistentSubscribe", false);
154       notifyOnErase = global_.getProperty().get("notifyOnErase", true);
155       local = global_.getProperty().get("local", true);
156       initialUpdate = global_.getProperty().get("initialUpdate", true);
157       updateOneway = global_.getProperty().get("updateOneway", false);
158       wantContent = global_.getProperty().get("wantContent", true);
159       historyNumUpdates = global_.getProperty().get("historyNumUpdates", 1);
160       historyNewestFirst = global_.getProperty().get("historyNewestFirst", true);
161       filterType = global_.getProperty().get("filter.type", "GnuRegexFilter");// XPathFilter | ContentLenFilter | Sql92Filter
162       filterVersion = global_.getProperty().get("filter.version", "1.0");
163       filterQuery = global_.getProperty().get("filter.query", "");
164       unSubscribe = global_.getProperty().get("unSubscribe", true);
165       maxContentLength = global_.getProperty().get("maxContentLength", 250);
166 
167       if (oid == "" && xpath == "") {
168          log_.warn(ME, "No -oid or -xpath given, we subscribe to oid='Hello'.");
169          oid = "Hello";
170       }
171 
172       if (updateSleep > 0L && interactiveUpdate == true) {
173          log_.warn(ME, "You can't set 'updateSleep' and  'interactiveUpdate' simultaneous, we reset interactiveUpdate to false");
174          interactiveUpdate = false;
175       }
176 
177       if (updateExceptionErrorCode != "" && updateExceptionRuntime != "") {
178          log_.warn(ME, "You can't throw a runtime and an XmlBlasterException simultaneous, please check your settings "
179                         " -updateException.errorCode and -updateException.runtime");
180          updateExceptionRuntime = "";
181       }
182 
183       log_.info(ME, "Used settings are:");
184       log_.info(ME, "   -dispatcherActive    " + lexical_cast<string>(dispatcherActive));
185       log_.info(ME, "   -interactive         " + lexical_cast<string>(interactive));
186       log_.info(ME, "   -interactiveUpdate   " + lexical_cast<string>(interactiveUpdate));
187       log_.info(ME, "   -updateSleep         " + lexical_cast<string>(updateSleep));
188       log_.info(ME, "   -reportUpdateProgress      " + lexical_cast<string>(reportUpdateProgress));
189       log_.info(ME, "   -updateException.errorCode " + updateExceptionErrorCode);
190       log_.info(ME, "   -updateException.message   " + updateExceptionMessage);
191       log_.info(ME, "   -updateException.runtime   " + updateExceptionRuntime);
192       log_.info(ME, "   -oid                 " + oid);
193       log_.info(ME, "   -domain              " + domain);
194       log_.info(ME, "   -xpath               " + xpath);
195       log_.info(ME, "   -multiSubscribe      " + lexical_cast<string>(multiSubscribe));
196       log_.info(ME, "   -persistentSubscribe " + lexical_cast<string>(persistentSubscribe));
197       log_.info(ME, "   -notifyOnErase       " + lexical_cast<string>(notifyOnErase));
198       log_.info(ME, "   -local               " + lexical_cast<string>(local));
199       log_.info(ME, "   -initialUpdate       " + lexical_cast<string>(initialUpdate));
200       log_.info(ME, "   -updateOneway        " + lexical_cast<string>(updateOneway));
201       log_.info(ME, "   -historyNumUpdates   " + lexical_cast<string>(historyNumUpdates));
202       log_.info(ME, "   -historyNewestFirst  " + lexical_cast<string>(historyNewestFirst));
203       log_.info(ME, "   -wantContent         " + lexical_cast<string>(wantContent));
204       log_.info(ME, "   -maxContentLength    " + lexical_cast<string>(maxContentLength));
205       log_.info(ME, "   -unSubscribe         " + lexical_cast<string>(unSubscribe));
206       log_.info(ME, "   -disconnect          " + lexical_cast<string>(disconnect));
207       log_.info(ME, "   -filter.type         " + filterType);
208       log_.info(ME, "   -filter.version      " + filterVersion);
209       log_.info(ME, "   -filter.query        " + filterQuery);
210       log_.info(ME, "For more info please read:");
211       log_.info(ME, "   http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html");
212    }
213 
214    bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* connectionsHandler)
215    {
216       log_.info(ME, "reachedAlive()");
217       if (!firstTime && !connectionsHandler->getConnectReturnQos()->isReconnected() && !persistentSubscribe) {
218          subscribe(); // We lost the old subscription, initialize subscription again
219       }
220       return true;
221    }
222 
223    void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
224    {
225       log_.info(ME, "reachedDead()");
226    }
227 
228    void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
229    {
230       log_.info(ME, "reachedPolling()");
231    }
232 
233    void connect()
234    {
235       connection_.initFailsafe(this);
236       ConnectQos connQos(global_);
237       connQos.getCbAddress()->setDispatcherActive(dispatcherActive);
238       if (log_.trace()) log_.trace(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos.toXml());
239       ConnectReturnQos retQos = connection_.connect(connQos, this);
240       if (log_.trace()) log_.trace(ME, "successfully connected to xmlBlaster. Return qos: " + retQos.toXml());
241       if (reportUpdateProgress) {
242          connection_.registerProgressListener(&progressListener);
243       }
244    }
245 
246    void subscribe()
247    {
248       SubscribeKey *sk = 0;
249       string qStr = "";
250       try {
251          sk = new SubscribeKey(global_);
252          if (oid.length() > 0) {
253             //sk = new SubscribeKey(global_, oid);
254             qStr = oid;
255             sk->setOid(oid);
256          }
257          else if (xpath.length() > 0) {
258             //sk = new SubscribeKey(global_, xpath, Constants::XPATH);
259             qStr = xpath;
260             sk->setQueryString(xpath);
261          }
262          if (domain.length() > 0) {  // cluster routing information
263             if (sk == 0) sk = new SubscribeKey(global_, "", Constants::D_O_M_A_I_N);
264             sk->setDomain(domain);
265             qStr = domain;
266          }
267          SubscribeQos sq(global_);
268          sq.setWantInitialUpdate(initialUpdate);
269          sq.setWantUpdateOneway(updateOneway);
270          sq.setMultiSubscribe(multiSubscribe);
271          sq.setPersistent(persistentSubscribe);
272          sq.setWantNotify(notifyOnErase);
273          sq.setWantLocal(local);
274          sq.setWantContent(wantContent);
275          
276          HistoryQos historyQos(global_);
277          historyQos.setNumEntries(historyNumUpdates);
278          historyQos.setNewestFirst(historyNewestFirst);
279          sq.setHistoryQos(historyQos);
280 
281          if (filterQuery.length() > 0) {
282             AccessFilterQos filter(global_, filterType, filterVersion, filterQuery);
283             sq.addAccessFilter(filter);
284          }
285 
286          log_.info(ME, "SubscribeKey=" + sk->toXml());
287          log_.info(ME, "SubscribeQos=" + sq.toXml());
288 
289          if (firstTime && interactive) {
290             log_.info(ME, "Hit a key to subscribe '" + qStr + "'");
291             std::cin.read(ptr,1);
292          }
293          firstTime = false;
294 
295          SubscribeReturnQos srq = connection_.subscribe(*sk, sq);
296          subscriptionId = srq.getSubscriptionId();
297 
298          log_.info(ME, "Subscribed on topic '" + ((oid.length() > 0) ? oid : xpath) +
299                       "', got subscription id='" + srq.getSubscriptionId() + "'\n" + srq.toXml());
300          if (log_.dump()) log_.dump("", "Subscribed: " + sk->toXml() + sq.toXml() + srq.toXml());
301          delete sk;
302       }
303       catch (...) { // a finally would have been more appropriate
304          delete sk;
305          throw;
306       }
307    }
308 
309    void unSubscribe_()
310    {
311       if (unSubscribe) {
312          if (interactive) {
313             log_.info(ME, "Hit a key to unSubscribe");
314             std::cin.read(ptr,1);
315          }
316 
317          UnSubscribeKey uk(global_, subscriptionId);
318          if (domain.length() > 0)  // cluster routing information TODO!!!
319             uk.setDomain(domain);
320          UnSubscribeQos uq(global_);
321          log_.info(ME, "UnSubscribeKey=" + uk.toXml());
322          log_.info(ME, "UnSubscribeQos=" + uq.toXml());
323          vector<UnSubscribeReturnQos> urqArr = connection_.unSubscribe(uk, uq);
324          log_.info(ME, "UnSubscribe on " + lexical_cast<string>(urqArr.size()) + " subscriptions done");
325       }
326    }
327 
328    void disconnect_()
329    {
330       if (disconnect) {
331          DisconnectQos dq(global_);
332          connection_.disconnect(dq);
333          log_.info(ME, "Disconnected");
334       }
335    }
336 
337    /**
338     * Here we receive the asynchronous callback messages from the xmlBlaster server. 
339     */
340    string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos)
341    {
342       stringstream sout;
343 
344       if (updateQos.isErased() && oid.length() > 0) { // Erased topic with EXACT subscription?
345          sout << endl << "============= Topic '" + updateKey.getOid() + "' is ERASED =======================" << endl;
346          log_.plain(ME, sout.str());
347          subscribe();              // topic is erased -> re-subsribe
348          return Constants::RET_OK; // "<qos><state id='OK'/></qos>";
349       }
350 
351       //const Global& global_ = updateKey.getGlobal();
352       ++updateCounter;
353 
354       //NOTE: subscribe("anotherDummy");  -> subscribe in update does not work
355       //      with single threaded 'mico' or SOCKET protocol
356 
357       log_.info(ME, "Receiving update #" + lexical_cast<string>(updateCounter) + " of a message, secret sessionId=" + sessionId + " ...");
358 
359       sout << endl << "============= START #" << updateCounter << " '" << updateKey.getOid() << "' =======================";
360       string contentStr((char*)content, (char*)(content)+contentSize);
361       sout << endl << "<xmlBlaster>";
362       sout << updateKey.toXml("  ");
363       sout << endl << " <content size='" << contentSize << "'>";
364       if (contentSize < maxContentLength)
365          sout << endl << contentStr;
366       else
367          sout << endl << contentStr.substr(0, maxContentLength-5) << " ...";
368       sout << endl << " </content>";
369       sout << updateQos.toXml("  ");
370       sout << endl << "</xmlBlaster>";
371       sout << endl << "============= END #" << updateCounter << " '" << updateKey.getOid() << "' =========================";
372       sout << endl;
373       log_.plain(ME, sout.str());
374 
375       // Dump the ClientProperties decoded (the above dump may contain Base64 encoding):
376       const QosData::ClientPropertyMap& propMap = updateQos.getClientProperties();
377       QosData::ClientPropertyMap::const_iterator mi;
378       for (mi=propMap.begin(); mi!=propMap.end(); ++mi) {
379          const ClientProperty& clientProperty = mi->second;
380          if (clientProperty.isBase64()) {
381             log_.info(ME, "ClientProperty decoded: "+mi->first+"=" + clientProperty.getStringValue());
382          }
383       }
384       // Examples for direct access:
385       if (updateQos.hasClientProperty(string("StringKey"))) {
386          log_.info(ME, "ClientProperty BLA=" +updateQos.getClientProperty("BLA", string("MISSING VALUE?")));
387       }
388       if (updateQos.hasClientProperty(string("ALONG"))) {
389          long aLongValue = updateQos.getClientProperty("ALONG", -1L);
390          log_.info(ME, "ClientProperty ALONG=" + lexical_cast<string>(aLongValue));
391       }
392       
393       if (updateSleep > 0L) {
394          log_.info(ME, "Sleeping for " + lexical_cast<string>(updateSleep) + " millis in callback ...");
395          org::xmlBlaster::util::thread::Thread::sleep(updateSleep);
396          log_.info(ME, "Waking up.");
397       }
398       else if (interactiveUpdate) {
399          log_.info(ME, "Hit a key to return from update() (we are blocking the server callback) ...");
400          std::cin.read(ptr,1);
401          log_.info(ME, "Returning update() - control goes back to server");
402       }
403 
404       if (updateExceptionErrorCode != "") {
405          log_.info(ME, "Throwing XmlBlasterException with errorCode='" + updateExceptionErrorCode + "' back to server ...");
406          throw XmlBlasterException(updateExceptionErrorCode, ME, updateExceptionMessage); 
407       }
408 
409       if (updateExceptionRuntime != "") {
410          log_.info(ME, "Throwing RuntimeException '" + updateExceptionRuntime + "'");
411          //throw UpdateException(updateExceptionRuntime);
412          throw logic_error(updateExceptionRuntime);
413       }
414 
415       return Constants::RET_OK;
416    }
417 };
418 
419 void ProgressListener::progress(const std::string& name, unsigned long currBytesRead, unsigned long numBytes) {
420    demoP->getLog().info("SubscribeDemo", name + "Progress of incoming message is currBytesRead=" +
421             lexical_cast<string>(currBytesRead) + " nbytes=" + lexical_cast<string>(numBytes));
422 }
423 
424 /**
425  * Try
426  * <pre>
427  *   SubscribeDemo -help
428  * </pre>
429  * for usage help
430  */
431 int main(int args, char ** argv)
432 {
433    org::xmlBlaster::util::Object_Lifetime_Manager::init();
434    //TimestampFactory& tsFactory = TimestampFactory::getInstance();
435    //Timestamp startStamp = tsFactory.getTimestamp();
436    //std::cout << " start time: " << tsFactory.toXml(startStamp, "", true) << std::endl;
437 
438    try {
439       Global& glob = Global::getInstance();
440       glob.initialize(args, argv);
441 
442       if (glob.wantsHelp()) {
443          glob.getLog().plain("", Global::usage());
444          glob.getLog().plain("", "\nExample:\n");
445          glob.getLog().plain("", "   SubscribeDemo -trace true -interactiveUpdate true\n");
446          glob.getLog().plain("", "   SubscribeDemo -xpath '//key' -filter.query '^H.*'\n");
447          org::xmlBlaster::util::Object_Lifetime_Manager::fini();
448          return 1;
449       }
450 
451       SubscribeDemo demo(glob);
452 
453       //Timestamp stopStamp = tsFactory.getTimestamp();
454       //std::cout << " end time: " << tsFactory.toXml(stopStamp, "", true) << std::endl;
455       //Timestamp diff = stopStamp - startStamp;
456       //std::cout << " time used for demo: " << tsFactory.toXml(diff, "", true) << std::endl;
457    }
458    catch (XmlBlasterException& ex) {
459       std::cout << ex.toXml() << std::endl;
460    }
461    catch (bad_exception& ex) {
462       cout << "bad_exception: " << ex.what() << endl;
463    }
464    catch (exception& ex) {
465       cout << " exception: " << ex.what() << endl;
466    }
467    catch (string& ex) {
468       cout << "string: " << ex << endl;
469    }
470    catch (char* ex) {
471       cout << "char* :  " << ex << endl;
472    }
473 
474    catch (...)
475    {
476       cout << "unknown exception occured" << endl;
477       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
478       cout << e.toXml() << endl;
479    }
480 
481    org::xmlBlaster::util::Object_Lifetime_Manager::fini();
482    return 0;
483 }


// syntax highlighted by Code2HTML, v. 0.9.1