1 /*
  2  * Copyright (c) 2003 Peter Antman, Teknik i Media  <peter.antman@tim.se>
  3  *
  4  * $Id: TestLocalProtocol.java 16173 2007-05-22 12:44:14Z ruff $
  5  *
  6  * This library is free software; you can redistribute it and/or
  7  * modify it under the terms of the GNU Lesser General Public
  8  * License as published by the Free Software Foundation; either
  9  * version 2 of the License, or (at your option) any later version
 10  * 
 11  * This library is distributed in the hope that it will be useful,
 12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 14  * Lesser General Public License for more details.
 15  * 
 16  * You should have received a copy of the GNU Lesser General Public
 17  * License along with this library; if not, write to the Free Software
 18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 19  */
 20 package org.xmlBlaster.test.client;
 21 import java.util.HashMap;
 22 import java.util.logging.Logger;
 23 
 24 import junit.framework.Test;
 25 import junit.framework.TestCase;
 26 import junit.framework.TestSuite;
 27 
 28 import org.xmlBlaster.client.I_Callback;
 29 import org.xmlBlaster.client.I_XmlBlasterAccess;
 30 import org.xmlBlaster.client.key.UpdateKey;
 31 import org.xmlBlaster.client.qos.ConnectQos;
 32 import org.xmlBlaster.client.qos.EraseReturnQos;
 33 import org.xmlBlaster.client.qos.UpdateQos;
 34 import org.xmlBlaster.j2ee.util.GlobalUtil;
 35 import org.xmlBlaster.test.Util;
 36 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 37 import org.xmlBlaster.util.Global;
 38 import org.xmlBlaster.util.MsgUnit;
 39 import org.xmlBlaster.util.XmlBlasterException;
 40 /**
 41  * Test that local (in vm) client protocol works.
 42  *
 43  * <p>We start an embedded server so that we have access to the engine.Global.</p>
 44  *
 45  * @author <a href="mailto:pra@tim.se">Peter Antman</a>
 46  * @version $Revision: 1.2 $
 47  */
 48 
 49 public class TestLocalProtocol extends TestCase implements I_Callback {
 50    private static String ME = "TestLocalProtocol";
 51    private final Global glob;
 52    private GlobalUtil globalUtil;
 53    private static Logger log = Logger.getLogger(TestLocalProtocol.class.getName());
 54 
 55    private I_XmlBlasterAccess con = null;
 56    private String name;
 57    private String passwd = "secret";
 58    private int numReceived = 0;         // error checking
 59    private EmbeddedXmlBlaster serverThread;
 60    private int serverPort = 7624;
 61 
 62    private HashMap subscriberTable = new HashMap();
 63    private int[] subRec = new int[2];
 64    String subscribeOid;
 65    String subscribeOid2;
 66 
 67    
 68    public TestLocalProtocol (){
 69       this(null, "TestLocalProtocol", "TestLocalProtocol");
 70    }
 71 
 72    public TestLocalProtocol (Global glob, String testName, String name){
 73       super(testName);
 74       this.glob = (glob == null) ? Global.instance() : glob;
 75 
 76       this.name = name;
 77 
 78    }
 79    /**
 80     * Sets up the fixture.
 81     * <p />
 82     * We start an own xmlBlaster server in a separate thread, configured with
 83     * the local drivers,
 84     * <p />
 85     * Then we connect as a client
 86     */
 87    protected void setUp()
 88    {
 89       String[] args = {
 90          "-bootstrapPort",        // For all protocol we may use set an alternate server port
 91          "" + serverPort,
 92          "-plugin/socket/port",
 93          "" + (serverPort-1),
 94          "-plugin/rmi/registryPort",
 95          "" + (serverPort-2),
 96          "-plugin/xmlrpc/port",
 97          "" + (serverPort-3),
 98          "-ClientProtocolPlugin[LOCAL][1.0]",
 99          "org.xmlBlaster.client.protocol.local.LocalConnection",
100          "-ClientCbServerProtocolPlugin[LOCAL][1.0]",
101          "org.xmlBlaster.client.protocol.local.LocalCallbackImpl",
102          "-CbProtocolPlugin[LOCAL][1.0]",
103          "org.xmlBlaster.protocol.local.CallbackLocalDriver",
104          "-protocol",
105          "LOCAL",
106          "-admin.remoteconsole.port",
107          "0"
108       };
109       /*
110 ,
111          "-logging",
112          "INFO"
113       */
114       glob.init(args);
115 
116       serverThread = EmbeddedXmlBlaster.startXmlBlaster(args);
117 
118       globalUtil = new GlobalUtil( serverThread.getMain().getGlobal() );
119       Global runglob = globalUtil.getClone( glob );
120 
121       log.info("XmlBlaster is ready for testing subscribe MIME filter");
122 
123       try {
124          log.info("Connecting ...");
125          con = runglob.getXmlBlasterAccess();
126          ConnectQos qos = new ConnectQos(runglob, name, passwd);
127          con.connect(qos, this); // Login to xmlBlaster
128       }
129       catch (Exception e) {
130          Thread.currentThread().dumpStack();
131          log.severe("Can't connect to xmlBlaster: " + e.toString());
132       }
133 
134       // Subscribe to a message with a supplied filter
135       try {
136          String xmlKey = "<key oid='' queryType='XPATH'>\n" +
137             "//TestLocalProtocol-AGENT" +
138             "</key>";
139          String qos = "<qos><notify>false</notify></qos>"; // send no erase events
140     
141          subscribeOid = con.subscribe(xmlKey, qos).getSubscriptionId() ;
142          log.info("Success: Subscribe on subscriptionId=" + subscribeOid + " done");
143          assertTrue("returned null subscriptionId", subscribeOid != null);
144          
145          subscriberTable.put(subscribeOid, new Integer(0));
146          
147          xmlKey = "<key oid='' queryType='XPATH'>\n" +
148             "//TestLocalProtocol-AGENT[@id='3']" +
149             "</key>";
150 
151          subscribeOid2 = con.subscribe(xmlKey, qos).getSubscriptionId() ;
152          log.info("Success: Subscribe on subscriptionId=" + subscribeOid2 + " done");
153          assertTrue("returned null subscriptionId", subscribeOid2 != null);
154          
155          subscriberTable.put(subscribeOid2, new Integer(1));
156 
157       } catch(XmlBlasterException e) {
158          log.warning("XmlBlasterException: " + e.getMessage());
159          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
160       }
161    }
162 
163       /**
164     * Tears down the fixture.
165     * <p />
166     * cleaning up .... erase() the previous message OID and logout
167     */
168    protected void tearDown()
169    {
170       log.info("TEST: tearing down");
171       try { Thread.sleep(200L); } catch( InterruptedException i) {}   // Wait 200 milli seconds, until all updates are processed ...
172       
173       try {
174          con.unSubscribe("<key oid='"+subscribeOid+"'/>",
175                          "<qos/>");
176          con.unSubscribe("<key oid='"+subscribeOid2+"'/>",
177                          "<qos/>");
178          EraseReturnQos[] arr = con.erase("<key oid='' queryType='XPATH'>\n" +
179                       "   /xmlBlaster/key/TestLocalProtocol-AGENT" +
180                       "</key>", "<qos/>");
181          assertEquals("Erase", 5, arr.length);
182       } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
183       
184       con.disconnect(null);
185       con=null;
186       
187       try { Thread.sleep(500L); } catch( InterruptedException i) {}    // Wait some time
188       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
189       this.serverThread = null;
190       
191       // reset to default server port (necessary if other tests follow in the same JVM).
192       Util.resetPorts();
193    }
194 
195    public void testPublish()
196    {
197       log.info("testPublish...");
198 
199       log.info("TEST 1");
200       try {
201          // Publish 5 messages
202          // 5 for first sub
203          // 1 for second sub
204          for ( int i = 0; i<5;i++) {
205             String c = "<content>"+i+"</content>";
206             String k = "<key oid='"+i+"' contentMime='text/xml'><TestLocalProtocol-AGENT id='"+i+"' type='generic'/></key>";
207             log.info("Key: " +k);
208             con.publish(new MsgUnit(k, c.getBytes(), null));
209          }
210       } catch(XmlBlasterException e) {
211          log.warning("XmlBlasterException: " + e.getMessage());
212          assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
213       }
214       waitOnUpdate(subscribeOid,10000L, 5);
215       waitOnUpdate(subscribeOid2,10000L, 1);
216 
217    }
218 
219    /**
220     * This is the callback method invoked from xmlBlaster
221     * delivering us a new asynchronous message. 
222     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
223     */
224    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
225    {
226       log.info("Receiving update of a message " + updateKey.getOid() + " for subId: " + updateQos.getSubscriptionId() );
227       int ii = ((Integer)subscriberTable.get(updateQos.getSubscriptionId())).intValue();
228       log.fine("Got message " + new String(content));
229       subRec[ii]++;
230       numReceived++;
231       return "";
232    }
233    
234    /**
235     * Little helper, waits until the wanted number of messages are arrived
236     * or returns when the given timeout occurs.
237     * <p />
238     * @param timeout in milliseconds
239     * @param numWait how many messages to wait
240     */
241    private void waitOnUpdate(String subId,final long timeout, final int numWait)
242    {
243       long pollingInterval = 50L;  // check every 0.05 seconds
244       if (timeout < 50)  pollingInterval = timeout / 10L;
245       long sum = 0L;
246       int ii = ((Integer)subscriberTable.get(subId)).intValue();
247       // check if too few are arriving
248       while (subRec[ii] < numWait) {
249          try { Thread.sleep(pollingInterval); } catch( InterruptedException i) {}
250          sum += pollingInterval;
251          assertTrue("Timeout of " + timeout + " occurred without update", sum <= timeout);
252       }
253 
254       // check if too many are arriving
255       try { Thread.sleep(timeout); } catch( InterruptedException i) {}
256       assertEquals("Wrong number of messages arrived", numWait, subRec[ii]);
257       log.info("Found correct rec messages for: " + subId);
258       subRec[ii]= 0;
259    }
260 
261    /**
262     * Method is used by TestRunner to load these tests
263     */
264    public static Test suite()
265    {
266        TestSuite suite= new TestSuite();
267        String loginName = "Tim";
268        suite.addTest(new TestLocalProtocol(new Global(), "testPublish", "Tim"));
269        return suite;
270    }
271 
272    /**
273     * Invoke: 
274     * <pre>
275     *   java org.xmlBlaster.test.mime.TestXPathSubscribeFilter
276     *   java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.mime.TestXPathSubscribeFilter
277     * <pre>
278     */
279    public static void main(String args[])
280    {
281       Global glob = new Global();
282       if (glob.init(args) != 0) {
283          System.err.println(ME + ": Init failed");
284          System.exit(1);
285       }
286       TestLocalProtocol testSub = new TestLocalProtocol(glob, "TestLocalProtocol", "Tim");
287       testSub.setUp();
288       try {
289          testSub.testPublish();
290       } catch (Throwable e) {
291          e.printStackTrace();
292       } // end of try-catch
293 
294       testSub.tearDown();
295    }
296 }// TestLocalProtocol


syntax highlighted by Code2HTML, v. 0.9.1