1 /*------------------------------------------------------------------------------
  2 Name:      TestReferenceCount.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.topic;
  7 
  8 import java.util.logging.Logger;
  9 import java.util.logging.Level;
 10 import org.xmlBlaster.util.Global;
 11 import org.xmlBlaster.util.XmlBlasterException;
 12 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 13 import org.xmlBlaster.client.I_Callback;
 14 import org.xmlBlaster.client.I_XmlBlasterAccess;
 15 import org.xmlBlaster.client.key.UpdateKey;
 16 import org.xmlBlaster.client.key.PublishKey;
 17 import org.xmlBlaster.client.key.SubscribeKey;
 18 import org.xmlBlaster.client.key.EraseKey;
 19 import org.xmlBlaster.client.qos.ConnectQos;
 20 import org.xmlBlaster.client.qos.PublishQos;
 21 import org.xmlBlaster.client.qos.UpdateQos;
 22 import org.xmlBlaster.client.qos.SubscribeQos;
 23 import org.xmlBlaster.client.qos.EraseQos;
 24 import org.xmlBlaster.client.qos.EraseReturnQos;
 25 import org.xmlBlaster.client.I_ConnectionStateListener;
 26 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 27 import org.xmlBlaster.util.qos.address.CallbackAddress;
 28 import org.xmlBlaster.util.MsgUnit;
 29 
 30 import org.xmlBlaster.test.Util;
 31 import org.xmlBlaster.test.MsgInterceptor;
 32 import junit.framework.*;
 33 
 34 
 35 /**
 36  * Tests the correct reference counting for persistent messages
 37  * after recovery. 
 38  * <pre>
 39  *   1. Start xmlBlaster server
 40  *   2. Publish two messages
 41  *   3. Subscribe to message and block in callback
 42  *   4. Kill server and clients
 43  *   5. Restart server
 44  *   6. Start same subscriber - it will receive the message from 3.
 45  *   7. Start another subscriber: we expect an update
 46  *   If the reference counter is not properly set on recovery
 47  *   test 7. will fail.
 48  * </pre>
 49  * Invoke examples:<br />
 50  * <pre>
 51  *   java junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCount
 52  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.topic.TestReferenceCount
 53  * </pre>
 54  */
 55 public class TestReferenceCount extends TestCase implements I_ConnectionStateListener
 56 {
 57    private static String ME = "TestReferenceCount";
 58    private Global glob;
 59    private static Logger log = Logger.getLogger(TestReferenceCount.class.getName());
 60 
 61    private int serverPort = 7694;
 62    private EmbeddedXmlBlaster serverThread;
 63 
 64    private String oid = "referenceCountMsg";
 65 
 66    class Client {
 67       I_XmlBlasterAccess con;
 68       MsgInterceptor updateInterceptor;
 69    }
 70 
 71    public TestReferenceCount(String testName) {
 72       this(null, testName);
 73    }
 74 
 75    /**
 76     * Constructs the TestReferenceCount object.
 77     * <p />
 78     * @param testName  The name used in the test suite
 79     */
 80    public TestReferenceCount(Global glob, String testName) {
 81       super(testName);
 82       this.glob = glob;
 83    }
 84 
 85    /**
 86     * Sets up the fixture.
 87     * <p />
 88     * Connect to xmlBlaster and login
 89     */
 90    protected void setUp() {
 91       this.glob = (this.glob == null) ? new Global() : this.glob;
 92 
 93       this.glob.init(Util.getOtherServerPorts(serverPort));
 94    }
 95 
 96    /**
 97     * Tears down the fixture.
 98     * <p />
 99     * cleaning up .... erase() the previous message OID and logout
100     */
101    protected void tearDown() {
102       log.info("Entering tearDown(), test is finished");
103 
104       if (this.serverThread != null) {
105          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
106       }
107 
108       // reset to default server port (necessary if other tests follow in the same JVM).
109       Util.resetPorts(this.glob);
110       Global.instance().shutdown();
111       this.glob = null;
112      
113       this.serverThread = null;
114    }
115 
116    /**
117     * Create a new connection
118     * @param loginName The login name
119     * @param cb The callback handle or null
120     */
121    private Client doConnect(String loginName, I_Callback cb) {
122       try {
123          Client client = new Client();
124          Global gg = this.glob.getClone(null);
125          ConnectQos connectQos = new ConnectQos(gg, loginName, "secret");
126          CallbackAddress cbAddress = new CallbackAddress(glob);
127          cbAddress.setRetries(-1);       // -1 == forever to avoid server side clearing of queue
128          connectQos.addCallbackAddress(cbAddress);
129          client.con = gg.getXmlBlasterAccess();
130          client.con.registerConnectionListener(this);
131          client.updateInterceptor = new MsgInterceptor(gg, log, cb); // Collect received msgs
132          client.con.connect(connectQos, client.updateInterceptor); // Login to xmlBlaster
133          return client;
134       }
135       catch (XmlBlasterException e) {
136          log.warning("doConnect() - login failed: " + e.getMessage());
137          fail(ME+".doConnect() failed: " + e.getMessage());
138       }
139       return null;
140    }
141 
142    /**
143     * Subscribe to message. 
144     */
145    private void doSubscribe(I_XmlBlasterAccess con) {
146       if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
147       try {
148          SubscribeKey subscribeKey = new SubscribeKey(con.getGlobal(), this.oid);
149          SubscribeQos subscribeQos = new SubscribeQos(con.getGlobal());
150          String subscribeOid = con.subscribe(subscribeKey, subscribeQos).getSubscriptionId();
151          log.info("Success: Subscribe on " + subscribeOid + " done");
152          assertTrue("returned null subscribeOid", subscribeOid != null);
153       } catch(XmlBlasterException e) {
154          log.severe("XmlBlasterException: " + e.getMessage());
155          fail(ME+".doSubscribe() failed: " + e.getMessage());
156       }
157    }
158 
159    /**
160     * Construct a message and publish it persistent. 
161     */
162    private void doPublish(I_XmlBlasterAccess con) {
163       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message");
164       try {
165          PublishKey publishKey = new PublishKey(con.getGlobal(), this.oid);
166          PublishQos publishQos = new PublishQos(con.getGlobal());
167          publishQos.setPersistent(true);
168          String content = "Hi";
169          MsgUnit msgUnit = new MsgUnit(publishKey, content.getBytes(), publishQos);
170          con.publish(msgUnit);
171          log.info("Success: Publishing of " + this.oid + " done");
172       } catch(XmlBlasterException e) {
173          log.severe("XmlBlasterException: " + e.getMessage());
174          fail(ME+".doPublish() failed: " + e.getMessage());
175       }
176    }
177 
178    /**
179     * Erase the message. 
180     */
181    private void doErase(I_XmlBlasterAccess con) {
182       if (log.isLoggable(Level.FINE)) log.fine("Erasing ...");
183       try {
184          EraseKey eraseKey = new EraseKey(con.getGlobal(), this.oid);
185          EraseQos eraseQos = new EraseQos(con.getGlobal());
186          eraseQos.setForceDestroy(true);
187          EraseReturnQos[] arr = con.erase(eraseKey, eraseQos);
188       }
189       catch(XmlBlasterException e) {
190          log.severe("XmlBlasterException: " + e.getMessage());
191          fail(ME+".doErase() failed: " + e.getMessage());
192       }
193    }
194 
195    /**
196     * Test as described in class javadoc. 
197     */
198    public void testReferenceCount() {
199       // long waitTime = 180000L;
200       long waitTime = 1000L;
201       log.info("testReferenceCount START");
202       log.info("STEP1: Start xmlBlaster server");
203       this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.glob);
204 
205       log.info("STEP2: Publish a message twice");
206       Client pub = doConnect("publisher", null);
207       doPublish(pub.con);
208       doPublish(pub.con);
209 
210       log.info("STEP3: Start subscriber and subscribe and block in callback");
211       Client sub1 = doConnect("subscribe/1", new I_Callback() {
212          public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
213             log.info("Receiving update of a message oid=" + updateKey.getOid() +
214                            " priority=" + updateQos.getPriority() +
215                            " state=" + updateQos.getState() +
216                            " we are going to sleep and don't return control to server");
217             try { Thread.sleep(1000000L); } catch( InterruptedException i) {}
218             log.severe("Waking up from sleep");
219             fail("Waking up from sleep");
220             return "";
221          }
222       });
223       doSubscribe(sub1.con);
224       assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(waitTime, 1));
225       sub1.updateInterceptor.clear();
226 
227       log.info("STEP4: Kill server and thereafter the clients");
228       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
229       this.serverThread = null;
230       pub.con.disconnect(null);
231       sub1.con.leaveServer(null);
232 
233       log.info("STEP5: Start server and recover message from persistence store");
234       this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.glob);
235 
236       log.info("STEP6: Start subscriber and expect the last not delivered message to be sent automatically");
237       sub1 = doConnect("subscribe/1", null);
238       assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(waitTime, 1));
239       sub1.updateInterceptor.clear();
240       sub1.con.disconnect(null);
241 
242       log.info("STEP7: Start another subscriber and subscribe");
243       Client sub2 = doConnect("subscribe2", null);
244       doSubscribe(sub2.con);
245       assertEquals("", 1, sub2.updateInterceptor.waitOnUpdate(waitTime, 1));
246       sub2.updateInterceptor.clear();
247 
248       log.info("testReferenceCount SUCCESS");
249 
250       log.info("STEP8: Cleanup");
251       doErase(sub2.con);
252       sub2.con.disconnect(null);
253 
254       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
255       this.serverThread = null;
256    }
257 
258    /**
259     * This is the callback method invoked from I_XmlBlasterAccess
260     * informing the client in an asynchronous mode if the connection was established.
261     * <p />
262     * This method is enforced through interface I_ConnectionStateListener
263     */
264    public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
265       log.info("I_ConnectionStateListener-"+connection.getId()+": We were lucky, reconnected to xmlBlaster");
266    }
267 
268    public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
269    }
270 
271    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
272       if (log!=null) log.warning("I_ConnectionStateListener-"+connection.getId()+": Lost connection to xmlBlaster");
273    }
274 
275    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
276       if (log!=null) log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
277    }
278 
279    /**
280     * Method is used by TestRunner to load these tests
281     */
282    public static Test suite() {
283        TestSuite suite= new TestSuite();
284        suite.addTest(new TestReferenceCount(null, "testReferenceCount"));
285        return suite;
286    }
287 
288    /**
289     * Invoke: java org.xmlBlaster.test.topic.TestReferenceCount
290     * @deprecated Use the TestRunner from the testsuite to run it:<p />
291     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCount</pre>
292     */
293    public static void main(String args[]) {
294       Global glob = new Global();
295       if (glob.init(args) != 0) {
296          System.err.println(ME + ": Init failed");
297          System.exit(1);
298       }
299       TestReferenceCount testSub = new TestReferenceCount(glob, "TestReferenceCount");
300       testSub.setUp();
301       testSub.testReferenceCount();
302       testSub.tearDown();
303    }
304 }


syntax highlighted by Code2HTML, v. 0.9.1