1 /*------------------------------------------------------------------------------
  2 Name:      TestSub.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Demo code for a client using xmlBlaster
  6 Version:   $Id: TestSub.java 14833 2006-03-06 21:38:58Z laghi $
  7 ------------------------------------------------------------------------------*/
  8 package org.xmlBlaster.test.qos;
  9 
 10 import java.util.logging.Logger;
 11 import java.util.logging.Level;
 12 import org.xmlBlaster.util.Global;
 13 import org.xmlBlaster.util.XmlBlasterException;
 14 import org.xmlBlaster.client.qos.ConnectQos;
 15 import org.xmlBlaster.util.Timestamp;
 16 import org.xmlBlaster.client.I_XmlBlasterAccess;
 17 import org.xmlBlaster.client.I_Callback;
 18 import org.xmlBlaster.client.key.UpdateKey;
 19 import org.xmlBlaster.client.qos.UpdateQos;
 20 import org.xmlBlaster.client.qos.PublishReturnQos;
 21 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 22 import org.xmlBlaster.client.qos.EraseReturnQos;
 23 import org.xmlBlaster.util.MsgUnit;
 24 import org.xmlBlaster.util.qos.address.CallbackAddress;
 25 
 26 import junit.framework.*;
 27 
 28 
 29 /**
 30  * This client tests the method subscribe() with a later publish() with XPath query.
 31  * <br />
 32  * The subscribe() should be recognized for this later arriving publish()
 33  * <p>
 * This client may be invoked multiple times on the same xmlBlaster server,
 * as it cleans up everything after its tests are done.
 36  * <p>
 37  * Invoke examples:<br />
 38  * <pre>
 39  *    java junit.textui.TestRunner org.xmlBlaster.test.qos.TestSub
 40  *    java junit.swingui.TestRunner org.xmlBlaster.test.qos.TestSub
 41  * </pre>
 42  */
 43 public class TestSub extends TestCase implements I_Callback
 44 {
 45    private static String ME = "TestSub";
 46    private final Global glob;
 47    private static Logger log = Logger.getLogger(TestSub.class.getName());
 48 
 49    private boolean messageArrived = false;
 50 
 51    private String subscribeOid;
 52    private String publishOid = "dummyTestSub";
 53    private I_XmlBlasterAccess senderConnection;
 54    private String senderName;
 55    private String senderContent;
 56    private String receiverName;         // sender/receiver is here the same client
 57    private Timestamp sentTimestamp;
 58    private String cbSessionId = "0fxrc83plP";
 59 
 60    private int numReceived = 0;         // error checking
 61    private final String contentMime = "text/xml";
 62    private final String contentMimeExtended = "1.0";
 63 
 64    private UpdateQos updateQos = null;
 65 
 66    /**
 67     * Constructs the TestSub object.
 68     * <p />
 69     * @param testName  The name used in the test suite
 70     * @param loginName The name to login to the xmlBlaster
 71     */
 72    public TestSub(Global glob, String testName, String loginName)
 73    {
 74       super(testName);
 75       this.glob = glob;
 76 
 77       this.senderName = loginName;
 78       this.receiverName = loginName;
 79    }
 80 
 81 
 82    /**
 83     * Sets up the fixture.
 84     * <p />
 85     * Connect to xmlBlaster and login
 86     */
 87    protected void setUp()
 88    {
 89       try {
 90          senderConnection = glob.getXmlBlasterAccess(); // Find orb
 91 
 92          String passwd = "secret";
 93          ConnectQos qos = new ConnectQos(glob, senderName, passwd);
 94          if (log.isLoggable(Level.FINE))
 95            log.fine("the connect qos is: " + qos.toXml());
 96          
 97          CallbackAddress cbAddress = new CallbackAddress(this.glob);
 98          cbAddress.setSecretSessionId(cbSessionId); // to protect our callback server - see method update()
 99          qos.addCallbackAddress(cbAddress);
100 
101          senderConnection.connect(qos, this); // Login to xmlBlaster
102       }
103       catch (Exception e) {
104           log.severe("Login failed: " + e.toString());
105           e.printStackTrace();
106           assertTrue("Login failed: " + e.toString(), false);
107       }
108    }
109 
110 
111    /**
112     * Tears down the fixture.
113     * <p />
114     * cleaning up .... erase() the previous message OID and logout
115     */
116    protected void tearDown()
117    {
118       String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
119                       "<key oid='" + publishOid + "' queryType='EXACT'>\n" +
120                       "</key>";
121       String qos = "<qos></qos>";
122       try {
123          EraseReturnQos[] arr = senderConnection.erase(xmlKey, qos);
124          assertEquals("Erase", 1, arr.length);
125       } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
126 
127       senderConnection.disconnect(null);
128    }
129 
130 
131    /**
132     * TEST: Subscribe to messages with XPATH.
133     * <p />
134     * The returned subscribeOid is checked
135     */
136    public void testSubscribeXPath()
137    {
138       if (log.isLoggable(Level.FINE)) log.fine("Subscribing using XPath syntax ...");
139 
140       String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
141                       "<key oid='' queryType='XPATH'>\n" +
142                       "   //TestSub-AGENT" +
143                       "</key>";
144       numReceived = 0;
145       subscribeOid = null;
146       try {
147          SubscribeReturnQos subscribeReturnQos = senderConnection.subscribe(xmlKey, null);
148          subscribeOid = subscribeReturnQos.getSubscriptionId();
149          log.info("Success: Subscribe subscription-id=" + subscribeOid + " done: " + subscribeReturnQos.toXml());
150       } catch(XmlBlasterException e) {
151          log.warning("XmlBlasterException: " + e.getMessage());
152          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
153       }
154       assertTrue("returned null subscribeOid", subscribeOid != null);
155       assertTrue("returned subscribeOid is empty", 0 != subscribeOid.length());
156    }
157 
158 
159    /**
160     * TEST: Construct a message and publish it.
161     * <p />
162     * The returned publishOid is checked
163     */
164    public void testPublish()
165    {
166       if (log.isLoggable(Level.FINE)) log.fine("Publishing a message ...");
167 
168       numReceived = 0;
169       String xmlKey = "<?xml version='1.0' encoding='ISO-8859-1' ?>\n" +
170                       "<key oid='" + publishOid + "' contentMime='" + contentMime + "' contentMimeExtended='" + contentMimeExtended + "'>\n" +
171                       "   <TestSub-AGENT id='192.168.124.10' subId='1' type='generic'>" +
172                       "      <TestSub-DRIVER id='FileProof' pollingFreq='10'>" +
173                       "      </TestSub-DRIVER>"+
174                       "   </TestSub-AGENT>" +
175                       "</key>";
176       senderContent = "Yeahh, i'm the new content";
177       try {
178          MsgUnit msgUnit = new MsgUnit(xmlKey, senderContent.getBytes(), "<qos></qos>");
179          sentTimestamp = new Timestamp();
180          PublishReturnQos tmp = senderConnection.publish(msgUnit);
181          assertEquals("Wrong publishOid", publishOid, tmp.getKeyOid());
182          log.info("Success: Publishing done, returned oid=" + publishOid);
183       } catch(XmlBlasterException e) {
184          log.warning("XmlBlasterException: " + e.getMessage());
185          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
186       }
187    }
188 
189 
190    /**
191     * TEST: Construct a message and publish it,<br />
192     * the previous XPath subscription should match and send an update.
193     */
194    public void testPublishAfterSubscribeXPath()
195    {
196       testSubscribeXPath();
197       try { Thread.sleep(1000L); } catch( InterruptedException i) {}                                            // Wait some time for callback to arrive ...
198       assertEquals("numReceived after subscribe", 0, numReceived);  // there should be no Callback
199 
200       testPublish();
201       waitOnUpdate(5000L);
202       assertEquals("numReceived after publishing", 1, numReceived); // message arrived?
203    }
204 
205 
206    /**
207     * This is the callback method invoked from xmlBlaster
208     * delivering us a new asynchronous message. 
209     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
210     */
211    public String update(String cbSessionId_, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
212    {
213       log.info("Receiving update of message oid=" + updateKey.getOid() + "...");
214       log.info("subscribeOid=" + subscribeOid + ":" + updateQos.toXml());
215 
216       numReceived += 1;
217 
218       if (updateQos.isErased()) {
219          return "";
220       }
221 
222       // wait that the subscribe() has returned as well
223       for (int ii=0; ii<5; ii++) {
224          if (subscribeOid != null) 
225             break;
226          try { Thread.sleep(1000L); } catch( InterruptedException i) {}
227          log.info("waiting ...");
228       }
229 
230       assertEquals("Wrong cbSessionId", this.cbSessionId, cbSessionId_);
231       assertEquals("Wrong sender", senderName, updateQos.getSender().getLoginName());
232       assertEquals("engine.qos.update.subscriptionId: Wrong subscriptionId", subscribeOid, updateQos.getSubscriptionId());
233       assertEquals("Wrong oid of message returned", publishOid, updateKey.getOid());
234       assertEquals("Message content is corrupted", new String(senderContent), new String(content));
235       assertEquals("Message contentMime is corrupted", contentMime, updateKey.getContentMime());
236       assertEquals("Message contentMimeExtended is corrupted", contentMimeExtended, updateKey.getContentMimeExtended());
237 
238       // Test requirement "engine.qos.update.rcvTimestamp":
239       assertTrue("sentTimestamp="+sentTimestamp+" not in hamony with rcvTimestamp="+updateQos.getRcvTimestamp(),
240              sentTimestamp.getMillis() < updateQos.getRcvTimestamp().getMillis() &&
241              (sentTimestamp.getMillis()+1000) > updateQos.getRcvTimestamp().getMillis());
242 
243       messageArrived = true;
244       return "";
245    }
246 
247 
248    /**
249     * Little helper, waits until the variable 'messageArrive' is set
250     * to true, or returns when the given timeout occurs.
251     * @param timeout in milliseconds
252     */
253    private void waitOnUpdate(final long timeout)
254    {
255       long pollingInterval = 50L;  // check every 0.05 seconds
256       if (timeout < 50)  pollingInterval = timeout / 10L;
257       long sum = 0L;
258       while (!messageArrived) {
259          try {
260             Thread.sleep(pollingInterval);
261          }
262          catch( InterruptedException i)
263          {}
264          sum += pollingInterval;
265          if (sum > timeout) {
266             log.warning("Timeout of " + timeout + " occurred");
267             break;
268          }
269       }
270       messageArrived = false;
271    }
272 
273 
274    /**
275     * Method is used by TestRunner to load these tests
276     */
277    public static Test suite()
278    {
279        TestSuite suite= new TestSuite();
280        String loginName = "Tim";
281        suite.addTest(new TestSub(new Global(), "testPublishAfterSubscribeXPath", loginName));
282        return suite;
283    }
284 
285 
286    /**
287     * Invoke: java org.xmlBlaster.test.qos.TestSub
288     * @deprecated Use the TestRunner from the testsuite to run it:<p />
289     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.qos.TestSub</pre>
290     */
291    public static void main(String args[])
292    {
293       Global glob = new Global();
294       if (glob.init(args) != 0) {
295          System.err.println(ME + ": Init failed");
296          System.exit(1);
297       }
298       TestSub testSub = new TestSub(glob, "TestSub", "Tim");
299       testSub.setUp();
300       testSub.testPublishAfterSubscribeXPath();
301       testSub.tearDown();
302    }
303 }


syntax highlighted by Code2HTML, v. 0.9.1