1 /*-----------------------------------------------------------------------------
  2 Name:      TestFailsafe.cpp
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Testing the Timeout Features
  6 -----------------------------------------------------------------------------*/
  7 #include "TestSuite.h"
  8 #include <iostream>
  9 
 10 namespace org { namespace xmlBlaster { namespace test {
 11 
 12 using namespace std;
 13 using namespace org::xmlBlaster::util;
 14 using namespace org::xmlBlaster::util::qos;
 15 using namespace org::xmlBlaster::util::dispatch;
 16 using namespace org::xmlBlaster::util::thread;
 17 using namespace org::xmlBlaster::util::qos::address;
 18 using namespace org::xmlBlaster::client;
 19 using namespace org::xmlBlaster::client::qos;
 20 using namespace org::xmlBlaster::client::key;
 21 
 22 class TestFailsafe : public virtual I_Callback, public virtual I_ConnectionProblems, public TestSuite
 23 {
 24 private:
 25    ConnectQos       *connQos_;
 26    ConnectReturnQos *connRetQos_;
 27    SubscribeQos     *subQos_;
 28    SubscribeKey     *subKey_;
 29    PublishQos       *pubQos_;
 30    PublishKey       *pubKey_;
 31    Mutex            updateMutex_;
 32    bool             isConnected_;
 33    int              numOfUpdates_;
 34    bool useSessionMarker_;  // Remove again at version 2.0
 35 
 36 public:
 37    TestFailsafe(int args, char ** argv) 
 38       : TestSuite(args, argv, "TestFailsafe"),
 39         updateMutex_()
 40    {
 41       connQos_        = 0;
 42       connRetQos_     = 0;
 43       subQos_         = 0;
 44       subKey_         = 0;
 45       pubQos_         = 0;
 46       pubKey_         = 0;
 47       isConnected_    = false;
 48       numOfUpdates_   = 0;
 49 
 50       SessionName sn(global_, "client/dummy");
 51       useSessionMarker_ = sn.useSessionMarker();
 52    }
 53 
 54 
 55    virtual ~TestFailsafe()
 56    {
 57       if (log_.call()) log_.call(ME, "destructor");
 58       delete connQos_;
 59       delete connRetQos_;
 60       delete subQos_;
 61       delete subKey_;
 62       delete pubQos_;
 63       delete pubKey_;
 64       if (log_.trace()) log_.trace(ME, "destructor ended");
 65    }
 66 
 67    bool reachedAlive(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
 68    {
 69       log_.info(ME, "reconnected");
 70       isConnected_ = true;
 71       return true;
 72    }
 73 
 74    void reachedDead(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
 75    {
 76       log_.info(ME, "lost connection");
 77       isConnected_ = false;
 78    }
 79 
 80    void reachedPolling(StatesEnum /*oldState*/, I_ConnectionsHandler* /*connectionsHandler*/)
 81    {
 82       log_.info(ME, "going to poll modus");
 83       isConnected_ = false;
 84    }
 85 
 86    AddressBaseRef getAddress() {
 87       AddressBaseRef address = new Address(global_);
 88       address->setDelay(1000);
 89       address->setPingInterval(1000);
 90       return address;
 91    }
 92 
 93    void setUp()
 94    {
 95       TestSuite::setUp();
 96       try {   
 97          connection_.initFailsafe(this);
 98 
 99          connQos_ = new ConnectQos(global_, "guy", "secret");
100          connQos_->setAddress(getAddress());
101          log_.info(ME, string("connecting to xmlBlaster. Connect qos: ") + connQos_->toXml());
102          // Login to xmlBlaster, register for updates
103          connRetQos_ = new ConnectReturnQos(connection_.connect(*connQos_, this));  
104          log_.info(ME, "successfully connected to xmlBlaster. Return qos: " + connRetQos_->toXml());
105 
106          subKey_ = new SubscribeKey(global_);
107          subKey_->setOid("TestFailsafe");
108          subQos_ = new SubscribeQos(global_);
109          log_.info(ME, string("subscribing to xmlBlaster with key: ") + subKey_->toXml() + " and qos: " + subQos_->toXml());
110 
111          SubscribeReturnQos subRetQos = connection_.subscribe(*subKey_, *subQos_);
112          log_.info(ME, string("successfully subscribed to xmlBlaster. Return qos: ") + subRetQos.toXml());
113       }
114       catch (XmlBlasterException& ex) {
115          log_.error(ME, string("exception occurred in setUp. ") + ex.toXml());
116          assert(0);
117       }
118    }
119 
120 
121    /**
122     * This test does the following:
123     * - tears down , i.e. it erases the message 'TestFailsafe' and disconnects.
124     * - shuts down the server if embedded, otherwise waits you to shutdown for 20 s.
125     * - tries to reconnect (and should fail since the server is not connected and the session id is negative)
126     * - 
127     */
128    void testReconnect()
129    {
130       log_.info(ME, "testReconnect START");
131       tearDown();
132       // DisconnectQos disconnectQos(global_);
133       // connection_.disconnect(disconnectQos);
134       Thread::sleep(500);
135       if (useEmbeddedServer_) {
136          stopEmbeddedServer();
137          Thread::sleepSecs(2);
138       }
139       else {
140          waitOnKeyboardHit("Please stop the server now and hit 'c' to continue >> ");
141          //log_.info(ME, "please stop the server now (I will wait 20 s)");
142          //Thread::sleepSecs(20);
143       }
144       log_.info(ME, "the communication is now down: ready to start the tests");
145       ConnectQos connQos(global_);
146       connQos.setAddress(getAddress());
147       SessionQos sessionQos(global_,"client/Fritz/-2");
148       connQos.setSessionQos(sessionQos);
149       bool wentInException = false;
150 
151       try {
152          connection_.connect(connQos, this);
153       }
154       catch (XmlBlasterException &ex) {
155          log_.info(ME, "Exception is wanted: " + ex.toString());
156          wentInException = true;
157       }   
158       assertEquals(log_, ME, true, wentInException, "reconnecting when communication down and not giving positive publicSessionId: exception must be thrown");
159 
160       sessionQos = SessionQos(global_,"client/Fritz/-1");
161       connQos.setSessionQos(sessionQos);
162       wentInException = false;
163       try {
164          connection_.connect(connQos, this);
165       }
166       catch (XmlBlasterException &ex) {
167          log_.info(ME, "Exception is wanted: " + ex.toString());
168          wentInException = true;
169       }   
170       assertEquals(log_, ME, true, wentInException, "reconnecting for the second time when communication down and not giving positive publicSessionId: exception must be thrown (again)");
171 
172       log_.info(ME, "TESTING FAIL SAFE ...");
173       sessionQos = SessionQos(global_,"client/Fritz/7");
174       connQos.setSessionQos(sessionQos);
175       wentInException = false;
176       try {
177          ConnectReturnQos retQos = connection_.connect(connQos, this);
178          string name = retQos.getSessionQos().getRelativeName();
179          if (useSessionMarker_)
180             assertEquals(log_, ME, string("client/Fritz/session/7"), name, "checking that return qos has the correct sessionId");
181          else
182             assertEquals(log_, ME, string("client/Fritz/7"), name, "checking that return qos has the correct sessionId");
183       }
184       catch (XmlBlasterException &ex) {
185          log_.error(ME, ex.toXml());
186          wentInException = true;
187       }   
188       assertEquals(log_, ME, false, wentInException, "reconnecting when communication down and giving positive publicSessionId: no exception expected");
189 
190       sessionQos = SessionQos(global_,"client/Fritz/2");
191       connQos.setSessionQos(sessionQos);
192       wentInException = false;
193       try {
194          connection_.connect(connQos, this);
195       }
196       catch (XmlBlasterException &/*ex*/) {
197          wentInException = true;
198       }   
199       assertEquals(log_, ME, false, wentInException, "reconnecting second time when communication down and giving positive publicSessionId: no exception expected but a warning should have come");
200 
201 
202       DisconnectQos discQos(global_);
203       wentInException = false;
204       try {
205          connection_.disconnect(discQos);
206       }
207       catch (XmlBlasterException &/*ex*/) {
208          wentInException = true;
209       }   
210       assertEquals(log_, ME, true, wentInException, "disconnecting when no communication should give an exception");
211 
212       // and now we are reconnecting ...
213       if (useEmbeddedServer_) {
214          startEmbeddedServer();
215          Thread::sleepSecs(1);
216       }
217       else {
218          for (int i=0; i < 30; i++) {
219             if (isConnected_) break;
220             log_.info(ME, "please restart the server now");
221             Thread::sleepSecs(2);
222             if (connection_.isAlive()) {
223                break;
224             }
225          }
226       }
227 
228       // making  a subscription now should work ...
229       SubscribeKey subKey(global_);
230       subKey.setOid("TestReconnect");
231       SubscribeQos subQos(global_);
232       wentInException = false;
233       try {
234          connection_.subscribe(subKey, subQos);
235       }
236       catch (XmlBlasterException &ex) {
237          wentInException = true;
238          log_.info(ME, string("exception when subscribing: ") + ex.toXml());
239       }   
240       assertEquals(log_, ME, false, wentInException, "subscribing when communication should not give an exception");
241 
242       log_.info(ME, "disconnecting now the newly established connection");
243       connection_.disconnect(DisconnectQos(global_));
244       log_.info(ME, "going to call setUp to reestablish the initial setup");
245 
246       setUp();
247 
248       // publishing something to make it happy
249       PublishQos pubQos(global_);
250       PublishKey pubKey(global_);
251       pubKey.setOid("TestFailsafe");
252 
253       string msg = "dummy";
254       MessageUnit msgUnit(pubKey, msg, pubQos);
255       connection_.publish(msgUnit);
256 
257       log_.info(ME, "testReconnect END");
258    }
259 
260 
261    void testFailsafe() 
262    {
263       int imax = 30;
264       try {
265          pubQos_ = new PublishQos(global_);
266          pubKey_ = new PublishKey(global_);
267          pubKey_->setOid("TestFailsafe");
268 
269          for (int i=0; i < imax; i++) {
270             string msg = lexical_cast<string>(i);
271             MessageUnit msgUnit(*pubKey_, msg, *pubQos_);
272             log_.info(ME, string("publishing msg '") + msg + "'");
273             /*PublishReturnQos pubRetQos =*/ connection_.publish(msgUnit);
274 
275             if (i == 2) stopEmbeddedServer();
276             if (i == 12) startEmbeddedServer();
277             try {
278                Thread::sleepSecs(1);
279             }
280             catch(XmlBlasterException e) {
281                cout << e.toXml() << endl;
282             }
283 
284          }
285       }
286       catch (XmlBlasterException& ex) {
287          log_.error(ME, string("exception occurred in testFailSafe. ") + ex.toXml());
288          assert(0);
289       }
290 
291       int i = 0;
292       while (numOfUpdates_ < (imax-1) && i < 100) {
293          i++;
294          Thread::sleep(100);
295       }
296 
297 
298    }
299 
300 
301    void tearDown()
302    {
303       try {
304          EraseKey eraseKey(global_);
305          eraseKey.setOid("TestFailsafe");
306          EraseQos eraseQos(global_);
307          log_.info(ME, string("erasing the published message. Key: ") + eraseKey.toXml() + " qos: " + eraseQos.toXml());
308          vector<EraseReturnQos> eraseRetQos = connection_.erase(eraseKey, eraseQos);
309          for (size_t i=0; i < eraseRetQos.size(); i++ ) {
310             log_.info(ME, string("successfully erased the message. return qos: ") + eraseRetQos[i].toXml());
311          }
312 
313          // log_.info(ME, "going to sleep for one minute");
314          // org::xmlBlaster::util::thread::Thread::sleep(60000);
315 
316          DisconnectQos disconnectQos(global_);
317          connection_.disconnect(disconnectQos);
318       }
319       catch (XmlBlasterException& ex) {
320          log_.error(ME, string("exception occurred in tearDown. ") + ex.toXml());
321          assert(0);
322       }
323 
324       delete connQos_; connQos_ = 0;
325       delete subQos_; subQos_ = 0;
326       delete subKey_; subKey_ = 0;
327       delete connRetQos_; connRetQos_ = 0;
328       delete pubQos_; pubQos_ = 0;
329       delete pubKey_; pubKey_ = 0;
330 
331       TestSuite::tearDown();
332    }
333 
334    string update(const string& sessionId, UpdateKey& updateKey, const unsigned char *content, long contentSize, UpdateQos& updateQos)
335    {
336       Lock lock(updateMutex_);
337       if (log_.trace()) log_.trace(ME, "update: session: " + sessionId);
338       if (log_.trace()) log_.trace(ME, "update: key    : " + updateKey.toXml());
339       if (log_.trace()) log_.trace(ME, "update: qos    : " + updateQos.toXml());
340       string help((char*)content, (char*)(content)+contentSize);
341       if (log_.trace()) log_.trace(ME, "update: content: " + help);
342       if (updateQos.getState() == "ERASED" ) return "";
343 
344       int count = atoi(help.c_str());
345       assertEquals(log_, ME, numOfUpdates_, count, string("update check ") + help);
346       numOfUpdates_++;
347       return "";
348    }
349 
350 };
351 
352 }}}
353 
354 
355 using namespace org::xmlBlaster::test;
356 
357 /**
358  * Try
359  * <pre>
360  *   java TestFailsafe -help
361  * </pre>
362  * for usage help
363  *
364  * To disable the embedded server add -embeddedServer false
365  */
366 int main(int args, char ** argv)
367 {
368    TestFailsafe *testFailsafe = 0;
369    try {
370       org::xmlBlaster::util::Object_Lifetime_Manager::init();
371       testFailsafe = new TestFailsafe(args, argv);
372       testFailsafe->setUp();
373       testFailsafe->testReconnect();
374       
375       // testFailsafe.testFailsafe();
376       testFailsafe->tearDown();
377       delete testFailsafe;
378       testFailsafe = 0; 
379       org::xmlBlaster::util::Object_Lifetime_Manager::fini();
380    }
381    catch (XmlBlasterException& ex) {
382       std::cout << ex.toXml() << std::endl;
383    }
384    catch (bad_exception& ex) {
385       cout << "bad_exception: " << ex.what() << endl;
386    }
387    catch (exception& ex) {
388       cout << " exception: " << ex.what() << endl;
389    }
390    catch (string& ex) {
391       cout << "string: " << ex << endl;
392    }
393    catch (char* ex) {
394       cout << "char* :  " << ex << endl;
395    }
396 
397    catch (...)
398    {
399       cout << "unknown exception occured" << endl;
400       XmlBlasterException e(INTERNAL_UNKNOWN, "main", "main thread");
401       cout << e.toXml() << endl;
402    }
403 
404    return 0;
405 }


syntax highlighted by Code2HTML, v. 0.9.1