1 /*-----------------------------------------------------------------------------
  2 Name:      TestSub.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Demo code for a client using xmlBlaster
  6 Version:   $Id: TestSub.cpp 12916 2004-11-18 14:55:44Z ruff $
  7 -----------------------------------------------------------------------------*/
  8 #include "TestSuite.h"
  9 #include <iostream>
 11 /**
 12  * This client tests the method subscribe() with a later publish() with XPath
 13  * query.<br />
 14  * The subscribe() should be recognized for this later arriving publish()<p>
 * This client may be invoked multiple times on the same xmlBlaster server,
 * as it cleans up everything after its tests are done.
 17  * <p>
 18  */
 20 using namespace std;
 21 using namespace org::xmlBlaster::util;
 22 using namespace org::xmlBlaster::util::qos;
 23 using namespace org::xmlBlaster::util::thread;
 24 using namespace org::xmlBlaster::client;
 25 using namespace org::xmlBlaster::client::qos;
 26 using namespace org::xmlBlaster::client::key;
 27 using namespace org::xmlBlaster::authentication;
 29 namespace org { namespace xmlBlaster { namespace test {
 31 class SpecificCallback : public I_Callback {
 32 private:
 33    int numReceived_;
 34    string name_;
 35    I_Log& log_;
 37 public:
 38    SpecificCallback(I_Log& log, const string& name) : log_(log) {
 39       name_ = name;
 40       numReceived_ = 0;
 41    }
 43    int getCount() {
 44       return numReceived_;
 45    }
 48    string update(const string &sessionId,
 49                UpdateKey &updateKey,
 50                const unsigned char * /*content*/, long /*contentSize*/,
 51                UpdateQos &updateQos) 
 52    {
 53       log_.info("update", string("Receiving update on callback '") + name_ + "' of message oid=" +
 54                 updateKey.getOid() + " state=" + updateQos.getState() +
 55                 " authentication sessionId=" + sessionId + " ...");
 56       numReceived_++;
 57       return "<qos><state id='OK'/></qos>";
 58    }
 61 };
 64 class TestSub: public TestSuite, public virtual I_Callback 
 65 {
 66 private:
 67    bool   messageArrived_;      // = false;
 68    int    numReceived_;         //  = 0;         // error checking
 69    string subscribeOid_;
 70    string publishOid_;          // = "dummy";
 71    string senderName_;
 72    string senderContent_;
 73    string receiverName_;        // sender/receiver is here the same client
 74    string contentMime_;         // = "text/xml";
 75    string contentMimeExtended_; //  = "1.0";
 76    ConnectReturnQos returnQos_;
 77    SpecificCallback *cb1_;
 78    SpecificCallback *cb2_;
 79    SpecificCallback *cb3_;
 81    /** Publish tests */
 82    enum TestType {
 84    };
 86    /**
 87     * Constructs the TestSub object.
 88     * <p />
 89     * @param testName  The name used in the test suite
 90     * @param loginName The name to login to the xmlBlaster
 91     */
 92  public:
 93    TestSub(int args, char *argc[], const string &loginName)
 94       : TestSuite(args, argc, "TestSub"), returnQos_(global_)
 95    {
 96       senderName_          = loginName;
 97       receiverName_        = loginName;
 98       numReceived_         = 0;
 99       publishOid_          = "dummy";
100       contentMime_         = "text/xml";
101       contentMimeExtended_ = "1.0";
102       senderContent_       = "Yeahh, i'm the new content";
103       cb1_ = new SpecificCallback(log_, "callback1");
104       cb2_ = new SpecificCallback(log_, "callback2");
105       cb3_ = new SpecificCallback(log_, "callback3");
106    }
108    virtual ~TestSub() 
109    {
110       delete cb1_;
111       delete cb2_; 
112       delete cb3_;
113    }
115    /**
116     * Sets up the fixture. <p />
117     * Connect to xmlBlaster and login
118     */
119    void setUp() 
120    {
121       log_.info(ME, "Trying to connect to xmlBlaster with C++ client lib " + Global::getVersion() + " from " + Global::getBuildTimestamp());
122       TestSuite::setUp();
123       try {
124          string passwd = "secret";
125          SecurityQos secQos(global_, senderName_, passwd);
126          ConnectQos connQos(global_);
127          connQos.getSessionQosRef()->setPubSessionId(3L);
128          returnQos_ = connection_.connect(connQos, this);
129          string name = returnQos_.getSessionQos().getAbsoluteName();
130          string name1 = returnQos_.getSessionQosRef()->getAbsoluteName();
131          assertEquals(log_, ME, name, name1, string("name comparison for reference"));
133          log_.info(ME, string("connection setup: the session name is '") + name + "'");
134          // Login to xmlBlaster
135       }
136       catch (XmlBlasterException &e) {
137          log_.error(ME, string("Login failed: ") + e.toXml());
138          usage();
139          assert(0);
140       }
141    }
144    /**
145     * Tears down the fixture. <p />
146     * cleaning up .... erase() the previous message OID and logout
147     */
148    void tearDown() 
149    {
150       log_.info(ME, "Cleaning up test - erasing message.");
152       EraseKey eraseKey(global_);
153       eraseKey.setOid(publishOid_);
154       EraseQos eraseQos(global_);
156       vector<EraseReturnQos> retArr;
157       try {
158          retArr = connection_.erase(eraseKey, eraseQos);
159       }
160       catch(XmlBlasterException &e) {
161          log_.error(ME, string("XmlBlasterException: ") + e.toXml());
162       }
163       if (retArr.size() != 1) {
164          log_.error(ME, "Erased " + lexical_cast<string>(retArr.size()) + " messages");
165       }
166       connection_.disconnect(DisconnectQos(global_));
167       TestSuite::tearDown();
168    }
171    /**
172     * TEST: Subscribe to messages with XPATH.<p />
173     * The returned subscribeOid is checked
174     */
175    void testSubscribeXPath() 
176    {
177       if (log_.trace()) log_.trace(ME, "Subscribing using XPath syntax ...");
178       SubscribeKey subKey(global_);
179       subKey.setQueryString("//TestSub-AGENT");
180       SubscribeQos subQos(global_);
181       numReceived_ = 0;
182       subscribeOid_ = "";
183       try {
184          subscribeOid_ = connection_.subscribe(subKey, subQos).getSubscriptionId();
185          log_.info(ME, string("Success: Subscribe subscription-id=") +
186                    subscribeOid_ + " done");
187       }
188       catch(XmlBlasterException &e) {
189          log_.warn(ME, string("XmlBlasterException: ")
190                       + e.toXml());
191          cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl;
192          assert(0);
193       }
194       if (subscribeOid_ == "") {
195          cerr << "returned null subscribeOid" << endl;
196          assert(0);
197       }
198       if (subscribeOid_.length() == 0) {
199          cerr << "returned subscribeOid is empty" << endl;
200          assert(0);
201       }
202    }
204    /**
205     * TEST: Subscribe to messages with specific callback<p />
206     */
207    void testSubscribeSpecificCallback() 
208    {
209       if (log_.trace()) log_.trace(ME, "Subscribing using a specific callback pro subscription ...");
210       string oid1("oid1");
211       string oid2("oid2");
212       string oid3("oid3");
214       SubscribeKey subKey1(global_, oid1);
215       SubscribeKey subKey2(global_, oid2);
216       SubscribeKey subKey3(global_, oid3);
217       SubscribeQos subQos(global_);
219       numReceived_ = 0;
220       subscribeOid_ = "";
221       try {
222          subscribeOid_ = connection_.subscribe(subKey1, subQos, cb1_).getSubscriptionId();
223          /*string sub1 =*/ connection_.subscribe(subKey2, subQos, cb2_).getSubscriptionId();
224          /*string sub2 =*/ connection_.subscribe(subKey3, subQos, cb3_).getSubscriptionId();
226          log_.info(ME, string("Success: Subscribe subscription-id=") + subscribeOid_ + " done");
228          {
229             PublishKey pubKey1(global_);
230             pubKey1.setOid(oid1);
231             PublishQos pubQos(global_);
232             MessageUnit msgUnit(pubKey1, senderContent_, pubQos);
233             connection_.publish(msgUnit);
234          }
236          for (int i=0; i < 2; i++) {
237             PublishKey pubKey2(global_);
238             pubKey2.setOid(oid2);
239             PublishQos pubQos(global_);
240             MessageUnit msgUnit(pubKey2, senderContent_, pubQos);
241             connection_.publish(msgUnit);
242          }
244          for (int i=0; i < 3; i++) {
245             PublishKey pubKey3(global_);
246             pubKey3.setOid(oid3);
247             PublishQos pubQos(global_);
248             MessageUnit msgUnit(pubKey3, senderContent_, pubQos);
249             connection_.publish(msgUnit);
250          }
252          org::xmlBlaster::util::thread::Thread::sleep(2000L); 
253          assertEquals(log_, "specificCallback", 1, cb1_->getCount(), string("callback 1"));
254          assertEquals(log_, "specificCallback", 2, cb2_->getCount(), string("callback 2"));
255          assertEquals(log_, "specificCallback", 3, cb3_->getCount(), string("callback 3"));
257          UnSubscribeKey key(global_);
258          key.setOid(oid1);
259          UnSubscribeQos qos(global_);
260          connection_.unSubscribe(key, qos);
261          key.setOid(oid2);
262          connection_.unSubscribe(key, qos);
263          key.setOid(oid3);
264          connection_.unSubscribe(key, qos);
265       }
266       catch(XmlBlasterException &e) {
267          log_.warn(ME, string("XmlBlasterException: ") + e.toXml());
268          cerr << "subscribe - XmlBlasterException: " << e.toXml() << endl;
269          assert(0);
270       }
271       if (subscribeOid_ == "") {
272          cerr << "returned null subscribeOid" << endl;
273          assert(0);
274       }
275       if (subscribeOid_.length() == 0) {
276          cerr << "returned subscribeOid is empty" << endl;
277          assert(0);
278       }
279    }
282    /**
283     * TEST: Construct a message and publish it. <p />
284     * The returned publishOid is checked
285     */
286    void testPublishCorbaMethods(TestType testType) 
287    {
288       if (log_.trace()) log_.trace(ME, "Publishing a message (old style) ...");
289       numReceived_ = 0;
290       PublishKey pubKey(global_);
291       pubKey.setOid(publishOid_);
292       pubKey.setContentMime(contentMime_);
293       pubKey.setContentMimeExtended(contentMimeExtended_);
294       string xmlKey = string("") +
295          "   <TestSub-AGENT id='' subId='1' type='generic'>" +
296          "      <TestSub-DRIVER id='FileProof' pollingFreq='10'>" +
297          "      </TestSub-DRIVER>"+
298          "   </TestSub-AGENT>";
299       pubKey.setClientTags(xmlKey);
301       PublishQos pubQos(global_);
302       MessageUnit msgUnit(pubKey, senderContent_, pubQos);
303       try {
305          if (testType == TEST_ONEWAY) {
306             vector<MessageUnit> msgUnitArr;
307             msgUnitArr.insert(msgUnitArr.begin(), msgUnit);
308             connection_.publishOneway(msgUnitArr);
309             log_.info(ME, string("Success: Publishing oneway done (old style)"));
310          }
311          else if (testType == TEST_PUBLISH) {
312             string tmp = connection_.publish(msgUnit).getKeyOid();
313             if (tmp.find(publishOid_) == string::npos) {
314                log_.error(ME, "Wrong publishOid: " + tmp);
315                assert(0);
316             }
317             log_.info(ME, string("Success: Publishing with ACK done (old style), returned oid=") +
318                       publishOid_);
319          }
320          else {
321             vector<MessageUnit> msgUnitArr;
322             msgUnitArr.insert(msgUnitArr.begin(), msgUnit);
323             connection_.publishArr(msgUnitArr);
324             log_.info(ME, string("Success: Publishing array done (old style)"));
325          }
326       }
327       catch(XmlBlasterException &e) {
328          log_.warn(ME, string("XmlBlasterException: ")+e.toXml());
329          assert(0);
330       }
331    }
334    /**
335     * TEST: Construct a message and publish it. <p />
336     * The returned publishOid is checked
337     */
338    void testPublishSTLMethods(TestType testType) 
339    {
340       if (log_.trace()) log_.trace(ME, "Publishing a message (the STL way) ...");
341       numReceived_ = 0;
342       string clientTags = string("") +
343          "   <TestSub-AGENT id='' subId='1' type='generic'>" +
344          "      <TestSub-DRIVER id='FileProof' pollingFreq='10'>" +
345          "      </TestSub-DRIVER>"+
346          "   </TestSub-AGENT>";
348       PublishKey key(global_, publishOid_, contentMime_, contentMimeExtended_);
349       key.setClientTags(clientTags);
350       PublishQos pubQos(global_);
351       MessageUnit msgUnit(key, senderContent_, pubQos);
352       try {
353          if (testType == TEST_ONEWAY) {
354             vector<MessageUnit> msgVec;
355             msgVec.push_back(msgUnit);
356             connection_.publishOneway(msgVec);
357             log_.info(ME, string("Success: Publishing oneway done (the STL way)"));
358          }
359          else if (testType == TEST_PUBLISH) {
360             string tmp = connection_.publish(msgUnit).getKeyOid();
361             log_.info(ME, string("the publish oid ='") + tmp + "'");
362          }
363          else {
364             vector<MessageUnit> msgVec;
365             msgVec.push_back(msgUnit);
366             vector<PublishReturnQos> retArr = connection_.publishArr(msgVec);
367             log_.info(ME, string("Success: Publishing array of size " + lexical_cast<string>(retArr.size())
368                                    + " done (the STL way)"));
369          }
370       }
371       catch(XmlBlasterException &e) {
372          log_.warn(ME, string("XmlBlasterException: ")+e.toXml());
373          assert(0);
374       }
375    }
378    /**
379     * TEST: Construct a message and publish it,<br />
380     * the previous XPath subscription should match and send an update.
381     */
382    void testPublishAfterSubscribeXPath() 
383    {
384       testSubscribeXPath();
385       waitOnUpdate(1000L);
386       // Wait some time for callback to arrive ...
387       if (numReceived_ != 0) {
388          log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_));
389          assert(0);
390       }
392       /*
393       testSubscribeXPath();
394       waitOnUpdate(1000L);
395       // Wait some time for callback to arrive ...
396       if (numReceived_ != 0) {
397          log_.error(ME, "numReceived after subscribe = " + lexical_cast<string>(numReceived_));
398          assert(0);
399       }
400       */
402 /*
403       testPublishCorbaMethods(TEST_ONEWAY);
404       waitOnUpdate(2000L);
405       if (numReceived_ != 1) {
406          log_.error(ME,"numReceived after publishing oneway = " + lexical_cast<string>(numReceived_));
407          assert(0);
408       }
410       testPublishCorbaMethods(TEST_PUBLISH);
411       waitOnUpdate(2000L);
412       if (numReceived_ != 1) {
413          log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_));
414          assert(0);
415       }
417       testPublishCorbaMethods(TEST_ARRAY);
418       waitOnUpdate(2000L);
419       if (numReceived_ != 1) {
420          log_.error(ME,"numReceived after publishing with ACK = " + lexical_cast<string>(numReceived_));
421          assert(0);
422       }
423 */
424       testPublishSTLMethods(TEST_ONEWAY);
425       waitOnUpdate(2000L);
426       if (numReceived_ != 1) {
427          log_.error(ME,"numReceived after publishing STL oneway = " + lexical_cast<string>(numReceived_));
428          assert(0);
429       }
430       numReceived_ = 0;
432       testPublishSTLMethods(TEST_PUBLISH);
433       waitOnUpdate(2000L);
434       if (numReceived_ != 1) {
435          log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_));
436          assert(0);
437       }
438       numReceived_ = 0;
440       testPublishSTLMethods(TEST_ARRAY);
441       waitOnUpdate(2000L);
442       if (numReceived_ != 1) {
443          log_.error(ME,"numReceived after publishing STL with ACK = " + lexical_cast<string>(numReceived_));
444          assert(0);
445       }
446       numReceived_ = 0;
447    }
450    /**
451     * This is the callback method (I_Callback) invoked from XmlBlasterAccess
452     * informing the client in an asynchronous mode about a new message.
453     * <p />
454     * The raw CORBA-BlasterCallback.update() is unpacked and for each arrived
455     * message this update is called.
456     *
457     * @param sessionId The sessionId to authenticate the callback
458     *                  This sessionId was passed on subscription
459     *                  we can use it to decide if we trust this update()
460     * @param updateKey The arrived key
461     * @param content   The arrived message content
462     * @param qos       Quality of Service of the MessageUnit
463     */
464    string update(const string &sessionId,
465                UpdateKey &updateKey,
466                const unsigned char *content, long contentSize,
467                UpdateQos &updateQos) 
468    {
469       log_.info(ME, string("Receiving update of message oid=") +
470                 updateKey.getOid() + " state=" + updateQos.getState() +
471                 " authentication sessionId=" + sessionId + " ...");
472       numReceived_ ++;
474       string contentStr(reinterpret_cast<char *>(const_cast<unsigned char *>(content)), contentSize);
476       if (updateQos.getState() != Constants::STATE_OK &&
477           updateQos.getState() != org::xmlBlaster::util::Constants::STATE_ERASED) {
478          log_.error(ME, "Unexpected message state=" + updateQos.getState());
479          assert(0);
480       }
482       string name = returnQos_.getSessionQos().getAbsoluteName();
483       if (/*senderName_*/ name != updateQos.getSender()->getAbsoluteName()) {
484          log_.error(ME, string("Wrong Sender, should be: '") + name + "' but is: '" + updateQos.getSender()->getAbsoluteName());
485          assert(0);
486       }
487       if (subscribeOid_.find(updateQos.getSubscriptionId()) == string::npos) {
488          log_.error(ME, string("engine.qos.update.subscriptionId: ")
489                     + "Wrong subscriptionId, expected=" + subscribeOid_ + " received=" + updateQos.getSubscriptionId());
490          //assert(0);
491       }
492       if (publishOid_ != updateKey.getOid()) {
493          log_.error(ME, "Wrong oid of message returned");
494          assert(0);
495       }
497       if (updateQos.getState() == Constants::STATE_OK && senderContent_ != contentStr) {
498          log_.error(ME, "Corrupted content expected '" + senderContent_ + "' size=" +
499                            lexical_cast<string>(senderContent_.size()) + " but was '" + contentStr +
500                            "' size=" + lexical_cast<string>(contentStr.size()) + " and contentSize=" +
501                            lexical_cast<string>(contentSize));
502          assert(0);
503       }
504       if (contentMime_ != updateKey.getContentMime()) {
505          log_.error(ME, "Message contentMime is corrupted");
506          assert(0);
507       }
508       if (contentMimeExtended_ != updateKey.getContentMimeExtended()) {
509          log_.error(ME, "Message contentMimeExtended is corrupted");
510          assert(0);
511       }
512       messageArrived_ = true;
514       log_.info(ME, "Success, message oid=" + updateKey.getOid() + " state=" + updateQos.getState() + " arrived as expected.");
515       return "<qos><state id='OK'/></qos>";
516    }
519    /**
520     * Little helper, waits until the variable 'messageArrive' is set
521     * to true, or returns when the given timeout occurs.
522     * @param timeout in milliseconds
523     */
524 private:
525    void waitOnUpdate(long timeout) {
526       long delay = timeout;
527       Thread::sleep(delay);
528 /*
529       util::StopWatch stopWatch(timeout);
530       while (stopWatch.isRunning()) {
531          connection_.orbPerformWork();
532          if (messageArrived_) {
533             messageArrived_ = false;
534             return;
535          }
536       }
537 */
538       log_.warn(ME, "Timeout of " + lexical_cast<string>(timeout) + " milliseconds occured");
539    }
541    void usage() const
542    {
543       TestSuite::usage();
544       log_.plain(ME, "----------------------------------------------------------");
545       log_.plain(ME, "Testing C++/CORBA access to xmlBlaster with subscribe()");
546       log_.plain(ME, "Usage:");
547       XmlBlasterAccess::usage();
548       log_.usage();
549       log_.plain(ME, "Example:");
550       log_.plain(ME, "   TestSub -bootstrapHostname myHost.myCompany.com -bootstrapPort 3412 -trace true");
551       log_.plain(ME, "----------------------------------------------------------");
552    }
553 };
555 }}} // namespace
557 using namespace org::xmlBlaster::test;
559 int main(int args, char *argc[]) 
560 {
561    try {
562       org::xmlBlaster::util::Object_Lifetime_Manager::init();
563       TestSub testSub(args, argc, "Tim");
565       testSub.setUp();
566       testSub.testPublishAfterSubscribeXPath();
567       testSub.testSubscribeSpecificCallback();
568       testSub.tearDown();
570       Thread::sleepSecs(1);
571    }
572    catch (XmlBlasterException& ex) {
573       std::cout << ex.toXml() << std::endl;
574    }
575    catch (bad_exception& ex) {
576       cout << "bad_exception: " << ex.what() << endl;
577    }
578    catch (exception& ex) {
579       cout << " exception: " << ex.what() << endl;
580    }
581    catch (string& ex) {
582       cout << "string: " << ex << endl;
583    }
584    catch (char* ex) {
585       cout << "char* :  " << ex << endl;
586    }
588    catch (...)
589    {
590       cout << "unknown exception occured" << endl;
591       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
592       cout << e.toXml() << endl;
593    }
595    org::xmlBlaster::util::Object_Lifetime_Manager::fini();
596    return 0;
597 }

