1 /*------------------------------------------------------------------------------
  2 Name:      TestReferenceCountSwap.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.topic;
  7 
  8 
  9 import java.util.logging.Logger;
 10 import java.util.logging.Level;
 11 import org.xmlBlaster.util.Global;
 12 import org.xmlBlaster.util.XmlBlasterException;
 13 import org.xmlBlaster.util.def.ErrorCode;
 14 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 15 import org.xmlBlaster.client.I_Callback;
 16 import org.xmlBlaster.client.I_XmlBlasterAccess;
 17 import org.xmlBlaster.client.key.UpdateKey;
 18 import org.xmlBlaster.client.key.PublishKey;
 19 import org.xmlBlaster.client.key.GetKey;
 20 import org.xmlBlaster.client.key.SubscribeKey;
 21 import org.xmlBlaster.client.key.UnSubscribeKey;
 22 import org.xmlBlaster.client.key.EraseKey;
 23 import org.xmlBlaster.client.qos.ConnectQos;
 24 import org.xmlBlaster.client.qos.GetQos;
 25 import org.xmlBlaster.client.qos.GetReturnQos;
 26 import org.xmlBlaster.client.qos.PublishQos;
 27 import org.xmlBlaster.client.qos.PublishReturnQos;
 28 import org.xmlBlaster.client.qos.UpdateQos;
 29 import org.xmlBlaster.client.qos.UpdateReturnQos;
 30 import org.xmlBlaster.client.qos.SubscribeQos;
 31 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 32 import org.xmlBlaster.client.qos.EraseQos;
 33 import org.xmlBlaster.client.qos.EraseReturnQos;
 34 import org.xmlBlaster.client.qos.UnSubscribeQos;
 35 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
 36 import org.xmlBlaster.client.I_ConnectionStateListener;
 37 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 38 import org.xmlBlaster.util.property.Property;
 39 import org.xmlBlaster.util.qos.address.Address;
 40 import org.xmlBlaster.util.qos.address.CallbackAddress;
 41 import org.xmlBlaster.util.MsgUnit;
 42 
 43 import org.xmlBlaster.test.Util;
 44 import org.xmlBlaster.test.Msg;
 45 import org.xmlBlaster.test.MsgInterceptor;
 46 import junit.framework.*;
 47 
 48 
 49 /**
 50  * Tests the correct reference counting for persistent messages
 51  * after recovery. 
 52  * <pre>
 53  *   1. Start xmlBlaster server, configure cache size to 2
 54  *   2. Publish two messages
 55  *   3. Subscribe to message in fail save mode and block in callback
 56  *   4. Kill client subscriber
 57  *   5. Publish two more message to swap the first two
 58  *   6. Start same subscriber without initial update - it will receive the messages from 3.
 59  *   7. Start another subscriber with history = 10: we expect 4 updates
 60  *   If the reference counter is not properly swapped and resored test 6. will fail.
 61  * </pre>
 62  * Invoke examples:<br />
 63  * <pre>
 64  *   java junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCountSwap
 65  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.topic.TestReferenceCountSwap
 66  * </pre>
 67  */
 68 public class TestReferenceCountSwap extends TestCase implements I_ConnectionStateListener
 69 {
 70    private static String ME = "TestReferenceCountSwap";
 71    private Global glob;
 72    private static Logger log = Logger.getLogger(TestReferenceCountSwap.class.getName());
 73 
 74    private int serverPort = 7694;
 75    private EmbeddedXmlBlaster serverThread;
 76 
 77    private String oid = "referenceCountMsg";
 78 
 79    class Client {
 80       I_XmlBlasterAccess con;
 81       MsgInterceptor updateInterceptor;
 82    }
 83 
 84    public TestReferenceCountSwap(String testName) {
 85       this(null, testName);
 86    }
 87 
 88    /**
 89     * Constructs the TestReferenceCountSwap object.
 90     * <p />
 91     * @param testName  The name used in the test suite
 92     */
 93    public TestReferenceCountSwap(Global glob, String testName) {
 94       super(testName);
 95       this.glob = glob;
 96    }
 97 
 98    /**
 99     * Sets up the fixture.
100     * <p />
101     * Connect to xmlBlaster and login
102     */
103    protected void setUp() {
104       this.glob = (this.glob == null) ? new Global() : this.glob;
105 
106       this.glob.init(Util.getOtherServerPorts(serverPort));
107    }
108 
109    /**
110     * Tears down the fixture.
111     * <p />
112     * cleaning up .... erase() the previous message OID and logout
113     */
114    protected void tearDown() {
115       log.info("Entering tearDown(), test is finished");
116 
117       if (this.serverThread != null) {
118          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
119       }
120 
121       // reset to default server port (necessary if other tests follow in the same JVM).
122       Util.resetPorts(this.glob);
123       Global.instance().shutdown();
124       this.glob = null;
125      
126       this.serverThread = null;
127    }
128 
129    /**
130     * Create a new connection
131     * @param loginName The login name
132     * @param cb The callback handle or null
133     */
134    private Client doConnect(String loginName, I_Callback cb) {
135       try {
136          Client client = new Client();
137          Global gg = this.glob.getClone(null);
138          ConnectQos connectQos = new ConnectQos(gg, loginName, "secret");
139          CallbackAddress cbAddress = new CallbackAddress(glob);
140          cbAddress.setRetries(-1);       // -1 == forever to avoid server side clearing of queue
141          connectQos.addCallbackAddress(cbAddress);
142          client.con = gg.getXmlBlasterAccess();
143          client.con.registerConnectionListener(this);
144          client.updateInterceptor = new MsgInterceptor(gg, log, cb); // Collect received msgs
145          client.con.connect(connectQos, client.updateInterceptor); // Login to xmlBlaster
146          return client;
147       }
148       catch (XmlBlasterException e) {
149          log.warning("doConnect() - login failed: " + e.getMessage());
150          fail(ME+".doConnect() failed: " + e.getMessage());
151       }
152       return null;
153    }
154 
155    /**
156     * Subscribe to message. 
157     */
158    private void doSubscribe(I_XmlBlasterAccess con) {
159       if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
160       try {
161          SubscribeKey subscribeKey = new SubscribeKey(con.getGlobal(), this.oid);
162          SubscribeQos subscribeQos = new SubscribeQos(con.getGlobal());
163          String subscribeOid = con.subscribe(subscribeKey, subscribeQos).getSubscriptionId();
164          log.info("Success: Subscribe on " + subscribeOid + " done");
165          assertTrue("returned null subscribeOid", subscribeOid != null);
166       } catch(XmlBlasterException e) {
167          log.severe("XmlBlasterException: " + e.getMessage());
168          fail(ME+".doSubscribe() failed: " + e.getMessage());
169       }
170    }
171 
172    /**
173     * Construct a message and publish it persistent. 
174     */
175    private void doPublish(I_XmlBlasterAccess con) {
176       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message");
177       try {
178          PublishKey publishKey = new PublishKey(con.getGlobal(), this.oid);
179          PublishQos publishQos = new PublishQos(con.getGlobal());
180          publishQos.setPersistent(true);
181          String content = "Hi";
182          MsgUnit msgUnit = new MsgUnit(publishKey, content.getBytes(), publishQos);
183          con.publish(msgUnit);
184          log.info("Success: Publishing of " + this.oid + " done");
185       } catch(XmlBlasterException e) {
186          log.severe("XmlBlasterException: " + e.getMessage());
187          fail(ME+".doPublish() failed: " + e.getMessage());
188       }
189    }
190 
191    /**
192     * Erase the message. 
193     */
194    private void doErase(I_XmlBlasterAccess con) {
195       if (log.isLoggable(Level.FINE)) log.fine("Erasing ...");
196       try {
197          EraseKey eraseKey = new EraseKey(con.getGlobal(), this.oid);
198          EraseQos eraseQos = new EraseQos(con.getGlobal());
199          eraseQos.setForceDestroy(true);
200          EraseReturnQos[] arr = con.erase(eraseKey, eraseQos);
201       }
202       catch(XmlBlasterException e) {
203          log.severe("XmlBlasterException: " + e.getMessage());
204          fail(ME+".doErase() failed: " + e.getMessage());
205       }
206    }
207 
208    /**
209     * Test as described in class javadoc. 
210     */
211    public void testReferenceCount() {
212       log.info("testReferenceCount START");
213       log.info("STEP1: Start xmlBlaster server");
214       this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
215 
216       log.info("STEP2: Publish a message twice");
217       Client pub = doConnect("publisher", null);
218       doPublish(pub.con);
219       doPublish(pub.con);
220 
221       log.info("STEP3: Start subscriber and subscribe and block in callback");
222       Client sub1 = doConnect("subscribe/1", new I_Callback() {
223          public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
224             log.info("Receiving update of a message oid=" + updateKey.getOid() +
225                            " priority=" + updateQos.getPriority() +
226                            " state=" + updateQos.getState() +
227                            " we going to sleep and don't return control to server");
228             try { Thread.sleep(1000000L); } catch( InterruptedException i) {}
229             log.severe("Waiking up from sleep");
230             fail("Waiking up from sleep");
231             return "";
232          }
233       });
234       doSubscribe(sub1.con);
235       assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(1000L, 1));
236       sub1.updateInterceptor.clear();
237 
238       log.info("STEP4: Kill server and thereafter the clients");
239       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
240       this.serverThread = null;
241       pub.con.disconnect(null);
242       sub1.con.disconnect(null);
243 
244       log.info("STEP5: Start server and recover message from persistence store");
245       this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
246 
247       log.info("STEP6: Start subscriber and expect the last not delivered message to be sent automatically");
248       sub1 = doConnect("subscribe/1", null);
249       assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(1000L, 1));
250       sub1.updateInterceptor.clear();
251       sub1.con.disconnect(null);
252 
253       log.info("STEP7: Start another subscriber and subscribe");
254       Client sub2 = doConnect("subscribe2", null);
255       doSubscribe(sub2.con);
256       assertEquals("", 1, sub2.updateInterceptor.waitOnUpdate(1000L, 1));
257       sub2.updateInterceptor.clear();
258 
259       log.info("testReferenceCount SUCCESS");
260 
261       log.info("STEP8: Cleanup");
262       doErase(sub2.con);
263       sub2.con.disconnect(null);
264 
265       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
266       this.serverThread = null;
267    }
268 
269    /**
270     * This is the callback method invoked from I_XmlBlasterAccess
271     * informing the client in an asynchronous mode if the connection was established.
272     * <p />
273     * This method is enforced through interface I_ConnectionStateListener
274     */
275    public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
276       log.info("I_ConnectionStateListener-"+connection.getId()+": We were lucky, reconnected to xmlBlaster");
277    }
278 
279    public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
280    }
281 
282    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
283       if (log!=null) log.warning("I_ConnectionStateListener-"+connection.getId()+": Lost connection to xmlBlaster");
284    }
285 
286    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
287       if (log!=null) log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
288    }
289 
290    /**
291     * Method is used by TestRunner to load these tests
292     */
293    public static Test suite() {
294        TestSuite suite= new TestSuite();
295        suite.addTest(new TestReferenceCountSwap(null, "testReferenceCount"));
296        return suite;
297    }
298 
299    /**
300     * Invoke: java org.xmlBlaster.test.topic.TestReferenceCountSwap
301     * @deprecated Use the TestRunner from the testsuite to run it:<p />
302     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCountSwap</pre>
303     */
304    public static void main(String args[]) {
305       Global glob = new Global();
306       if (glob.init(args) != 0) {
307          System.err.println(ME + ": Init failed");
308          System.exit(1);
309       }
310       TestReferenceCountSwap testSub = new TestReferenceCountSwap(glob, "TestReferenceCountSwap");
311       testSub.setUp();
312       testSub.testReferenceCount();
313       testSub.tearDown();
314    }
315 }


syntax highlighted by Code2HTML, v. 0.9.1