1 /*------------------------------------------------------------------------------
  2 Name:      TestFailSafeAsync.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Level;
  9 import java.util.logging.Logger;
 10 
 11 import junit.framework.TestCase;
 12 
 13 import org.xmlBlaster.client.I_Callback;
 14 import org.xmlBlaster.client.I_ConnectionStateListener;
 15 import org.xmlBlaster.client.I_XmlBlasterAccess;
 16 import org.xmlBlaster.client.key.PublishKey;
 17 import org.xmlBlaster.client.key.UpdateKey;
 18 import org.xmlBlaster.client.qos.ConnectQos;
 19 import org.xmlBlaster.client.qos.EraseQos;
 20 import org.xmlBlaster.client.qos.EraseReturnQos;
 21 import org.xmlBlaster.client.qos.PublishQos;
 22 import org.xmlBlaster.client.qos.UpdateQos;
 23 import org.xmlBlaster.test.MsgInterceptor;
 24 import org.xmlBlaster.test.Util;
 25 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 26 import org.xmlBlaster.util.Global;
 27 import org.xmlBlaster.util.MsgUnit;
 28 import org.xmlBlaster.util.XmlBlasterException;
 29 import org.xmlBlaster.util.def.Constants;
 30 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 31 import org.xmlBlaster.util.property.PropString;
 32 import org.xmlBlaster.util.qos.address.Address;
 33 
 34 
 35 /**
 36  * Tests the fail save behavior of the I_XmlBlasterAccess client helper class,
 37  * especially the asynchronous playback of messages. 
 38  * <p />
 39  * When the connection to xmlBlaster is lost, and you continue to publish messages
 40  * they are stored locally with the invocation recorder on harddisk.<br />
 41  * On reconnect they are flushed with an adjustable rate in background.<br />
 42  * If your client code decides to publish new messages during playback recovery,
 43  * your new messages will overtake some of the play back messages.
 44  * <p />
 45  * If you want guaranteed sequence, please don't send messages during playback.
 46  * <p />
 47  * Invoke examples:<br />
 48  * <pre>
 49  *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestFailSafeAsync
 50  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestFailSafeAsync
 51  * </pre>
 52  */
 53 public class TestFailSafeAsync extends TestCase implements I_Callback, I_ConnectionStateListener
 54 {
 55    private static String ME = "TestFailSafeAsync";
 56    private Global glob;
 57    private static Logger log = Logger.getLogger(TestFailSafeAsync.class.getName());
 58 
 59    private int serverPort = 7604;
 60    private EmbeddedXmlBlaster serverThread;
 61 
 62    private MsgInterceptor updateInterceptor;
 63    private I_XmlBlasterAccess con;
 64    private String senderName;
 65 
 66    private int numReceived;
 67    private int numTailbackReceived;
 68    private int numNormalPublishReceived;
 69 
 70    private final String contentMime = "text/plain";
 71 
 72    private boolean reconnected;
 73 
 74    /** TEST: Sendin 0-19 directly, sending 20-39 to recorder (no connection), sending 40-100 directly */
 75    private final int maxEntries = 100;
 76    private final int failMsg = 20;
 77    private final int reconnectMsg = 40;
 78 
 79    /** publish rate msg/sec */
 80    private final long publishRate = 5;
 81    /** publish rate of tailback msg/sec */
 82    private final long pullbackRate = 1;
 83 
 84    PublishKey publishKeyWrapper;
 85    PublishQos publishQosWrapper;
 86 
 87    public TestFailSafeAsync(String testName) {
 88       this(null, testName);
 89    }
 90 
 91    public TestFailSafeAsync(Global glob, String testName) {
 92       super(testName);
 93       this.glob = glob;
 94       this.senderName = testName;
 95    }
 96 
 97    /**
 98     * Sets up the fixture.
 99     * <p />
100     * Connect to xmlBlaster and login
101     */
102    protected void setUp()
103    {
104       this.glob = (this.glob == null) ? new Global() : this.glob;
105 
106 
107       numReceived = 0;
108       numTailbackReceived = 0;
109       numNormalPublishReceived = 0;
110 
111       reconnected = false;
112 
113       glob.init(Util.getOtherServerPorts(serverPort));
114 
115       serverThread = EmbeddedXmlBlaster.startXmlBlaster(Util.getOtherServerPorts(serverPort));
116       try {
117          numReceived = 0;
118 
119          con = glob.getXmlBlasterAccess(); // Find server
120 
121          String passwd = "secret";
122          ConnectQos connectQos = new ConnectQos(glob, senderName, passwd);
123 
124          // Setup fail save handling ...
125          Address addressProp = new Address(glob);
126          addressProp.setDelay(400L);          // retry connecting every 400 milli sec
127          addressProp.setRetries(-1);          // -1 == forever
128          addressProp.setPingInterval(400L);   // ping every 400 milli second
129          con.registerConnectionListener(this);
130 
131          connectQos.setAddress(addressProp);
132          
133          this.updateInterceptor = new MsgInterceptor(this.glob, log, this); // Collect received msgs
134 
135          // and do the login ...
136          con.connect(connectQos, this.updateInterceptor); // Login to xmlBlaster
137       }
138       catch (XmlBlasterException e) {
139           log.warning("setUp() - login failed: " + e.toString());
140           fail("setUp() - login failed: " + e.toString());
141       }
142       catch (Exception e) {
143           log.severe("setUp() - login failed: " + e.toString());
144           e.printStackTrace();
145           fail("setUp() - login failed: " + e.toString());
146       }
147 
148       publishKeyWrapper = new PublishKey(glob, "emptyOid", contentMime);
149       publishKeyWrapper.setClientTags("<TestFailSafeAsync-AGENT id='192.168.124.10' subId='1' type='generic'/>");
150       /*
151          String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
152                          "   <TestFailSafeAsync-AGENT id='192.168.124.10' subId='1' type='generic'>" +
153                          "   </TestFailSafeAsync-AGENT>" +
154                          "</key>";
155       */
156       publishQosWrapper = new PublishQos(glob); // == "<qos></qos>"
157    }
158 
159    /**
160     * Tears down the fixture.
161     * <p />
162     * cleaning up .... erase() the previous message OID and logout
163     */
164    protected void tearDown()
165    {
166       log.info("Entering tearDown(), test is finished");
167       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
168                       "   //TestFailSafeAsync-AGENT" +
169                       "</key>";
170       //String eraseQos = "<qos><notify>false</notify></qos>";
171       EraseQos eraseQos = new EraseQos(glob);
172       eraseQos.setForceDestroy(true);
173       try {
174          try {
175             EraseReturnQos[] arr = con.erase(xmlKey, eraseQos.toXml());
176 
177 
178             PropString defaultPlugin = new PropString("CACHE,1.0");
179             String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
180             log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
181             
182             if (defaultPlugin.getValue().startsWith("RAM"))
183                assertEquals("Wrong number of message erased", (maxEntries-failMsg), arr.length);
184                // expect 80 to delete as the first 20 are lost when server 'crashed'
185             else
186                assertEquals("Wrong number of message erased", maxEntries, arr.length);
187          } catch(XmlBlasterException e) { assertTrue("tearDown - XmlBlasterException: " + e.getMessage(), false); }
188 
189          con.disconnect(null);
190       }
191       finally {
192          try { Thread.sleep(200L); } catch( InterruptedException i) {}    // Wait some time
193          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
194          this.serverThread = null;
195 
196          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
197          Util.resetPorts(glob);
198          this.glob = null;
199         
200          this.con = null;
201          this.updateInterceptor = null;
202          Global.instance().shutdown();
203          publishKeyWrapper = null;
204          publishQosWrapper = null;
205       }
206    }
207 
208 
209    /**
210     * TEST: Subscribe to messages with XPATH.
211     */
212    public void subscribe()
213    {
214       if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
215 
216       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
217                       "   //TestFailSafeAsync-AGENT" +
218                       "</key>";
219       String qos = "<qos></qos>";
220       try {
221          String subscribeOid = con.subscribe(xmlKey, qos).getSubscriptionId();
222          log.info("Success: Subscribe on " + subscribeOid + " done");
223          assertTrue("returned null subscribeOid", subscribeOid != null);
224       } catch(XmlBlasterException e) {
225          log.warning("XmlBlasterException: " + e.getMessage());
226          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
227       }
228    }
229 
230 
231    /**
232     * Construct a message and publish it.
233     */
234    public void publish(int counter) {
235       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");
236 
237       long publishDelay = 1000/publishRate;  // 20 msg/sec -> send every 50 milli one
238       String oid = "MSG-" + counter;
239       try {
240          publishKeyWrapper.setOid(oid);
241          String content = "" + counter;
242          MsgUnit msgUnit = new MsgUnit(publishKeyWrapper.toXml(), content.getBytes(), publishQosWrapper.toXml());
243 
244          con.publish(msgUnit);
245          Util.delay(publishDelay);  // Wait some time
246       }
247       catch(XmlBlasterException e) {
248          fail(ME + ": Publish failed: " + e.toString());
249       }
250       log.info("Success: Publishing of " + oid + " done");
251    }
252 
253 
254    /**
255     * TEST: Sendin 0-19 directly, sending 20-39 to recorder (no connection), sending 40-100 directly
256     */
257    public void testFailSafe() {
258       // subscribe(); see reachedAlive()
259 
260       for (int ii=0; ii<maxEntries; ii++) {
261          if (ii==failMsg) {
262             EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
263             this.serverThread = null;
264             Util.delay(600L);    // Wait some time, ping should activate login polling
265             log.info("lostConnection, sending message " + ii + " - " + (reconnectMsg-1));
266          }
267 
268          if (ii==reconnectMsg) {
269             serverThread = EmbeddedXmlBlaster.startXmlBlaster(Util.getOtherServerPorts(serverPort));
270             while (true) {
271                if (reconnected == true)
272                   break;
273                Util.delay(10L); // Wait some time, to allow the login poller to reconnect
274             }
275             log.info("Reconnected, sending message " + ii + " - " + (maxEntries-1));
276          }
277 
278          publish(ii);
279       }
280 
281       int numFailsave = reconnectMsg-failMsg;  // 20
282       int numPublish = maxEntries-numFailsave;     // 80
283       long wait = 5000L + (long)((1000.0 * numPublish / publishRate) + (1000.0 * numFailsave / pullbackRate));
284       assertEquals("", maxEntries, this.updateInterceptor.waitOnUpdate(wait, maxEntries));
285       log.info("******* testFailSafe() DONE");
286    }
287 
288    public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
289    }
290    
291    public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
292       log.info("I_ConnectionStateListener: We were lucky, (re)connected to xmlBlaster");
293       subscribe();    // initialize subscription again
294       reconnected = true;
295    }
296    
297    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
298       if (log != null) log.warning("I_ConnectionStateListener: Lost connection to xmlBlaster");
299    }
300 
301    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
302       if (log != null) log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
303    }
304 
305    /**
306     * This is the callback method invoked from xmlBlaster
307     * delivering us a new asynchronous message. 
308     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
309     */
310    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
311       synchronized (this) {
312 
313          String oid = updateKey.getOid();
314 
315          if (updateQos.isErased()) {
316             return "";
317          }
318 
319          numReceived++;
320 
321          assertEquals("Wrong sender", senderName, updateQos.getSender().getLoginName());
322          assertEquals("Message contentMime is corrupted", contentMime, updateKey.getContentMime());
323 
324          int ii = 0;
325          try {
326             ii = Integer.parseInt(oid.substring("MSG-".length()));
327          } catch(NumberFormatException e) {
328             log.severe("Can't extract message number " + oid);
329             fail("Can't extract message number " + oid);
330          }
331 
332          if (ii >= failMsg && ii < reconnectMsg)
333             numTailbackReceived++;
334          else
335             numNormalPublishReceived++;
336 
337          // Check content
338          try {
339             int contentCounter = 0;
340             String cnt = new String(content);
341             contentCounter = Integer.parseInt(cnt);
342             assertEquals("Wrong counter in content", ii, contentCounter);
343          } catch(NumberFormatException e) {
344             log.severe("Can't extract message number '" + new String(content) + "': " + updateQos.toXml());
345          }
346 
347          log.info("Update message oid=" + oid + " numReceived=" + numReceived + ", numNormalPublishReceived=" + numNormalPublishReceived + " numTailbackReceived=" + numTailbackReceived + " ...");
348 
349          /* NOT SUPPORTED ANYMORE SINCE CLIENT SIDE QUEUE EMBEDDING (before supported by Recorder framework)
350          // Check here async behavior:
351          if (numReceived == 80) {
352             int expectedTailback = (int)((80.-reconnectMsg)*(1.*pullbackRate/publishRate));
353             int diff = Math.abs(numTailbackReceived - expectedTailback);
354 
355             if (diff > 6) {
356                String text = "Expected tailback updates = " + expectedTailback + " but got " + numTailbackReceived;
357                log.severe(text);
358                fail(text);
359             }
360             log.info("TEST SUCCESS: Expected tailback updates = " + expectedTailback + " and got " + numTailbackReceived);
361          }
362          */
363 
364       } // synchronized as we have the client as publisher and the invocation recorder as a publisher
365       
366       return "";
367    }
368 
369    /**
370     * Invoke: java org.xmlBlaster.test.client.TestFailSafeAsync
371     * @deprecated Use the TestRunner from the testsuite to run it:<p />
372     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestFailSafeAsync</pre>
373     */
374    public static void main(String args[]) {
375       Global glob = new Global();
376       if (glob.init(args) != 0) {
377          log.severe("Init failed");
378          System.exit(1);
379       }
380       TestFailSafeAsync testSub = new TestFailSafeAsync(glob, "TestFailSafeAsync");
381       testSub.setUp();
382       testSub.testFailSafe();
383       testSub.tearDown();
384    }
385 }


syntax highlighted by Code2HTML, v. 0.9.1