1 /*------------------------------------------------------------------------------
  2 Name:      TestFailSafeAsync.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Logger;
  9 import java.util.logging.Level;
 10 import org.xmlBlaster.util.Global;
 11 import org.xmlBlaster.util.def.Constants;
 12 import org.xmlBlaster.util.property.PropString;
 13 import org.xmlBlaster.util.XmlBlasterException;
 14 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 15 import org.xmlBlaster.client.qos.ConnectQos;
 16 import org.xmlBlaster.client.I_XmlBlasterAccess;
 17 import org.xmlBlaster.client.key.PublishKey;
 18 import org.xmlBlaster.client.qos.PublishQos;
 19 import org.xmlBlaster.client.key.UpdateKey;
 20 import org.xmlBlaster.client.qos.UpdateQos;
 21 import org.xmlBlaster.client.qos.EraseQos;
 22 import org.xmlBlaster.client.qos.EraseReturnQos;
 23 import org.xmlBlaster.client.I_Callback;
 24 import org.xmlBlaster.client.I_ConnectionStateListener;
 25 import org.xmlBlaster.util.qos.address.Address;
 26 import org.xmlBlaster.util.MsgUnit;
 27 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 28 
 29 import org.xmlBlaster.test.Util;
 30 import org.xmlBlaster.test.MsgInterceptor;
 31 import junit.framework.*;
 32 
 33 
 34 /**
 35  * Tests the fail save behavior of the I_XmlBlasterAccess client helper class,
 36  * especially the asynchronous playback of messages. 
 37  * <p />
 38  * When the connection to xmlBlaster is lost, and you continue to publish messages
 39  * they are stored locally with the invocation recorder on harddisk.<br />
 40  * On reconnect they are flushed with an adjustable rate in background.<br />
 41  * If your client code decides to publish new messages during playback recovery,
 42  * your new messages will overtake some of the play back messages.
 43  * <p />
 44  * If you want guaranteed sequence, please don't send messages during playback.
 45  * <p />
 46  * Invoke examples:<br />
 47  * <pre>
 48  *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestFailSafeAsync
 49  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestFailSafeAsync
 50  * </pre>
 51  */
 52 public class TestFailSafeAsync extends TestCase implements I_Callback, I_ConnectionStateListener
 53 {
 54    private static String ME = "TestFailSafeAsync";
 55    private Global glob;
 56    private static Logger log = Logger.getLogger(TestFailSafeAsync.class.getName());
 57    private boolean messageArrived = false;
 58 
 59    private int serverPort = 7604;
 60    private EmbeddedXmlBlaster serverThread;
 61 
 62    private MsgInterceptor updateInterceptor;
 63    private I_XmlBlasterAccess con;
 64    private String senderName;
 65 
 66    private int numReceived;
 67    private int numTailbackReceived;
 68    private int numNormalPublishReceived;
 69 
 70    private final String contentMime = "text/plain";
 71 
 72    private boolean reconnected;
 73    private boolean allTailbackAreFlushed;
 74 
 75    /** TEST: Sendin 0-19 directly, sending 20-39 to recorder (no connection), sending 40-100 directly */
 76    private final int maxEntries = 100;
 77    private final int failMsg = 20;
 78    private final int reconnectMsg = 40;
 79 
 80    /** publish rate msg/sec */
 81    private final long publishRate = 5;
 82    /** publish rate of tailback msg/sec */
 83    private final long pullbackRate = 1;
 84 
 85    PublishKey publishKeyWrapper;
 86    PublishQos publishQosWrapper;
 87 
 88    public TestFailSafeAsync(String testName) {
 89       this(null, testName);
 90    }
 91 
 92    public TestFailSafeAsync(Global glob, String testName) {
 93       super(testName);
 94       this.glob = glob;
 95       this.senderName = testName;
 96    }
 97 
 98    /**
 99     * Sets up the fixture.
100     * <p />
101     * Connect to xmlBlaster and login
102     */
103    protected void setUp()
104    {
105       this.glob = (this.glob == null) ? new Global() : this.glob;
106 
107 
108       numReceived = 0;
109       numTailbackReceived = 0;
110       numNormalPublishReceived = 0;
111 
112       reconnected = false;
113       allTailbackAreFlushed = true;
114 
115       glob.init(Util.getOtherServerPorts(serverPort));
116 
117       serverThread = EmbeddedXmlBlaster.startXmlBlaster(Util.getOtherServerPorts(serverPort));
118       try {
119          numReceived = 0;
120 
121          con = glob.getXmlBlasterAccess(); // Find server
122 
123          String passwd = "secret";
124          ConnectQos connectQos = new ConnectQos(glob, senderName, passwd);
125 
126          // Setup fail save handling ...
127          Address addressProp = new Address(glob);
128          addressProp.setDelay(400L);          // retry connecting every 400 milli sec
129          addressProp.setRetries(-1);          // -1 == forever
130          addressProp.setPingInterval(400L);   // ping every 400 milli second
131          con.registerConnectionListener(this);
132 
133          connectQos.setAddress(addressProp);
134          
135          this.updateInterceptor = new MsgInterceptor(this.glob, log, this); // Collect received msgs
136 
137          // and do the login ...
138          con.connect(connectQos, this.updateInterceptor); // Login to xmlBlaster
139       }
140       catch (XmlBlasterException e) {
141           log.warning("setUp() - login failed: " + e.toString());
142           fail("setUp() - login failed: " + e.toString());
143       }
144       catch (Exception e) {
145           log.severe("setUp() - login failed: " + e.toString());
146           e.printStackTrace();
147           fail("setUp() - login failed: " + e.toString());
148       }
149 
150       publishKeyWrapper = new PublishKey(glob, "emptyOid", contentMime);
151       publishKeyWrapper.setClientTags("<TestFailSafeAsync-AGENT id='192.168.124.10' subId='1' type='generic'/>");
152       /*
153          String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
154                          "   <TestFailSafeAsync-AGENT id='192.168.124.10' subId='1' type='generic'>" +
155                          "   </TestFailSafeAsync-AGENT>" +
156                          "</key>";
157       */
158       publishQosWrapper = new PublishQos(glob); // == "<qos></qos>"
159    }
160 
161    /**
162     * Tears down the fixture.
163     * <p />
164     * cleaning up .... erase() the previous message OID and logout
165     */
166    protected void tearDown()
167    {
168       log.info("Entering tearDown(), test is finished");
169       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
170                       "   //TestFailSafeAsync-AGENT" +
171                       "</key>";
172       //String eraseQos = "<qos><notify>false</notify></qos>";
173       EraseQos eraseQos = new EraseQos(glob);
174       eraseQos.setForceDestroy(true);
175       try {
176          try {
177             EraseReturnQos[] arr = con.erase(xmlKey, eraseQos.toXml());
178 
179 
180             PropString defaultPlugin = new PropString("CACHE,1.0");
181             String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
182             log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
183             
184             if (defaultPlugin.getValue().startsWith("RAM"))
185                assertEquals("Wrong number of message erased", (maxEntries-failMsg), arr.length);
186                // expect 80 to delete as the first 20 are lost when server 'crashed'
187             else
188                assertEquals("Wrong number of message erased", maxEntries, arr.length);
189          } catch(XmlBlasterException e) { assertTrue("tearDown - XmlBlasterException: " + e.getMessage(), false); }
190 
191          con.disconnect(null);
192       }
193       finally {
194          try { Thread.sleep(200L); } catch( InterruptedException i) {}    // Wait some time
195          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
196          this.serverThread = null;
197 
198          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
199          Util.resetPorts(glob);
200          this.glob = null;
201         
202          this.con = null;
203          this.updateInterceptor = null;
204          Global.instance().shutdown();
205          publishKeyWrapper = null;
206          publishQosWrapper = null;
207       }
208    }
209 
210 
211    /**
212     * TEST: Subscribe to messages with XPATH.
213     */
214    public void subscribe()
215    {
216       if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
217 
218       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
219                       "   //TestFailSafeAsync-AGENT" +
220                       "</key>";
221       String qos = "<qos></qos>";
222       try {
223          String subscribeOid = con.subscribe(xmlKey, qos).getSubscriptionId();
224          log.info("Success: Subscribe on " + subscribeOid + " done");
225          assertTrue("returned null subscribeOid", subscribeOid != null);
226       } catch(XmlBlasterException e) {
227          log.warning("XmlBlasterException: " + e.getMessage());
228          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
229       }
230    }
231 
232 
233    /**
234     * Construct a message and publish it.
235     */
236    public void publish(int counter) {
237       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");
238 
239       long publishDelay = 1000/publishRate;  // 20 msg/sec -> send every 50 milli one
240       String oid = "MSG-" + counter;
241       try {
242          publishKeyWrapper.setOid(oid);
243          String content = "" + counter;
244          MsgUnit msgUnit = new MsgUnit(publishKeyWrapper.toXml(), content.getBytes(), publishQosWrapper.toXml());
245 
246          con.publish(msgUnit);
247          Util.delay(publishDelay);  // Wait some time
248       }
249       catch(XmlBlasterException e) {
250          fail(ME + ": Publish failed: " + e.toString());
251       }
252       log.info("Success: Publishing of " + oid + " done");
253    }
254 
255 
256    /**
257     * TEST: Sendin 0-19 directly, sending 20-39 to recorder (no connection), sending 40-100 directly
258     */
259    public void testFailSafe() {
260       // subscribe(); see reachedAlive()
261 
262       for (int ii=0; ii<maxEntries; ii++) {
263          if (ii==failMsg) {
264             EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
265             this.serverThread = null;
266             Util.delay(600L);    // Wait some time, ping should activate login polling
267             log.info("lostConnection, sending message " + ii + " - " + (reconnectMsg-1));
268          }
269 
270          if (ii==reconnectMsg) {
271             serverThread = EmbeddedXmlBlaster.startXmlBlaster(Util.getOtherServerPorts(serverPort));
272             while (true) {
273                if (reconnected == true)
274                   break;
275                Util.delay(10L); // Wait some time, to allow the login poller to reconnect
276             }
277             log.info("Reconnected, sending message " + ii + " - " + (maxEntries-1));
278          }
279 
280          publish(ii);
281       }
282 
283       int numFailsave = reconnectMsg-failMsg;  // 20
284       int numPublish = maxEntries-numFailsave;     // 80
285       long wait = 5000L + (long)((1000.0 * numPublish / publishRate) + (1000.0 * numFailsave / pullbackRate));
286       assertEquals("", maxEntries, this.updateInterceptor.waitOnUpdate(wait, maxEntries));
287       log.info("******* testFailSafe() DONE");
288    }
289 
290    public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
291       log.info("I_ConnectionStateListener: We were lucky, (re)connected to xmlBlaster");
292       subscribe();    // initialize subscription again
293       reconnected = true;
294       allTailbackAreFlushed = true;
295    }
296 
297    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
298       if (log != null) log.warning("I_ConnectionStateListener: Lost connection to xmlBlaster");
299       allTailbackAreFlushed = false;
300    }
301 
302    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
303       if (log != null) log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
304    }
305 
306    /**
307     * This is the callback method invoked from xmlBlaster
308     * delivering us a new asynchronous message. 
309     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
310     */
311    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
312       synchronized (this) {
313 
314          String oid = updateKey.getOid();
315 
316          if (updateQos.isErased()) {
317             return "";
318          }
319 
320          numReceived++;
321 
322          assertEquals("Wrong sender", senderName, updateQos.getSender().getLoginName());
323          assertEquals("Message contentMime is corrupted", contentMime, updateKey.getContentMime());
324 
325          int ii = 0;
326          try {
327             ii = Integer.parseInt(oid.substring("MSG-".length()));
328          } catch(NumberFormatException e) {
329             log.severe("Can't extract message number " + oid);
330             fail("Can't extract message number " + oid);
331          }
332 
333          if (ii >= failMsg && ii < reconnectMsg)
334             numTailbackReceived++;
335          else
336             numNormalPublishReceived++;
337 
338          // Check content
339          try {
340             int contentCounter = 0;
341             String cnt = new String(content);
342             contentCounter = Integer.parseInt(cnt);
343             assertEquals("Wrong counter in content", ii, contentCounter);
344          } catch(NumberFormatException e) {
345             log.severe("Can't extract message number '" + new String(content) + "': " + updateQos.toXml());
346          }
347 
348          log.info("Update message oid=" + oid + " numReceived=" + numReceived + ", numNormalPublishReceived=" + numNormalPublishReceived + " numTailbackReceived=" + numTailbackReceived + " ...");
349 
350          /* NOT SUPPORTED ANYMORE SINCE CLIENT SIDE QUEUE EMBEDDING (before supported by Recorder framework)
351          // Check here async behavior:
352          if (numReceived == 80) {
353             int expectedTailback = (int)((80.-reconnectMsg)*(1.*pullbackRate/publishRate));
354             int diff = Math.abs(numTailbackReceived - expectedTailback);
355 
356             if (diff > 6) {
357                String text = "Expected tailback updates = " + expectedTailback + " but got " + numTailbackReceived;
358                log.severe(text);
359                fail(text);
360             }
361             log.info("TEST SUCCESS: Expected tailback updates = " + expectedTailback + " and got " + numTailbackReceived);
362          }
363          */
364 
365          messageArrived = true;
366       
367       } // synchronized as we have the client as publisher and the invocation recorder as a publisher
368       
369       return "";
370    }
371 
372    /**
373     * Invoke: java org.xmlBlaster.test.client.TestFailSafeAsync
374     * @deprecated Use the TestRunner from the testsuite to run it:<p />
375     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestFailSafeAsync</pre>
376     */
377    public static void main(String args[]) {
378       Global glob = new Global();
379       if (glob.init(args) != 0) {
380          log.severe("Init failed");
381          System.exit(1);
382       }
383       TestFailSafeAsync testSub = new TestFailSafeAsync(glob, "TestFailSafeAsync");
384       testSub.setUp();
385       testSub.testFailSafe();
386       testSub.tearDown();
387    }
388 }


syntax highlighted by Code2HTML, v. 0.9.1