/*------------------------------------------------------------------------------
Name:      TestSubNewestOnly.java
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
Comment:   Demo code for a client using xmlBlaster
Version:   $Id: TestSub.java 14833 2006-03-06 21:38:58Z laghi $
------------------------------------------------------------------------------*/
package org.xmlBlaster.test.qos;

import java.util.logging.Level;
import java.util.logging.Logger;

import junit.framework.AssertionFailedError;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

import org.xmlBlaster.client.I_Callback;
import org.xmlBlaster.client.I_XmlBlasterAccess;
import org.xmlBlaster.client.key.PublishKey;
import org.xmlBlaster.client.key.SubscribeKey;
import org.xmlBlaster.client.key.UpdateKey;
import org.xmlBlaster.client.qos.ConnectQos;
import org.xmlBlaster.client.qos.EraseReturnQos;
import org.xmlBlaster.client.qos.PublishQos;
import org.xmlBlaster.client.qos.PublishReturnQos;
import org.xmlBlaster.client.qos.SubscribeQos;
import org.xmlBlaster.client.qos.SubscribeReturnQos;
import org.xmlBlaster.client.qos.UpdateQos;
import org.xmlBlaster.util.Global;
import org.xmlBlaster.util.MsgUnit;
import org.xmlBlaster.util.XmlBlasterException;
import org.xmlBlaster.util.qos.address.CallbackAddress;
 32 
 33 /**
 34  * This client tests the method subscribe() with a later publish() with XPath
 35  * query. <br />
 36  * The subscribe() should be recognized for this later arriving publish()
 37  * <p>
 38  * This client may be invoked multiple time on the same xmlBlaster server, as it
 39  * cleans up everything after his tests are done.
 40  * <p>
 41  * Invoke examples:<br />
 42  * 
 43  * <pre>
 44  *    java junit.textui.TestRunner org.xmlBlaster.test.qos.TestSub
 45  *    java junit.swingui.TestRunner org.xmlBlaster.test.qos.TestSub
 46  * </pre>
 47  */
 48 public class TestSubNewestOnly extends TestCase implements I_Callback {
 49    private static String ME = "TestSubNewestOnly";
 50    private final Global glob;
 51    private static Logger log = Logger.getLogger(TestSubNewestOnly.class
 52          .getName());
 53    private boolean messageArrived;
 54    private String subscribeOid;
 55    private String publishOid = "topic_TestSubNewestOnly";
 56    private I_XmlBlasterAccess senderConnection;
 57    private String senderName = "Publisher";
 58    private String senderContent = "";
 59    private final String receiverName = "Subscriber/session/1";
 60    private I_XmlBlasterAccess receiverConnection;
 61    private int numReceived = 0;
 62 
 63    /**
 64     * Constructs the TestSub object.
 65     * <p />
 66     * 
 67     * @param testName
 68     *            The name used in the test suite
 69     */
 70    public TestSubNewestOnly(Global glob, String testName) {
 71       super(testName);
 72       this.glob = glob;
 73    }
 74 
 75    /**
 76     * Connect to xmlBlaster and login
 77     */
 78    protected void setUp() {
 79       String passwd = "secret";
 80       try {
 81          Global senderGlob = glob.getClone(null);
 82          senderConnection = senderGlob.getXmlBlasterAccess();
 83          senderConnection.connect(new ConnectQos(senderGlob, senderName,
 84                passwd), this);
 85          erase(false);
 86       } catch (Exception e) {
 87          log.severe("Login failed: " + e.toString());
 88          e.printStackTrace();
 89          assertTrue("Login failed: " + e.toString(), false);
 90       }
 91 
 92       connectSubscriber();
 93    }
 94 
 95    private void connectSubscriber() {
 96       String passwd = "secret";
 97       try {
 98          Global receiverGlob = glob.getClone(null);
 99          receiverConnection = receiverGlob.getXmlBlasterAccess();
100          ConnectQos connectQos = new ConnectQos(receiverGlob, receiverName,
101                passwd);
102          CallbackAddress cbProps = new CallbackAddress(new Global());
103          cbProps.setRetries(-1);
104          connectQos.addCallbackAddress(cbProps);
105          receiverConnection.connect(connectQos, this);
106       } catch (Exception e) {
107          log.severe("Login failed: " + e.toString());
108          e.printStackTrace();
109          fail("Login failed: " + e.toString());
110       }
111    }
112 
113    private void erase(boolean check) {
114       String xmlKey = "<key oid='" + publishOid + "' queryType='EXACT'/>";
115       String qos = "<qos></qos>";
116       try {
117          EraseReturnQos[] arr = senderConnection.erase(xmlKey, qos);
118          if (check)
119             assertEquals("Erase", 1, arr.length);
120       } catch (XmlBlasterException e) {
121          fail("Erase XmlBlasterException: " + e.getMessage());
122       }
123    }
124 
125    /**
126     * Tears down the fixture.
127     * <p />
128     * cleaning up .... erase() the previous message OID and logout
129     */
130    protected void tearDown() {
131       erase(true);
132       receiverConnection.disconnect(null);
133       senderConnection.disconnect(null);
134    }
135 
136    public void subscribe() {
137       SubscribeKey key = new SubscribeKey(receiverConnection.getGlobal(),
138             publishOid);
139       SubscribeQos qos = new SubscribeQos(receiverConnection.getGlobal());
140       qos.setNewestOnly(true);
141       numReceived = 0;
142       subscribeOid = null;
143       try {
144          SubscribeReturnQos subscribeReturnQos = receiverConnection
145                .subscribe(key, qos);
146          subscribeOid = subscribeReturnQos.getSubscriptionId();
147          log.info("Success: Subscribe subscription-id=" + subscribeOid
148                + " done: " + subscribeReturnQos.toXml());
149       } catch (XmlBlasterException e) {
150          log.warning("XmlBlasterException: " + e.getMessage());
151          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(),
152                false);
153       }
154       assertTrue("returned null subscribeOid", subscribeOid != null);
155       assertTrue("returned subscribeOid is empty", 0 != subscribeOid.length());
156    }
157 
158    public void publishThree() {
159       int count = 3;
160       for (int i = 0; i < count; i++) {
161          PublishKey publishKey = new PublishKey(
162                senderConnection.getGlobal(), publishOid);
163          PublishQos publishQos = new PublishQos(
164                senderConnection.getGlobal(), publishOid);
165          senderContent = "" + (i + 1);
166          try {
167             MsgUnit msgUnit = new MsgUnit(publishKey,
168                   senderContent.getBytes(), publishQos);
169             PublishReturnQos tmp = senderConnection.publish(msgUnit);
170             assertEquals("Wrong publishOid", publishOid, tmp.getKeyOid());
171             log.info("Success: Publishing " + senderContent + " done, returned oid=" + publishOid);
172          } catch (XmlBlasterException e) {
173             log.warning("XmlBlasterException: " + e.getMessage());
174             assertTrue("publish - XmlBlasterException: " + e.getMessage(),
175                   false);
176          }
177       }
178    }
179 
180    /**
181     * TEST: Construct a message and publish it,<br />
182     * the previous XPath subscription should match and send an update.
183     */
184    public void testNewestOnly() throws InterruptedException {
185       subscribe();
186       receiverConnection.leaveServer(null);
187       Thread.sleep(1000L);
188       assertEquals("numReceived after subscribe", 0, numReceived);
189 
190       publishThree();
191       Thread.sleep(1000L);
192       assertEquals("numReceived after subscribe", 0, numReceived);
193 
194       connectSubscriber();
195       waitOnUpdate(1000L);
196       assertEquals("numReceived after subscribe", 1, numReceived);
197    }
198 
199    /**
200     * This is the callback method invoked from xmlBlaster delivering us a new
201     * asynchronous message.
202     * 
203     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[],
204     *      UpdateQos)
205     */
206    public String update(String cbSessionId_, UpdateKey updateKey,
207          byte[] content, UpdateQos updateQos) {
208       String contentStr = new String(content);
209 
210       if (updateQos.isErased()) {
211          return "";
212       }
213 
214       log.info("Receiving update of message " + updateKey.getOid() + " with content="
215             + contentStr);
216       // log.info("subscribeOid=" + subscribeOid + ":" + updateQos.toXml());
217 
218       numReceived += 1;
219 
220       // wait that the subscribe() has returned as well
221       for (int ii = 0; ii < 5; ii++) {
222          if (subscribeOid != null)
223             break;
224          try {
225             Thread.sleep(1000L);
226          } catch (InterruptedException i) {
227          }
228          log.info("waiting ...");
229       }
230 
231       assertEquals("Wrong sender", senderName, updateQos.getSender()
232             .getLoginName());
233       assertEquals("engine.qos.update.subscriptionId: Wrong subscriptionId",
234             subscribeOid, updateQos.getSubscriptionId());
235       assertEquals("Wrong oid of message returned", publishOid,
236             updateKey.getOid());
237 
238       assertEquals("Message content is corrupted", senderContent,
239             contentStr);
240 
241       messageArrived = true;
242       return "";
243    }
244 
245    /**
246     * Little helper, waits until the variable 'messageArrive' is set to true,
247     * or returns when the given timeout occurs.
248     * 
249     * @param timeout
250     *            in milliseconds
251     */
252    private void waitOnUpdate(final long timeout) {
253       long pollingInterval = 50L; // check every 0.05 seconds
254       if (timeout < 50)
255          pollingInterval = timeout / 10L;
256       long sum = 0L;
257       while (!messageArrived) {
258          try {
259             Thread.sleep(pollingInterval);
260          } catch (InterruptedException i) {
261          }
262          sum += pollingInterval;
263          if (sum > timeout) {
264             log.warning("Timeout of " + timeout + " occurred");
265             break;
266          }
267       }
268       messageArrived = false;
269    }
270 
271    /**
272     * Method is used by TestRunner to load these tests
273     * 
274     * <pre>
275     * java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.qos.TestSubNewestOnly
276     * </pre>
277     */
278    public static Test suite() {
279       TestSuite suite = new TestSuite();
280       suite.addTest(new TestSubNewestOnly(new Global(), "testNewestOnly"));
281       return suite;
282    }
283 
284    /**
285     * Invoke: java org.xmlBlaster.test.qos.TestSubNewestOnly
286     * 
287     * @throws InterruptedException
288     */
289    public static void main(String args[]) throws InterruptedException {
290       Global glob = new Global();
291       if (glob.init(args) != 0) {
292          System.err.println(ME + ": Init failed");
293          System.exit(1);
294       }
295       TestSubNewestOnly testSub = new TestSubNewestOnly(glob,
296             "testNewestOnly");
297       testSub.setUp();
298       testSub.testNewestOnly();
299       testSub.tearDown();
300    }
301 }


// syntax highlighted by Code2HTML, v. 0.9.1