1 /*------------------------------------------------------------------------------
  2 Name:      TestPtPPersistent.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Logger;
  9 import java.util.logging.Level;
 10 import org.xmlBlaster.util.Global;
 11 import org.xmlBlaster.util.SessionName;
 12 import org.xmlBlaster.client.key.EraseKey;
 13 import org.xmlBlaster.client.key.GetKey;
 14 import org.xmlBlaster.client.key.PublishKey;
 15 import org.xmlBlaster.client.qos.ConnectQos;
 16 import org.xmlBlaster.client.qos.EraseQos;
 17 import org.xmlBlaster.client.qos.GetQos;
 18 import org.xmlBlaster.util.XmlBlasterException;
 19 import org.xmlBlaster.util.def.Constants;
 20 import org.xmlBlaster.util.property.PropString;
 21 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 22 import org.xmlBlaster.client.qos.PublishQos;
 23 import org.xmlBlaster.client.I_XmlBlasterAccess;
 24 import org.xmlBlaster.util.qos.address.Address;
 25 import org.xmlBlaster.util.qos.address.Destination;
 26 import org.xmlBlaster.util.MsgUnit;
 27 
 28 import org.xmlBlaster.test.Msg;
 29 import org.xmlBlaster.test.Util;
 30 import org.xmlBlaster.test.MsgInterceptor;
 31 import org.xmlBlaster.test.util.PtPDestination;
 32 
 33 import junit.framework.*;
 34 
 35 
 36 /**
 37  * Tests the sending of persistent PtP messages to a session
 38  * while resourcea are critical (swapping of all queues and callback
 39  * queue overflow) when both the server and client crash.
 40  *
 41  * Invoke examples:<br />
 42  * <pre>
 43  *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPPersistent
 44  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPPersistent
 45  * </pre>
 46  * @see org.xmlBlaster.client.I_XmlBlasterAccess
 47  */
 48 public class TestPtPPersistent extends TestCase  {
 49    private static String ME = "TestPtPPersistent";
 50    private static final long PUB_DELAY=250L;
 51    private Global glob;
 52    private static Logger log = Logger.getLogger(TestPtPPersistent.class.getName());
 53 
 54    private int serverPort = 7674;
 55    private EmbeddedXmlBlaster serverThread;
 56 
 57    private MsgInterceptor updateInterceptor;
 58    private String senderName;
 59 
 60    private final long reconnectDelay = 500L;
 61    private PtPDestination destination;
 62 
 63    public TestPtPPersistent(String testName) {
 64       this(null, testName);
 65    }
 66 
 67    public TestPtPPersistent(Global glob, String testName) {
 68       super(testName);
 69       this.glob = glob;
 70       this.senderName = testName;
 71    }
 72 
 73    /**
 74     * Sets up the fixture.
 75     * <p />
 76     * Connect to xmlBlaster and login
 77     */
 78    protected void setUp() {
 79       this.glob = (this.glob == null) ? Global.instance() : this.glob;
 80 
 81       glob.init(Util.getOtherServerPorts(serverPort));
 82 
 83       serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
 84       log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
 85       try {
 86          I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
 87          String passwd = "secret";
 88          ConnectQos connectQos = new ConnectQos(glob, senderName, passwd); // == "<qos>...</qos>";
 89          // Setup fail save handling for connection ...
 90          Address addressProp = new Address(glob);
 91          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
 92          addressProp.setRetries(-1);       // -1 == forever
 93          addressProp.setPingInterval(500L); // switched off
 94          connectQos.setAddress(addressProp);
 95          con.connect(connectQos, null);  // Login to xmlBlaster, register for updates
 96       }
 97       catch (XmlBlasterException e) {
 98           log.warning("setUp() - login failed: " + e.getMessage());
 99           fail("setUp() - login fail: " + e.getMessage());
100       }
101       catch (Exception e) {
102           log.severe("setUp() - login failed: " + e.toString());
103           e.printStackTrace();
104           fail("setUp() - login fail: " + e.toString());
105       }
106    }
107 
108    /**
109     * Tears down the fixture.
110     * <p />
111     * cleaning up .... erase() the previous message OID and logout
112     */
113    protected void tearDown() {
114       I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
115       try {
116          log.info("Entering tearDown(), test is finished");
117          PropString defaultPlugin = new PropString("CACHE,1.0");
118          String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
119          log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
120          EraseKey eraseKey = new EraseKey(this.glob, "//airport", "XPATH");      
121          EraseQos eraseQos = new EraseQos(this.glob);
122          con.erase(eraseKey, eraseQos);
123       }
124       catch (XmlBlasterException ex) {
125          ex.printStackTrace();
126       }
127       finally {
128          con.disconnect(null);
129          try {
130             Thread.sleep(1000L);
131          }
132          catch (Exception ex) {
133          }
134          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
135          this.serverThread = null;
136          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
137          Util.resetPorts(glob);
138          this.glob = null;
139          con = null;
140          Global.instance().shutdown();
141       }
142    }
143 
144    /**
145     * TEST: Construct a message and publish it.
146     * 
147     * <p />
148     */
149    public void doPublish(int counter, String oid, boolean doGc, long sleep) throws XmlBlasterException {
150       String content = "" + counter;
151       log.info("Publishing message " + content);
152 
153       PublishKey key = new PublishKey(this.glob);
154       if (oid != null) key.setOid(oid);
155       key.setClientTags("<airport/>");
156       PublishQos qos = new PublishQos(glob);
157       qos.setPersistent(true);
158       qos.setVolatile(true);
159       qos.addDestination(new Destination(new SessionName(this.glob, "joe")));
160       MsgUnit msgUnit = new MsgUnit(key, content, qos);
161 
162       this.glob.getXmlBlasterAccess().publish(msgUnit);
163       if (doGc) Util.gc(2);
164       try {
165          Thread.sleep(sleep);
166       }
167       catch (Exception ex) {
168       }
169       log.info("Success: Publishing of " + content + " done");
170    }
171 
172    public void testPersistentPtPOneOidWithGc() {
173       persistentPtP("persistentPtP", true);
174    }
175 
176    public void testPersistentPtPOneOidNoGc() {
177       persistentPtP("persistentPtP", false);
178    }
179 
180    public void testPersistentPtPNoOidWithGc() {
181       persistentPtP(null, true);
182    }
183 
184    public void testPersistentPtPNoOidNoGc() {
185       persistentPtP(null, false);
186    }
187    /**
188     * TEST: <br />
189     * Sets up a PtP destination (a subject)
190     * 
191     */
192    public void persistentPtP(String oid, boolean doGc) {
193       long cbMaxEntries = 3;
194       long cbMaxEntriesCache = 2;
195       long subjMaxEntries = 3;
196       long subjMaxEntriesCache = 2;
197       
198       long exLimit = cbMaxEntries + subjMaxEntries + 2;
199 
200       
201       this.destination = new PtPDestination(this.glob, "joe/1");
202       /** wants PtP messages and does not shutdown */
203       boolean wantsPtP = true;
204       boolean shutdownCB = false;
205       try {
206          this.destination.init(wantsPtP, shutdownCB, cbMaxEntries, cbMaxEntriesCache, subjMaxEntries, subjMaxEntriesCache);
207       }
208       catch (XmlBlasterException ex) {
209          assertTrue("an exception while initing the destination should not occur " + ex.getMessage(), false);               
210       }
211       
212       for (int i=0; i < exLimit; i++) {
213          try {
214             doPublish(i, oid, doGc, PUB_DELAY);
215          }
216          catch (Exception ex) {
217             assertTrue("an exception on publish '" + i + "' should not occur " + ex.getMessage(), false);
218          }
219       }
220          
221       int ret = this.destination.getUpdateInterceptor().waitOnUpdate(3000L*exLimit, (int)exLimit);
222       assertEquals("wrong number of entries arrived", (int)exLimit, ret);
223       // wait half a second more to see if more messages come
224       ret = this.destination.getUpdateInterceptor().waitOnUpdate(500L, (int)exLimit+1);
225       assertEquals("wrong number of entries arrived", (int)exLimit, ret);
226       
227       Msg[] msg = this.destination.getUpdateInterceptor().getMsgs();
228       assertEquals("wrong number of messages", exLimit, msg.length);
229       for (int i=0; i < exLimit; i++) {
230          assertEquals("wrong message sequence at ", i, msg[i].getContentInt());
231       }
232       this.destination.getUpdateInterceptor().clear();      
233 
234       // now stop the receiver by shutting down its cbServer and fill cbQueue and subjQueue
235       this.destination.getConnection().leaveServer(null);
236 
237       for (long i=exLimit; i < 2 * exLimit; i++) {
238          try {
239             doPublish((int)i, oid, doGc, PUB_DELAY);
240          }
241          catch (XmlBlasterException ex) {
242             assertTrue("an exception on publish '" + i + "' should not occur " + ex.getMessage(), false);
243          }
244       }
245       this.destination.check(250L, 0);
246       
247       for (long i=2*exLimit; i < 2*exLimit + 2; i++) {
248          try {
249             doPublish((int)i, oid, doGc, PUB_DELAY);
250             assertTrue("an exception on publish '" + i + "' should have occurred ", false);
251          }
252          catch (XmlBlasterException ex) {
253             log.info("this is an allowed exception since queues are overflown");
254          }
255       }
256 
257       this.destination.check(250L, 0);
258 
259       // stop and restart the server
260       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
261       this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(Util.getOtherServerPorts(this.serverPort));
262 
263       // reconnect to server (for the destination, the publisher never left) 
264       this.destination = new PtPDestination(this.glob, "joe/1");
265       /** wants PtP messages and does not shutdown */
266       
267       try {
268          // we pass -1 -1 for the subject queue to avoid reconviguration 
269          // otherwise it will shut down the callback
270          this.destination.init(true, false, cbMaxEntries, cbMaxEntriesCache, subjMaxEntries, subjMaxEntriesCache);
271          Thread.sleep(1000L);
272       }
273       catch (Exception ex) {
274          assertTrue("an exception while initing the destination should not occur " + ex.getMessage(), false);
275       }
276 
277       for (long i=2*exLimit; i < 3*exLimit; i++) {
278          try {
279             doPublish((int)i, oid, doGc, PUB_DELAY);
280          }
281          catch (Exception ex) {
282             assertTrue("an exception on publish '" + i + "' should not occur " + ex.getMessage(), false);
283          }
284       }
285 
286       ret = this.destination.getUpdateInterceptor().waitOnUpdate(3000L*exLimit, (int)(2*exLimit));
287       assertEquals("wrong number of messages arrived", (int)2*exLimit, ret);
288       ret = this.destination.getUpdateInterceptor().waitOnUpdate(1000L, (int)(2*exLimit));
289       assertEquals("wrong number of messages arrived", (int)(2*exLimit), ret);
290       
291       msg = this.destination.getUpdateInterceptor().getMsgs();
292       if (oid != null) { // if oid is different sequence is not garanteed
293          for (int i=0; i < msg.length; i++) {
294             assertEquals("wrong message sequence (number of entries arrived: " + msg.length + ") ", i+(int)exLimit, msg[i].getContentInt());
295          }
296       }
297       if ((long)msg.length != exLimit*2) {
298          try {
299             GetKey getKey = new GetKey(this.glob, "__cmd?dump");
300             GetQos getQos = new GetQos(this.glob); 
301             MsgUnit[] tmp = this.glob.getXmlBlasterAccess().get(getKey, getQos);
302             if (tmp.length > 0) 
303                log.info(tmp[0].getContentStr());
304          }
305          catch (XmlBlasterException ex) {
306             ex.printStackTrace();
307          }
308       }
309       assertEquals("wrong number of entries arrived", exLimit*2, (long)msg.length);
310       this.destination.getUpdateInterceptor().clear();            
311       this.destination.shutdown(true);
312       this.destination = null;
313    }
314 
315 
316    /**
317     * Invoke: java org.xmlBlaster.test.client.TestPtPPersistent
318     * <p />
319     * @deprecated Use the TestRunner from the testsuite to run it:<p />
320     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPtPPersistent</pre>
321     */
322    public static void main(String args[])
323    {
324       Global glob = new Global();
325       if (glob.init(args) != 0) {
326          System.out.println(ME + ": Init failed");
327          System.exit(1);
328       }
329 
330       TestPtPPersistent test = new TestPtPPersistent(glob, "TestPtPPersistent/1");
331 
332       test.setUp();
333       test.testPersistentPtPOneOidWithGc();
334       test.tearDown();
335 
336       test.setUp();
337       test.testPersistentPtPOneOidNoGc();
338       test.tearDown();
339 
340       test.setUp();
341       test.testPersistentPtPNoOidWithGc();
342       test.tearDown();
343 
344       test.setUp();
345       test.testPersistentPtPNoOidNoGc();
346       test.tearDown();
347 
348    }
349 }


syntax highlighted by Code2HTML, v. 0.9.1