1 /*------------------------------------------------------------------------------
  2 Name:      TestXPathSubscribeFilter.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Test XPath filter.
  6 ------------------------------------------------------------------------------*/
  7 package org.xmlBlaster.test.mime;
  8 
  9 import java.util.logging.Logger;
 10 import org.xmlBlaster.util.Global;
 11 import org.xmlBlaster.util.XmlBlasterException;
 12 import org.xmlBlaster.client.qos.ConnectQos;
 13 import org.xmlBlaster.client.I_XmlBlasterAccess;
 14 import org.xmlBlaster.client.I_Callback;
 15 import org.xmlBlaster.client.key.UpdateKey;
 16 import org.xmlBlaster.client.qos.UpdateQos;
 17 import org.xmlBlaster.client.qos.EraseReturnQos;
 18 import org.xmlBlaster.client.qos.SubscribeQos;
 19 import org.xmlBlaster.util.MsgUnit;
 20 import org.xmlBlaster.util.qos.AccessFilterQos;
 21 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 22 import org.xmlBlaster.test.Util;
 23 
 24 import junit.framework.*;
 25 
 26 import java.util.HashMap;
 27 
 28 /**
 29  * This client does test of XPathFilter based queries.<br />
 30  * <p />
 31  * This client may be invoked multiple time on the same xmlBlaster server,
 32  * as it cleans up everything after his tests are done.
 33  * <p>
 34  * Invoke examples:<br />
 35  * <pre>
 36  *    java junit.textui.TestRunner -noloading org.xmlBlaster.test.mime.TestXPathSubscribeFilter
 37  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.mime.TestXPathSubscribeFilter
 38  * </pre>
 39  *
 40  * @author Peter Antman
 41  * @see org.xmlBlaster.engine.mime.xpath.XPathFilter
 42  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/mime.plugin.access.xpath.html
 43  */
 44 public class TestXPathSubscribeFilter extends TestCase implements I_Callback
 45 {
 46    private static String ME = "Tim";
 47    private final Global glob;
 48    private static Logger log = Logger.getLogger(TestXPathSubscribeFilter.class.getName());
 49 
 50    private I_XmlBlasterAccess con = null;
 51    private String name;
 52    private String passwd = "secret";
 53    private EmbeddedXmlBlaster serverThread;
 54    private int serverPort = 7624;
 55 
 56    private HashMap subscriberTable = new HashMap();
 57    private int[] subRec = new int[4];
 58    String subscribeOid;
 59    String subscribeOid2;
 60    String subscribeOid3;
 61    String subscribeOid4;
 62    /**
 63     * Constructs the TestXPathSubscribeFilter object.
 64     * <p />
 65     * @param testName   The name used in the test suite
 66     * @param name       The name to login to the xmlBlaster
 67     */
 68    public TestXPathSubscribeFilter(Global glob, String testName, String name)
 69    {
 70       super(testName);
 71       this.glob = glob;
 72 
 73       this.name = name;
 74    }
 75 
 76    /**
 77     * Sets up the fixture.
 78     * <p />
 79     * We start an own xmlBlaster server in a separate thread,
 80     * it has configured to load our XPath MIME filter plugin with extensions.
 81     * <p />
 82     * Then we connect as a client
 83     */
 84    protected void setUp()
 85    {
 86       String[] args = {
 87          "-bootstrapPort",        // For all protocol we may use set an alternate server port
 88          "" + serverPort,
 89          "-plugin/socket/port",
 90          "" + (serverPort-1),
 91          "-plugin/rmi/registryPort",
 92          "" + (serverPort-2),
 93          "-plugin/xmlrpc/port",
 94          "" + (serverPort-3),
 95          "-MimeAccessPlugin[XPathFilter][1.0]",
 96          "org.xmlBlaster.engine.mime.xpath.XPathFilter,engine.mime.xpath.extension_functions=:contains-ignore-case:org.xmlBlaster.engine.mime.xpath.ContainsIgnoreCaseFunction;:recursive-text:org.xmlBlaster.engine.mime.xpath.RecursiveTextFunction",
 97          //,classpath=jaxen.jar
 98          "-admin.remoteconsole.port",
 99          "0"
100       };
101       /*
102 ,
103          "-logging",
104          "INFO"
105       */
106       glob.init(args);
107 
108       serverThread = EmbeddedXmlBlaster.startXmlBlaster(args);
109       log.info("XmlBlaster is ready for testing subscribe MIME filter");
110 
111       try {
112          log.info("Connecting ...");
113          con = glob.getXmlBlasterAccess();
114          ConnectQos qos = new ConnectQos(glob, name, passwd);
115          con.connect(qos, this); // Login to xmlBlaster
116       }
117       catch (Exception e) {
118          Thread.dumpStack();
119          log.severe("Can't connect to xmlBlaster: " + e.toString());
120       }
121 
122       // Subscribe to a message with a supplied filter
123       try {
124          // One sport subscriber
125          SubscribeQos qos = new SubscribeQos(glob);
126          qos.addAccessFilter(new AccessFilterQos(glob, "XPathFilter", "1.0", "/news[@type='sport']"));
127          
128          subscribeOid = con.subscribe("<key oid='MSG'/>", qos.toXml()).getSubscriptionId();
129          subscriberTable.put(subscribeOid, new Integer(0));
130          log.info("Success: Subscribe subscription-id=" + subscribeOid + " done");
131          // One culture subscriber
132          qos = new SubscribeQos(glob);
133          qos.addAccessFilter(new AccessFilterQos(glob, "XPathFilter", "1.0", "/news[@type='culture']"));
134          
135          subscribeOid2 = con.subscribe("<key oid='MSG'/>", qos.toXml()).getSubscriptionId();
136          subscriberTable.put(subscribeOid2, new Integer(1));
137          log.info("Success: Subscribe subscription-id2=" + subscribeOid2 + " done");
138 
139          // And one on another msg type but with the same xpath
140          qos = new SubscribeQos(glob);
141          qos.addAccessFilter(new AccessFilterQos(glob, "XPathFilter", "1.0", "/news[@type='sport' or @type='culture']"));
142          
143          
144          subscribeOid3 = con.subscribe("<key oid='AnotherMsG'/>", qos.toXml()).getSubscriptionId();
145          subscriberTable.put(subscribeOid3, new Integer(2));
146          log.info("Success: Subscribe subscription-id3=" + subscribeOid3 + " done");
147 
148          // Ad with extention functions
149          qos = new SubscribeQos(glob);
150          qos.addAccessFilter(new AccessFilterQos(glob, "XPathFilter", "1.0", "/news[ contains-ignore-case( recursive-text(body), 'needle')]"));
151          
152          
153          subscribeOid4 = con.subscribe("<key oid='AnotherMsG'/>", qos.toXml()).getSubscriptionId();
154          subscriberTable.put(subscribeOid4, new Integer(3));
155          log.info("Success: Subscribe subscription-id4=" + subscribeOid4 + " done");
156          
157       } catch(XmlBlasterException e) {
158          log.warning("XmlBlasterException: " + e.getMessage());
159          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
160       }
161    }
162 
163    /**
164     * Tears down the fixture.
165     * <p />
166     * cleaning up .... erase() the previous message OID and logout
167     */
168    protected void tearDown()
169    {
170       log.info("TEST: tearing down");
171       try { Thread.sleep(200L); } catch( InterruptedException i) {}   // Wait 200 milli seconds, until all updates are processed ...
172       
173       try {
174          con.unSubscribe("<key oid='"+subscribeOid+"'/>",
175                           "<qos/>");
176          con.unSubscribe("<key oid='"+subscribeOid2+"'/>",
177                           "<qos/>");
178          con.unSubscribe("<key oid='"+subscribeOid3+"'/>",
179                           "<qos/>");
180          con.unSubscribe("<key oid='"+subscribeOid4+"'/>",
181                           "<qos/>");
182          EraseReturnQos[] arr = con.erase("<key oid='MSG'/>", "<qos/>");
183          assertEquals("Erase", 1, arr.length);
184          arr = con.erase("<key oid='AnotherMsG'/>", "<qos/>");
185          assertEquals("Erase", 1, arr.length);
186       } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
187       
188       con.disconnect(null);
189       con=null;
190       
191       try { Thread.sleep(500L); } catch( InterruptedException i) {}    // Wait some time
192       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
193       this.serverThread = null;
194       
195       // reset to default server port (necessary if other tests follow in the same JVM).
196       Util.resetPorts();
197    }
198 
199    /**
200     *
201     *
202     */
203    public void testFilter()
204    {
205       log.info("testFilter() with XPath filter /news[@type='sport'] ...");
206 
207       log.info("TEST 1: Testing sport message");
208       try {
209          con.publish(new MsgUnit("<key oid='MSG' contentMime='text/xml'/>", "<news type='sport'></news>".getBytes(), null));
210       } catch(XmlBlasterException e) {
211          log.warning("XmlBlasterException: " + e.getMessage());
212          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
213       }
214       waitOnUpdate(subscribeOid,4000L, 1);
215 
216 
217       log.info("TEST 2: Testing culture message");
218       try {
219          con.publish(new MsgUnit("<key oid='MSG' contentMime='text/xml'/>", "<news type='culture'></news>".getBytes(), null));
220       } catch(XmlBlasterException e) {
221          log.warning("XmlBlasterException: " + e.getMessage());
222          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
223       }
224       waitOnUpdate(subscribeOid2,4000L, 1);
225 
226       log.info("TEST 3: Testing AnotherMsG message");
227       try {
228          con.publish(new MsgUnit("<key oid='AnotherMsG' contentMime='text/xml'/>", "<news type='culture'></news>".getBytes(), null));
229       } catch(XmlBlasterException e) {
230          log.warning("XmlBlasterException: " + e.getMessage());
231          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
232       }
233       waitOnUpdate(subscribeOid3,4000L, 1);
234       
235       log.info("TEST 4: Testing extention functions");
236       try {
237          con.publish(new MsgUnit("<key oid='AnotherMsG' contentMime='text/xml'/>", "<news><body><p>A little message</p><p>With a Needle in second paragraph wich normal XPath string function would not see</p></body></news>".getBytes(), null));
238       } catch(XmlBlasterException e) {
239          log.warning("XmlBlasterException: " + e.getMessage());
240          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
241       }
242       waitOnUpdate(subscribeOid4,4000L, 1);
243       /* See TestSubscribeFilter.java for this test
244       log.info("TEST 4: Test what happens if the plugin throws an exception");
245       try {   
246          con.publish(new MsgUnit("<key oid='MSG'/>", "<broken><xml></broken>".getBytes(), null));
247          waitOnUpdate(subscribeOid,4000L, 1); // a dead message should come if we would subscribe on it
248       } catch(XmlBlasterException e) {
249          fail("publish forced the plugin to throw an XmlBlasterException, but it should not reach the publisher: " + e.toString());
250       }
251       */
252       
253       log.info("Success in testFilter()");
254    }
255    
256    /**
257     * This is the callback method invoked from xmlBlaster
258     * delivering us a new asynchronous message. 
259     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
260     */
261    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
262    {
263       log.info("Receiving update of a message " + updateKey.getOid() + " for subId: " + updateQos.getSubscriptionId() + " content=" + new String(content));
264       int ii = ((Integer)subscriberTable.get(updateQos.getSubscriptionId())).intValue();
265       log.fine("Got message " + new String(content));
266       subRec[ii]++;
267       return "";
268    }
269    
270    /**
271     * Little helper, waits until the wanted number of messages are arrived
272     * or returns when the given timeout occurs.
273     * <p />
274     * @param timeout in milliseconds
275     * @param numWait how many messages to wait
276     */
277    private void waitOnUpdate(String subId,final long timeout, final int numWait)
278    {
279       long pollingInterval = 50L;  // check every 0.05 seconds
280       if (timeout < 50)  pollingInterval = timeout / 10L;
281       long sum = 0L;
282       int ii = ((Integer)subscriberTable.get(subId)).intValue();
283       // check if too few are arriving
284       while (subRec[ii] < numWait) {
285          try { Thread.sleep(pollingInterval); } catch( InterruptedException i) {}
286          sum += pollingInterval;
287          assertTrue("Timeout of " + timeout + " occurred without update", sum <= timeout);
288       }
289 
290       // check if too many are arriving
291       try { Thread.sleep(timeout); } catch( InterruptedException i) {}
292       assertEquals("Wrong number of messages arrived", numWait, subRec[ii]);
293       log.info("Found correct rec messages for: " + subId);
294       subRec[ii]= 0;
295    }
296 
297    /**
298     * Method is used by TestRunner to load these tests
299     */
300    public static Test suite()
301    {
302        TestSuite suite= new TestSuite();
303        suite.addTest(new TestXPathSubscribeFilter(new Global(), "testFilter", "TestXPathSubscribeFilter"));
304        return suite;
305    }
306 
307    /**
308     * Invoke: 
309     * <pre>
310     *   java org.xmlBlaster.test.mime.TestXPathSubscribeFilter
311     *   java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.mime.TestXPathSubscribeFilter
312     * <pre>
313     */
314    public static void main(String args[])
315    {
316       Global glob = new Global();
317       if (glob.init(args) != 0) {
318          System.err.println(ME + ": Init failed");
319          System.exit(1);
320       }
321       TestXPathSubscribeFilter testSub = new TestXPathSubscribeFilter(glob, "TestXPathSubscribeFilter", "TestXPathSubscribeFilter");
322       testSub.setUp();
323       testSub.testFilter();
324       testSub.tearDown();
325    }
326 }


// syntax highlighted by Code2HTML, v. 0.9.1