1 package org.xmlBlaster.test.cluster;
  2 
  3 import java.util.logging.Logger;
  4 import java.util.logging.Level;
  5 import org.xmlBlaster.util.Global;
  6 
  7 // for client connections:
  8 import org.xmlBlaster.util.*;
  9 import org.xmlBlaster.client.I_Callback;
 10 import org.xmlBlaster.client.key.PublishKey;
 11 import org.xmlBlaster.client.key.EraseKey;
 12 import org.xmlBlaster.client.key.SubscribeKey;
 13 import org.xmlBlaster.client.key.UnSubscribeKey;
 14 import org.xmlBlaster.client.key.UpdateKey;
 15 import org.xmlBlaster.client.qos.PublishQos;
 16 import org.xmlBlaster.client.qos.PublishReturnQos;
 17 import org.xmlBlaster.client.qos.UpdateQos;
 18 import org.xmlBlaster.client.qos.SubscribeQos;
 19 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 20 import org.xmlBlaster.client.qos.EraseQos;
 21 import org.xmlBlaster.client.qos.EraseReturnQos;
 22 import org.xmlBlaster.client.I_XmlBlasterAccess;
 23 import org.xmlBlaster.util.MsgUnit;
 24 import org.xmlBlaster.util.def.Constants;
 25 
 26 
 27 import java.util.Vector;
 28 import java.io.File;
 29 
 30 import junit.framework.*;
 31 
 32 /**
 33  * Test publishing a message from bilbo to heron. 
 34  * <p />
 35  * <pre>
 36  * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.SubscribeXPathTest
 37  * </pre>
 38  * NOTE: asserts() in update() methods are routed back to server and are not handled
 39  *       by the junit testsuite, so we check double (see code).
 40  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
 41  */
 42 public class SubscribeXPathTest extends TestCase {
 43    private String ME = "SubscribeXPathTest";
 44    private Global glob;
 45    private static Logger log = Logger.getLogger(SubscribeXPathTest.class.getName());
 46    private ServerHelper serverHelper;
 47 
 48    private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon;
 49 
 50    private int updateCounterHeron = 0;
 51    private int updateCounterFrodo = 0;
 52    private int updateCounterBilbo = 0;
 53    private String oid = "SubscribeToBilbo";
 54    private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
 55    private String contentStr = "We win";
 56 
 57    private String assertInUpdate = null;
 58 
 59    public SubscribeXPathTest(String name) {
 60       super(name);
 61       this.glob = new Global(null, true, false);
 62    }
 63 
 64    /**
 65     * Initialize the test ...
 66     */
 67    protected void setUp() {
 68 
 69       log.info("Entering setUp(), test starts");
 70 
 71       updateCounterHeron = 0;
 72       updateCounterFrodo = 0;
 73       updateCounterBilbo = 0;
 74 
 75       serverHelper = new ServerHelper(glob, log, ME);
 76 
 77       // Starts a cluster node
 78       serverHelper.startHeron();
 79       serverHelper.startAvalon();
 80       //serverHelper.startGolan();
 81       serverHelper.startFrodo();
 82       serverHelper.startBilbo();
 83    }
 84 
 85    /**
 86     * cleaning up ...
 87     */
 88    protected void tearDown() {
 89       log.info("Entering tearDown(), test is finished");
 90       try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
 91 
 92       if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
 93       if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
 94       if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
 95       if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
 96       if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
 97 
 98       serverHelper.tearDown();
 99    }
100 
101    /**
102     * We start all nodes as described in requirement
103     * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
104     * <p />
105     * - Subscribe to RUGBY messages from client at bilbo with XPath<br />
106     * - publish RUGBY messages to avalon (heron is the master)<br />
107     * - Does the client at bilbo receive them?
108     * <pre>
109     *   avalonClient -> avalon(slave) -> heron(master) -> bilbo(slave) -> bilboClient
110     * </pre>
111     */ 
112    public void testSubscribeXpath() {
113       System.err.println("***SubscribeXPathTest.testSubscribeXpath: Subscribe a message from a cluster slave ...");
114       try {
115          System.err.println("->Connect to avalon ...");
116          avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
117 
118          System.err.println("->Connect to bilbo ...");
119          bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() {  // Login to xmlBlaster, register for updates
120                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
121                   updateCounterBilbo++;
122                   log.info(
123                            "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
124                   assertEquals("Wrong message updated", oid, updateKey.getOid());
125                   return "";
126                }
127             });
128 
129          System.err.println("->Subscribe from bilbo ...");
130          SubscribeKey sk = new SubscribeKey(glob, "/xmlBlaster/key[@oid='SubscribeToBilbo']", Constants.XPATH);
131          sk.setDomain(domain);  // set domain to allow cluster forwarding of subscription
132          // without setting the domain the subscribe would just be handled by the slave connected to
133          SubscribeQos sq = new SubscribeQos(glob);
134          SubscribeReturnQos srq = bilboCon.subscribe(sk.toXml(), sq.toXml());
135 
136          System.err.println("->Publish to avalon ...");
137          PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
138          PublishQos avalon_pq = new PublishQos(glob);
139          MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
140          PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
141          assertEquals("oid changed", oid, avalon_prq.getKeyOid());
142 
143 
144          try { Thread.sleep(2000); } catch( InterruptedException i) {}
145          if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
146          assertEquals("message from avalon", 1, updateCounterBilbo);
147          updateCounterBilbo = 0;
148 
149          System.err.println("->testSubscribeXpath done, SUCCESS.");
150 
151          System.err.println("->Trying to unSubscribe ...");
152          bilboCon.unSubscribe("<key oid='" + srq.getSubscriptionId() + "'/>", null);
153 
154          System.err.println("->Trying to erase the message at the slave node ...");
155          EraseKey ek = new EraseKey(glob, oid);
156          ek.setDomain(domain);
157          EraseQos eq = new EraseQos(glob);
158          EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
159          assertEquals("Erase", 1, arr.length);
160       }
161       catch (XmlBlasterException e) {
162          e.printStackTrace();
163          fail("SubscribeToBilbo-Exception: " + e.toString());
164       }
165       finally {
166          if (bilboCon != null) {
167             bilboCon.disconnect(null);
168             bilboCon = null;
169          }   
170          if (avalonCon != null) {
171             avalonCon.disconnect(null);
172             avalonCon = null;
173          }
174       }
175 
176       System.err.println("***SubscribeXPathTest.testSubscribeXpath: testSubscribeXpath [SUCCESS]");
177    }
178 
179    /*
180    private void waitOnUpdate(final long timeout, final int numWait) {
181       long pollingInterval = 50L;  // check every 0.05 seconds
182       if (timeout < 50)  pollingInterval = timeout / 10L;
183       long sum = 0L;
184       while (updateCounterBilbo < numWait) {
185          try {
186             Thread.sleep(pollingInterval);
187          }
188          catch( InterruptedException i)
189          {}
190          sum += pollingInterval;
191          if (sum > timeout) {
192             log.warning("Timeout of " + timeout + " occurred");
193             break;
194          }
195       }
196    }
197    */
198 
199    /**
200     * setUp() and tearDown() are ivoked between each test...() method
201     */
202     /*
203    public void testDummy() {
204       System.err.println("***SubscribeXPathTest: testDummy [SUCCESS]");
205    }
206      */
207 }


syntax highlighted by Code2HTML, v. 0.9.1