1 /*------------------------------------------------------------------------------
  2 Name:      TestXmlBlasterAccessMultiThreaded.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.client;
  7 
  8 import java.util.logging.Logger;
  9 import java.util.logging.Level;
 10 import org.xmlBlaster.util.Global;
 11 import org.xmlBlaster.client.qos.ConnectQos;
 12 import org.xmlBlaster.util.XmlBlasterException;
 13 import org.xmlBlaster.util.def.ErrorCode;
 14 import org.xmlBlaster.util.def.Constants;
 15 import org.xmlBlaster.util.property.PropString;
 16 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 17 import org.xmlBlaster.client.qos.PublishQos;
 18 import org.xmlBlaster.client.I_ConnectionStateListener;
 19 import org.xmlBlaster.client.key.UpdateKey;
 20 import org.xmlBlaster.client.qos.UpdateQos;
 21 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 22 import org.xmlBlaster.client.qos.EraseReturnQos;
 23 import org.xmlBlaster.client.I_XmlBlasterAccess;
 24 import org.xmlBlaster.util.qos.address.Address;
 25 import org.xmlBlaster.util.MsgUnit;
 26 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 27 
 28 import org.xmlBlaster.test.Util;
 29 import org.xmlBlaster.test.MsgInterceptor;
 30 import org.xmlBlaster.test.Msg;
 31 import junit.framework.*;
 32 
 33 import java.util.Vector;
 34 import java.util.ArrayList;
 35 import java.util.HashSet;
 36 
 37 /**
 38  * Tests the thread safety of the I_XmlBlasterAccess client helper class
 39  * <p>
 40  * Invoke examples:<br />
 41  * <pre>
 42  *   java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded
 43  *   java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded
 44  * </pre>
 45  * @see org.xmlBlaster.client.I_XmlBlasterAccess
 46  */
 47 public class TestXmlBlasterAccessMultiThreaded extends TestCase implements I_ConnectionStateListener
 48 {
 49    private static String ME = "TestXmlBlasterAccessMultiThreaded";
 50    private Global glob;
 51    private static Logger log = Logger.getLogger(TestXmlBlasterAccessMultiThreaded.class.getName());
 52 
 53    private int serverPort = 7404;
 54    private EmbeddedXmlBlaster serverThread;
 55 
 56    private MsgInterceptor updateInterceptor;
 57    private I_XmlBlasterAccess con;
 58    private String senderName;
 59 
 60    private int numPublish = 20;
 61    private int numStop = 3;
 62    private int numStart = 5;
 63 
 64    private int iThread = 0;
 65    private final String contentMime = "text/plain";
 66 
 67    private final long reconnectDelay = 2000L;
 68 
 69    public TestXmlBlasterAccessMultiThreaded(String testName) {
 70       this(null, testName);
 71    }
 72 
 73    public TestXmlBlasterAccessMultiThreaded(Global glob, String testName) {
 74       super(testName);
 75       this.glob = glob;
 76       this.senderName = testName;
 77    }
 78 
 79    /**
 80     * Sets up the fixture.
 81     * <p />
 82     * Connect to xmlBlaster and login
 83     */
 84    protected void setUp() {
 85       this.glob = (this.glob == null) ? new Global() : this.glob;
 86 
 87 
 88       glob.init(Util.getOtherServerPorts(serverPort));
 89 
 90       serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
 91       log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
 92       try {
 93          con = glob.getXmlBlasterAccess(); // Find orb
 94 
 95          String passwd = "secret";
 96          ConnectQos connectQos = new ConnectQos(glob, senderName, passwd); // == "<qos>...</qos>";
 97 
 98          // Setup fail save handling ...
 99          Address addressProp = new Address(glob);
100          addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
101          addressProp.setRetries(-1);       // -1 == forever
102          addressProp.setPingInterval(-1L); // switched off
103          con.registerConnectionListener(this);
104 
105          connectQos.setAddress(addressProp);
106 
107          this.updateInterceptor = new MsgInterceptor(this.glob, log, null); // Collect received msgs
108 
109          con.connect(connectQos, this.updateInterceptor);  // Login to xmlBlaster, register for updates
110       }
111       catch (XmlBlasterException e) {
112           log.warning("setUp() - login failed: " + e.getMessage());
113           fail("setUp() - login fail: " + e.getMessage());
114       }
115       catch (Exception e) {
116           log.severe("setUp() - login failed: " + e.toString());
117           e.printStackTrace();
118           fail("setUp() - login fail: " + e.toString());
119       }
120    }
121 
122    /**
123     * Tears down the fixture.
124     * <p />
125     * cleaning up .... erase() the previous message OID and logout
126     */
127    protected void tearDown() {
128       log.info("Entering tearDown(), test is finished");
129       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
130                       "   //TestXmlBlasterAccessMultiThreaded-AGENT" +
131                       "</key>";
132       String qos = "<qos></qos>";
133       try {
134          EraseReturnQos[] arr = con.erase(xmlKey, qos);
135       }
136       catch(XmlBlasterException e) {
137          log.severe("XmlBlasterException: " + e.getMessage());
138       }
139       finally {
140          con.disconnect(null);
141 
142          EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
143          this.serverThread = null;
144 
145          // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
146          Util.resetPorts(glob);
147 
148          this.glob = null;
149          this.con = null;
150          Global.instance().shutdown();
151       }
152    }
153 
154    /**
155     * TEST: Subscribe to messages with XPATH.
156     */
157    public void doSubscribe() {
158       if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
159 
160       String xmlKey = "<key oid='' queryType='XPATH'>\n" +
161                       "   //TestXmlBlasterAccessMultiThreaded-AGENT" +
162                       "</key>";
163       String qos = "<qos><notify>false</notify></qos>"; // send no erase events
164       try {
165          SubscribeReturnQos subscriptionId = con.subscribe(xmlKey, qos);
166          log.info("Success: Subscribe on subscriptionId=" + subscriptionId.getSubscriptionId() + " done");
167          assertTrue("returned null subscriptionId", subscriptionId != null);
168       } catch(XmlBlasterException e) {
169          log.warning("XmlBlasterException: " + e.getMessage());
170          assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
171       }
172    }
173 
174    /**
175     * TEST: Construct a message and publish it.
176     * <p />
177     */
178    public MsgUnit doPublish(String oid, String content) throws XmlBlasterException {
179       log.info("Publishing a message " + oid + " ...");
180       String xmlKey = "<key oid='" + oid + "' contentMime='" + contentMime + "'>\n" +
181                       "   <TestXmlBlasterAccessMultiThreaded-AGENT id='192.168.124.10' subId='1' type='generic'>" +
182                       "   </TestXmlBlasterAccessMultiThreaded-AGENT>" +
183                       "</key>";
184       PublishQos qosWrapper = new PublishQos(glob); // == "<qos></qos>"
185       MsgUnit msgUnit = new MsgUnit(xmlKey, content.getBytes(), qosWrapper.toXml());
186 
187       con.publish(msgUnit);
188       log.info("Success: Publishing of " + oid + " content='" + content + "' done");
189       return msgUnit;
190    }
191 
192 
193    /**
194     * TEST: <br />
195     */
196    public void testPublishThreads()
197    {
198       //doSubscribe(); -> see reachedAlive()
199       ME = "TestXmlBlasterAccessMultiThreaded.testPublishThreads()";
200       final String oid = "TestXmlBlasterAccessMultiThreaded";
201       int numThreads = 5;
202       log.info("Going to publish " + numPublish + " messages with each of " + numThreads + " threads");
203       final Vector sentMsgVec = new Vector(numPublish*numThreads);
204       PublishThread[] publishThreads = new PublishThread[numThreads];
205       for (iThread=0; iThread<numThreads; iThread++) {
206          publishThreads[iThread] = new PublishThread(""+iThread, numPublish, oid);
207          publishThreads[iThread].start();
208       }
209 
210       log.info("Trying join ...");
211 
212       for (int kk=0; kk<numThreads; kk++) {
213          try {
214             publishThreads[kk].join();
215          }
216          catch (InterruptedException ie) {
217             log.warning("Caught join() exception: " + ie.toString());
218          }
219       }
220 
221       log.info("Threads are joined");
222 
223       // Now check everything:
224 
225       this.updateInterceptor.waitOnUpdate(3000L, numPublish * numThreads);
226       Msg[] msgs = this.updateInterceptor.getMsgs(oid, Constants.STATE_OK);
227       //msg.compareMsg(sentMsgVec.elementAt[i]);
228       try { Thread.sleep(3000L); } catch( InterruptedException i) {}
229       assertEquals("Too many messages arrived", numPublish * numThreads, this.updateInterceptor.count());
230 
231       HashSet set = new HashSet();
232       for (int ii=0; ii<msgs.length; ii++) {
233          assertEquals("Duplicate messages!!! '" + msgs[ii].getContentStr(), true, set.add(msgs[ii].getContentStr()));
234       }
235 
236       int[] lastMsg = new int[numThreads];
237       for (int iThread=0; iThread<numThreads; iThread++) lastMsg[iThread] = -1;
238 
239       for (int ii=0; ii<msgs.length; ii++) {
240          String content = msgs[ii].getContentStr();
241          int sepPos = content.indexOf(":");
242          int iThread = Integer.parseInt(content.substring(0, sepPos));
243          int iMsg = Integer.parseInt(content.substring(sepPos+1));
244          if (iMsg <= lastMsg[iThread]) {
245             fail("Messages are not in ascending order, last=" + lastMsg + " curr=" + iMsg);
246          }
247          lastMsg[iThread] = iMsg;
248       }
249       
250       log.info("SUCCESS, all check are OK.");
251    }
252 
253    /**
254     * Helper class for publisher threads
255     */
256    class PublishThread extends Thread {
257       private final ArrayList sentMsgList;
258       private final int numPublish;
259       private final String oid;
260       /** @param name Is thread index from 0 to numThreads-1 */
261       public PublishThread(String name, int numPublish, String oid) {
262          super(name);
263          this.numPublish = numPublish;
264          this.oid = oid;
265          this.sentMsgList = new ArrayList(numPublish);
266       }
267       public void run() {
268          log.info("Started thread " + iThread + ": " + Thread.currentThread().getName());
269          for (int ii=0; ii<numPublish; ii++) {
270             try {
271                MsgUnit msgUnit = doPublish(oid, Thread.currentThread().getName() + ":" + (ii+1));
272                sentMsgList.add(msgUnit);
273             }
274             catch (XmlBlasterException e) {
275                log.severe("Fail: " + e.getMessage());
276                fail(ME+": "+e.getMessage());
277             }
278          }
279          log.info(Thread.currentThread().getName() + ": Published " + numPublish + " messages");
280       }
281       public final ArrayList getSentMsgList() {
282          return this.sentMsgList;
283       }
284    };
285 
286    /**
287     * This is the callback method invoked from I_XmlBlasterAccess
288     * informing the client in an asynchronous mode if the connection was established.
289     * <p />
290     * This method is enforced through interface I_ConnectionStateListener
291     */
292    public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
293       log.info("I_ConnectionStateListener: We were lucky, reconnected to xmlBlaster");
294       doSubscribe();    // initialize on startup and on reconnect
295    }
296 
297    public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
298    }
299 
300    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
301       log.warning("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
302    }
303 
304    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
305       log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
306    }
307 
308    /**
309     * Invoke: java org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded
310     * <p />
311     * @deprecated Use the TestRunner from the testsuite to run it:<p />
312     * <pre>   java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestXmlBlasterAccessMultiThreaded</pre>
313     */
314    public static void main(String args[])
315    {
316       Global glob = new Global();
317       if (glob.init(args) != 0) {
318          System.out.println(ME + ": Init failed");
319          System.exit(1);
320       }
321       TestXmlBlasterAccessMultiThreaded testSub = new TestXmlBlasterAccessMultiThreaded(glob, "TestXmlBlasterAccessMultiThreaded");
322       testSub.setUp();
323       testSub.testPublishThreads();
324       testSub.tearDown();
325    }
326 }


// syntax highlighted by Code2HTML, v. 0.9.1