1 /*------------------------------------------------------------------------------
2 Name: TestXmlBlasterAccessMultiThreaded.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.client;
7
8 import java.util.logging.Logger;
9 import java.util.logging.Level;
10 import org.xmlBlaster.util.Global;
11 import org.xmlBlaster.client.qos.ConnectQos;
12 import org.xmlBlaster.util.XmlBlasterException;
13 import org.xmlBlaster.util.def.ErrorCode;
14 import org.xmlBlaster.util.def.Constants;
15 import org.xmlBlaster.util.property.PropString;
16 import org.xmlBlaster.util.EmbeddedXmlBlaster;
17 import org.xmlBlaster.client.qos.PublishQos;
18 import org.xmlBlaster.client.I_ConnectionStateListener;
19 import org.xmlBlaster.client.key.UpdateKey;
20 import org.xmlBlaster.client.qos.UpdateQos;
21 import org.xmlBlaster.client.qos.SubscribeReturnQos;
22 import org.xmlBlaster.client.qos.EraseReturnQos;
23 import org.xmlBlaster.client.I_XmlBlasterAccess;
24 import org.xmlBlaster.util.qos.address.Address;
25 import org.xmlBlaster.util.MsgUnit;
26 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
27
28 import org.xmlBlaster.test.Util;
29 import org.xmlBlaster.test.MsgInterceptor;
30 import org.xmlBlaster.test.Msg;
31 import junit.framework.*;
32
33 import java.util.Vector;
34 import java.util.ArrayList;
35 import java.util.HashSet;
36
37 /**
38 * Tests the thread safety of the I_XmlBlasterAccess client helper class
39 * <p>
40 * Invoke examples:<br />
41 * <pre>
42 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded
43 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded
44 * </pre>
45 * @see org.xmlBlaster.client.I_XmlBlasterAccess
46 */
47 public class TestXmlBlasterAccessMultiThreaded extends TestCase implements I_ConnectionStateListener
48 {
49 private static String ME = "TestXmlBlasterAccessMultiThreaded";
50 private Global glob;
51 private static Logger log = Logger.getLogger(TestXmlBlasterAccessMultiThreaded.class.getName());
52
53 private int serverPort = 7404;
54 private EmbeddedXmlBlaster serverThread;
55
56 private MsgInterceptor updateInterceptor;
57 private I_XmlBlasterAccess con;
58 private String senderName;
59
60 private int numPublish = 20;
61 private int numStop = 3;
62 private int numStart = 5;
63
64 private int iThread = 0;
65 private final String contentMime = "text/plain";
66
67 private final long reconnectDelay = 2000L;
68
69 public TestXmlBlasterAccessMultiThreaded(String testName) {
70 this(null, testName);
71 }
72
73 public TestXmlBlasterAccessMultiThreaded(Global glob, String testName) {
74 super(testName);
75 this.glob = glob;
76 this.senderName = testName;
77 }
78
79 /**
80 * Sets up the fixture.
81 * <p />
82 * Connect to xmlBlaster and login
83 */
84 protected void setUp() {
85 this.glob = (this.glob == null) ? new Global() : this.glob;
86
87
88 glob.init(Util.getOtherServerPorts(serverPort));
89
90 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
91 log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
92 try {
93 con = glob.getXmlBlasterAccess(); // Find orb
94
95 String passwd = "secret";
96 ConnectQos connectQos = new ConnectQos(glob, senderName, passwd); // == "<qos>...</qos>";
97
98 // Setup fail save handling ...
99 Address addressProp = new Address(glob);
100 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
101 addressProp.setRetries(-1); // -1 == forever
102 addressProp.setPingInterval(-1L); // switched off
103 con.registerConnectionListener(this);
104
105 connectQos.setAddress(addressProp);
106
107 this.updateInterceptor = new MsgInterceptor(this.glob, log, null); // Collect received msgs
108
109 con.connect(connectQos, this.updateInterceptor); // Login to xmlBlaster, register for updates
110 }
111 catch (XmlBlasterException e) {
112 log.warning("setUp() - login failed: " + e.getMessage());
113 fail("setUp() - login fail: " + e.getMessage());
114 }
115 catch (Exception e) {
116 log.severe("setUp() - login failed: " + e.toString());
117 e.printStackTrace();
118 fail("setUp() - login fail: " + e.toString());
119 }
120 }
121
122 /**
123 * Tears down the fixture.
124 * <p />
125 * cleaning up .... erase() the previous message OID and logout
126 */
127 protected void tearDown() {
128 log.info("Entering tearDown(), test is finished");
129 String xmlKey = "<key oid='' queryType='XPATH'>\n" +
130 " //TestXmlBlasterAccessMultiThreaded-AGENT" +
131 "</key>";
132 String qos = "<qos></qos>";
133 try {
134 EraseReturnQos[] arr = con.erase(xmlKey, qos);
135 }
136 catch(XmlBlasterException e) {
137 log.severe("XmlBlasterException: " + e.getMessage());
138 }
139 finally {
140 con.disconnect(null);
141
142 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
143 this.serverThread = null;
144
145 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
146 Util.resetPorts(glob);
147
148 this.glob = null;
149 this.con = null;
150 Global.instance().shutdown();
151 }
152 }
153
154 /**
155 * TEST: Subscribe to messages with XPATH.
156 */
157 public void doSubscribe() {
158 if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
159
160 String xmlKey = "<key oid='' queryType='XPATH'>\n" +
161 " //TestXmlBlasterAccessMultiThreaded-AGENT" +
162 "</key>";
163 String qos = "<qos><notify>false</notify></qos>"; // send no erase events
164 try {
165 SubscribeReturnQos subscriptionId = con.subscribe(xmlKey, qos);
166 log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
167 assertTrue("returned null subscriptionId", subscriptionId != null);
168 } catch(XmlBlasterException e) {
169 log.warning("XmlBlasterException: " + e.getMessage());
170 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
171 }
172 }
173
174 /**
175 * TEST: Construct a message and publish it.
176 * <p />
177 */
178 public MsgUnit doPublish(String oid, String content) throws XmlBlasterException {
179 log.info("Publishing a message " + oid + " ...");
180 String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
181 " <TestXmlBlasterAccessMultiThreaded-AGENT id='192.168.124.10' subId='1' type='generic'>" +
182 " </TestXmlBlasterAccessMultiThreaded-AGENT>" +
183 "</key>";
184 PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
185 MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
186
187 con.publish(msgUnit);
188 log.info("Success: Publishing of " + oid + " content='" + content + "' done");
189 return msgUnit;
190 }
191
192
193 /**
194 * TEST: <br />
195 */
196 public void testPublishThreads()
197 {
198 //doSubscribe(); -> see reachedAlive()
199 ME = "TestXmlBlasterAccessMultiThreaded.testPublishThreads()";
200 final String oid = "TestXmlBlasterAccessMultiThreaded";
201 int numThreads = 5;
202 log.info("Going to publish " + numPublish + " messages with each of " + numThreads + " threads");
203 final Vector sentMsgVec = new Vector(numPublish*numThreads);
204 PublishThread[] publishThreads = new PublishThread[numThreads];
205 for (iThread=0; iThread<numThreads; iThread++) {
206 publishThreads[iThread] = new PublishThread(""+iThread, numPublish, oid);
207 publishThreads[iThread].start();
208 }
209
210 log.info("Trying join ...");
211
212 for (int kk=0; kk<numThreads; kk++) {
213 try {
214 publishThreads[kk].join();
215 }
216 catch (InterruptedException ie) {
217 log.warning("Caught join() exception: " + ie.toString());
218 }
219 }
220
221 log.info("Threads are joined");
222
223 // Now check everything:
224
225 this.updateInterceptor.waitOnUpdate(3000L, numPublish * numThreads);
226 Msg[] msgs = this.updateInterceptor.getMsgs(oid, Constants.STATE_OK);
227 //msg.compareMsg(sentMsgVec.elementAt[i]);
228 try { Thread.sleep(3000L); } catch( InterruptedException i) {}
229 assertEquals("Too many messages arrived", numPublish * numThreads, this.updateInterceptor.count());
230
231 HashSet set = new HashSet();
232 for (int ii=0; ii<msgs.length; ii++) {
233 assertEquals("Duplicate messages!!! '" + msgs[ii].getContentStr(), true, set.add(msgs[ii].getContentStr()));
234 }
235
236 int[] lastMsg = new int[numThreads];
237 for (int iThread=0; iThread<numThreads; iThread++) lastMsg[iThread] = -1;
238
239 for (int ii=0; ii<msgs.length; ii++) {
240 String content = msgs[ii].getContentStr();
241 int sepPos = content.indexOf(":");
242 int iThread = Integer.parseInt(content.substring(0, sepPos));
243 int iMsg = Integer.parseInt(content.substring(sepPos+1));
244 if (iMsg <= lastMsg[iThread]) {
245 fail("Messages are not in ascending order, last=" + lastMsg + " curr=" + iMsg);
246 }
247 lastMsg[iThread] = iMsg;
248 }
249
250 log.info("SUCCESS, all check are OK.");
251 }
252
253 /**
254 * Helper class for publisher threads
255 */
256 class PublishThread extends Thread {
257 private final ArrayList sentMsgList;
258 private final int numPublish;
259 private final String oid;
260 /** @param name Is thread index from 0 to numThreads-1 */
261 public PublishThread(String name, int numPublish, String oid) {
262 super(name);
263 this.numPublish = numPublish;
264 this.oid = oid;
265 this.sentMsgList = new ArrayList(numPublish);
266 }
267 public void run() {
268 log.info("Started thread " + iThread + ": " + Thread.currentThread().getName());
269 for (int ii=0; ii<numPublish; ii++) {
270 try {
271 MsgUnit msgUnit = doPublish(oid, Thread.currentThread().getName() + ":" + (ii+1));
272 sentMsgList.add(msgUnit);
273 }
274 catch (XmlBlasterException e) {
275 log.severe("Fail: " + e.getMessage());
276 fail(ME+": "+e.getMessage());
277 }
278 }
279 log.info(Thread.currentThread().getName() + ": Published " + numPublish + " messages");
280 }
281 public final ArrayList getSentMsgList() {
282 return this.sentMsgList;
283 }
284 };
285
286 /**
287 * This is the callback method invoked from I_XmlBlasterAccess
288 * informing the client in an asynchronous mode if the connection was established.
289 * <p />
290 * This method is enforced through interface I_ConnectionStateListener
291 */
292 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
293 log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster");
294 doSubscribe(); // initialize on startup and on reconnect
295 }
296
297 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
298 }
299
300 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
301 log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
302 }
303
304 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
305 log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
306 }
307
308 /**
309 * Invoke: java org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded
310 * <p />
311 * @deprecated Use the TestRunner from the testsuite to run it:<p />
312 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded</pre>
313 */
314 public static void main(String args[])
315 {
316 Global glob = new Global();
317 if (glob.init(args) != 0) {
318 System.out.println(ME + ": Init failed");
319 System.exit(1);
320 }
321 TestXmlBlasterAccessMultiThreaded testSub = new TestXmlBlasterAccessMultiThreaded(glob, "TestXmlBlasterAccessMultiThreaded");
322 testSub.setUp();
323 testSub.testPublishThreads();
324 testSub.tearDown();
325 }
326 }
syntax highlighted by Code2HTML, v. 0.9.1