1 /*
2 * Copyright (c) 2003 Peter Antman, Teknik i Media <peter.antman@tim.se>
3 *
4 * $Id: TestLocalProtocol.java 16173 2007-05-22 12:44:14Z ruff $
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20 package org.xmlBlaster.test.client;
21 import java.util.HashMap;
22 import java.util.logging.Logger;
23
24 import junit.framework.Test;
25 import junit.framework.TestCase;
26 import junit.framework.TestSuite;
27
28 import org.xmlBlaster.client.I_Callback;
29 import org.xmlBlaster.client.I_XmlBlasterAccess;
30 import org.xmlBlaster.client.key.UpdateKey;
31 import org.xmlBlaster.client.qos.ConnectQos;
32 import org.xmlBlaster.client.qos.EraseReturnQos;
33 import org.xmlBlaster.client.qos.UpdateQos;
34 import org.xmlBlaster.j2ee.util.GlobalUtil;
35 import org.xmlBlaster.test.Util;
36 import org.xmlBlaster.util.EmbeddedXmlBlaster;
37 import org.xmlBlaster.util.Global;
38 import org.xmlBlaster.util.MsgUnit;
39 import org.xmlBlaster.util.XmlBlasterException;
40 /**
41 * Test that local (in vm) client protocol works.
42 *
43 * <p>We start an embedded server so that we have access to the engine.Global.</p>
44 *
45 * @author <a href="mailto:pra@tim.se">Peter Antman</a>
46 * @version $Revision: 1.2 $
47 */
48
49 public class TestLocalProtocol extends TestCase implements I_Callback {
50 private static String ME = "TestLocalProtocol";
51 private final Global glob;
52 private GlobalUtil globalUtil;
53 private static Logger log = Logger.getLogger(TestLocalProtocol.class.getName());
54
55 private I_XmlBlasterAccess con = null;
56 private String name;
57 private String passwd = "secret";
58 private int numReceived = 0; // error checking
59 private EmbeddedXmlBlaster serverThread;
60 private int serverPort = 7624;
61
62 private HashMap subscriberTable = new HashMap();
63 private int[] subRec = new int[2];
64 String subscribeOid;
65 String subscribeOid2;
66
67
68 public TestLocalProtocol (){
69 this(null, "TestLocalProtocol", "TestLocalProtocol");
70 }
71
72 public TestLocalProtocol (Global glob, String testName, String name){
73 super(testName);
74 this.glob = (glob == null) ? Global.instance() : glob;
75
76 this.name = name;
77
78 }
79 /**
80 * Sets up the fixture.
81 * <p />
82 * We start an own xmlBlaster server in a separate thread, configured with
83 * the local drivers,
84 * <p />
85 * Then we connect as a client
86 */
87 protected void setUp()
88 {
89 String[] args = {
90 "-bootstrapPort", // For all protocol we may use set an alternate server port
91 "" + serverPort,
92 "-plugin/socket/port",
93 "" + (serverPort-1),
94 "-plugin/rmi/registryPort",
95 "" + (serverPort-2),
96 "-plugin/xmlrpc/port",
97 "" + (serverPort-3),
98 "-ClientProtocolPlugin[LOCAL][1.0]",
99 "org.xmlBlaster.client.protocol.local.LocalConnection",
100 "-ClientCbServerProtocolPlugin[LOCAL][1.0]",
101 "org.xmlBlaster.client.protocol.local.LocalCallbackImpl",
102 "-CbProtocolPlugin[LOCAL][1.0]",
103 "org.xmlBlaster.protocol.local.CallbackLocalDriver",
104 "-protocol",
105 "LOCAL",
106 "-admin.remoteconsole.port",
107 "0"
108 };
109 /*
110 ,
111 "-logging",
112 "INFO"
113 */
114 glob.init(args);
115
116 serverThread = EmbeddedXmlBlaster.startXmlBlaster(args);
117
118 globalUtil = new GlobalUtil( serverThread.getMain().getGlobal() );
119 Global runglob = globalUtil.getClone( glob );
120
121 log.info("XmlBlaster is ready for testing subscribe MIME filter");
122
123 try {
124 log.info("Connecting ...");
125 con = runglob.getXmlBlasterAccess();
126 ConnectQos qos = new ConnectQos(runglob, name, passwd);
127 con.connect(qos, this); // Login to xmlBlaster
128 }
129 catch (Exception e) {
130 Thread.currentThread().dumpStack();
131 log.severe("Can't connect to xmlBlaster: " + e.toString());
132 }
133
134 // Subscribe to a message with a supplied filter
135 try {
136 String xmlKey = "<key oid='' queryType='XPATH'>\n" +
137 "//TestLocalProtocol-AGENT" +
138 "</key>";
139 String qos = "<qos><notify>false</notify></qos>"; // send no erase events
140
141 subscribeOid = con.subscribe(xmlKey, qos).getSubscriptionId() ;
142 log.info("Success: Subscribe on subscriptionId=" + subscribeOid + " done");
143 assertTrue("returned null subscriptionId", subscribeOid != null);
144
145 subscriberTable.put(subscribeOid, new Integer(0));
146
147 xmlKey = "<key oid='' queryType='XPATH'>\n" +
148 "//TestLocalProtocol-AGENT[@id='3']" +
149 "</key>";
150
151 subscribeOid2 = con.subscribe(xmlKey, qos).getSubscriptionId() ;
152 log.info("Success: Subscribe on subscriptionId=" + subscribeOid2 + " done");
153 assertTrue("returned null subscriptionId", subscribeOid2 != null);
154
155 subscriberTable.put(subscribeOid2, new Integer(1));
156
157 } catch(XmlBlasterException e) {
158 log.warning("XmlBlasterException: " + e.getMessage());
159 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
160 }
161 }
162
163 /**
164 * Tears down the fixture.
165 * <p />
166 * cleaning up .... erase() the previous message OID and logout
167 */
168 protected void tearDown()
169 {
170 log.info("TEST: tearing down");
171 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
172
173 try {
174 con.unSubscribe("<key oid='"+subscribeOid+"'/>",
175 "<qos/>");
176 con.unSubscribe("<key oid='"+subscribeOid2+"'/>",
177 "<qos/>");
178 EraseReturnQos[] arr = con.erase("<key oid='' queryType='XPATH'>\n" +
179 " /xmlBlaster/key/TestLocalProtocol-AGENT" +
180 "</key>", "<qos/>");
181 assertEquals("Erase", 5, arr.length);
182 } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
183
184 con.disconnect(null);
185 con=null;
186
187 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
188 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
189 this.serverThread = null;
190
191 // reset to default server port (necessary if other tests follow in the same JVM).
192 Util.resetPorts();
193 }
194
195 public void testPublish()
196 {
197 log.info("testPublish...");
198
199 log.info("TEST 1");
200 try {
201 // Publish 5 messages
202 // 5 for first sub
203 // 1 for second sub
204 for ( int i = 0; i<5;i++) {
205 String c = "<content>"+i+"</content>";
206 String k = "<key oid='"+i+"' contentMime='text/xml'><TestLocalProtocol-AGENT id='"+i+"' type='generic'/></key>";
207 log.info("Key: " +k);
208 con.publish(new MsgUnit(k, c.getBytes(), null));
209 }
210 } catch(XmlBlasterException e) {
211 log.warning("XmlBlasterException: " + e.getMessage());
212 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
213 }
214 waitOnUpdate(subscribeOid,10000L, 5);
215 waitOnUpdate(subscribeOid2,10000L, 1);
216
217 }
218
219 /**
220 * This is the callback method invoked from xmlBlaster
221 * delivering us a new asynchronous message.
222 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
223 */
224 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
225 {
226 log.info("Receiving update of a message " + updateKey.getOid() + " for subId: " + updateQos.getSubscriptionId() );
227 int ii = ((Integer)subscriberTable.get(updateQos.getSubscriptionId())).intValue();
228 log.fine("Got message " + new String(content));
229 subRec[ii]++;
230 numReceived++;
231 return "";
232 }
233
234 /**
235 * Little helper, waits until the wanted number of messages are arrived
236 * or returns when the given timeout occurs.
237 * <p />
238 * @param timeout in milliseconds
239 * @param numWait how many messages to wait
240 */
241 private void waitOnUpdate(String subId,final long timeout, final int numWait)
242 {
243 long pollingInterval = 50L; // check every 0.05 seconds
244 if (timeout < 50) pollingInterval = timeout / 10L;
245 long sum = 0L;
246 int ii = ((Integer)subscriberTable.get(subId)).intValue();
247 // check if too few are arriving
248 while (subRec[ii] < numWait) {
249 try { Thread.sleep(pollingInterval); } catch( InterruptedException i) {}
250 sum += pollingInterval;
251 assertTrue("Timeout of " + timeout + " occurred without update", sum <= timeout);
252 }
253
254 // check if too many are arriving
255 try { Thread.sleep(timeout); } catch( InterruptedException i) {}
256 assertEquals("Wrong number of messages arrived", numWait, subRec[ii]);
257 log.info("Found correct rec messages for: " + subId);
258 subRec[ii]= 0;
259 }
260
261 /**
262 * Method is used by TestRunner to load these tests
263 */
264 public static Test suite()
265 {
266 TestSuite suite= new TestSuite();
267 String loginName = "Tim";
268 suite.addTest(new TestLocalProtocol(new Global(), "testPublish", "Tim"));
269 return suite;
270 }
271
272 /**
273 * Invoke:
274 * <pre>
275 * java org.xmlBlaster.test.mime.TestXPathSubscribeFilter
276 * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.mime.TestXPathSubscribeFilter
277 * <pre>
278 */
279 public static void main(String args[])
280 {
281 Global glob = new Global();
282 if (glob.init(args) != 0) {
283 System.err.println(ME + ": Init failed");
284 System.exit(1);
285 }
286 TestLocalProtocol testSub = new TestLocalProtocol(glob, "TestLocalProtocol", "Tim");
287 testSub.setUp();
288 try {
289 testSub.testPublish();
290 } catch (Throwable e) {
291 e.printStackTrace();
292 } // end of try-catch
293
294 testSub.tearDown();
295 }
296 }// TestLocalProtocol
syntax highlighted by Code2HTML, v. 0.9.1