1 package org.xmlBlaster.test.cluster;
  2 
  3 import java.util.logging.Logger;
  4 import org.xmlBlaster.util.Global;
  5 
  6 // for client connections:
  7 import org.xmlBlaster.util.*;
  8 import org.xmlBlaster.client.I_Callback;
  9 import org.xmlBlaster.client.key.PublishKey;
 10 import org.xmlBlaster.client.key.EraseKey;
 11 import org.xmlBlaster.client.key.SubscribeKey;
 12 import org.xmlBlaster.client.key.UnSubscribeKey;
 13 import org.xmlBlaster.client.key.UpdateKey;
 14 import org.xmlBlaster.client.qos.PublishQos;
 15 import org.xmlBlaster.client.qos.PublishReturnQos;
 16 import org.xmlBlaster.client.qos.UpdateQos;
 17 import org.xmlBlaster.client.qos.SubscribeQos;
 18 import org.xmlBlaster.client.qos.UnSubscribeQos;
 19 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
 20 import org.xmlBlaster.client.qos.EraseQos;
 21 import org.xmlBlaster.client.qos.EraseReturnQos;
 22 import org.xmlBlaster.client.I_XmlBlasterAccess;
 23 import org.xmlBlaster.util.MsgUnit;
 24 
 25 import junit.framework.*;
 26 
 27 /**
 28  * Test publishing a message from bilbo to heron. 
 29  * <p />
 30  * <pre>
 31  * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.SubscribeTest
 32  * </pre>
 33  * NOTE: asserts() in update() methods are routed back to server and are not handled
 34  *       by the junit testsuite, so we check double (see code).
 35  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
 36  */
 37 public class SubscribeTest extends TestCase {
 38    private String ME = "SubscribeTest";
 39    private Global glob;
 40    private static Logger log = Logger.getLogger(SubscribeTest.class.getName());
 41    private ServerHelper serverHelper;
 42 
 43    private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon, bilboCon2;
 44 
 45    private int updateCounterBilbo = 0;
 46    private int updateCounterBilbo2 = 0;
 47    private String oid = "SubscribeToBilbo";
 48    private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
 49    private String contentStr = "We win";
 50 
 51    public SubscribeTest(String name) {
 52       super(name);
 53       this.glob = new Global(null, true, false);
 54    }
 55 
 56    /**
 57     * Initialize the test ...
 58     */
 59    protected void setUp() {
 60 
 61       log.info("Entering setUp(), test starts");
 62 
 63       updateCounterBilbo = 0;
 64       updateCounterBilbo2 = 0;
 65 
 66 
 67       serverHelper = new ServerHelper(glob, log, ME);
 68 
 69       // Starts a cluster node
 70       serverHelper.startHeron();
 71       serverHelper.startAvalon();
 72       //serverHelper.startGolan();
 73       serverHelper.startFrodo();
 74       serverHelper.startBilbo();
 75    }
 76 
 77    /**
 78     * cleaning up ...
 79     */
 80    protected void tearDown() {
 81       log.info("Entering tearDown(), test is finished");
 82       try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
 83 
 84       if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
 85       if (bilboCon2 != null) { bilboCon2.disconnect(null); bilboCon2 = null; }
 86       if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
 87       if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
 88       if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
 89       if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
 90 
 91       serverHelper.tearDown();
 92    }
 93 
 94    /**
 95     * We start all nodes as described in requirement
 96     * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
 97     * <p />
 98     * - Subscribe to RUGBY messages from bilbo twice<br />
 99     * - publish RUGBY messages to avalon (heron is the master)<br />
100     * - Kill bilbo, restart bilbo and check if we still get them
101     */ 
102    public void testSubscribeTwice() {
103       System.err.println("***SubscribeTest.testSubscribeTwice: Subscribe a message from a cluster slave ...");
104       try {
105          System.err.println("->Connect to avalon ...");
106          avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
107 
108          {
109             System.err.println("->Connect to bilbo ...");
110             bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() {  // Login to xmlBlaster, register for updates
111                   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
112                      if (updateQos.isErased()) {
113                         log.info("Ignoring erase message");
114                         return "";
115                      }
116                      updateCounterBilbo++;
117                      log.info(
118                               "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
119                      assertEquals("Wrong message updated", oid, updateKey.getOid());
120                      return "";
121                   }
122                });
123 
124             System.err.println("->Subscribe from bilbo ...");
125             SubscribeKey sk = new SubscribeKey(glob, oid);
126             sk.setDomain(domain);
127             SubscribeQos sq = new SubscribeQos(glob);
128             bilboCon.subscribe(sk.toXml(), sq.toXml());
129          }
130 
131          {
132             System.err.println("->Connect to bilbo 2 ...");
133             final Global bilboGlob2 = serverHelper.getBilboGlob().getClone(null);
134             bilboCon2 = serverHelper.connect(bilboGlob2, new I_Callback() {  // Login to xmlBlaster, register for updates
135                   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
136                      if (updateQos.isErased()) {
137                         log.info("Ignoring erase message");
138                         return "";
139                      }
140                      updateCounterBilbo2++;
141                      log.info(
142                               "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo2 + " ...");
143                      assertEquals("#2 Wrong message updated", oid, updateKey.getOid());
144                      return "";
145                   }
146                });
147 
148             System.err.println("->Subscribe from bilbo 2 ...");
149             SubscribeKey sk = new SubscribeKey(glob, oid);
150             sk.setDomain(domain);
151             SubscribeQos sq = new SubscribeQos(glob);
152             bilboCon2.subscribe(sk.toXml(), sq.toXml());
153          }
154 
155          // First test subscribe ...
156          {
157             System.err.println("->Publish to avalon ...");
158             PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
159             PublishQos avalon_pq = new PublishQos(glob);
160             MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
161             PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
162             assertEquals("oid changed", oid, avalon_prq.getKeyOid());
163 
164 
165             try { Thread.sleep(2000); } catch( InterruptedException i) {}
166             if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
167             assertEquals("message from avalon", 1, updateCounterBilbo);
168             if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
169             assertEquals("message from avalon #2", 1, updateCounterBilbo2);
170             updateCounterBilbo = 0;
171             updateCounterBilbo2 = 0;
172          }
173 
174          System.err.println("->testSubscribeTwice done, SUCCESS.");
175 
176          // ... and now test unSubscribe
177          {
178             System.err.println("->UnSubscribe from bilbo ...");
179             UnSubscribeKey usk = new UnSubscribeKey(glob, oid);
180             usk.setDomain(domain);
181             UnSubscribeQos usq = new UnSubscribeQos(glob);
182             UnSubscribeReturnQos[] usrq = bilboCon.unSubscribe(usk, usq);
183             assertEquals("", 1, usrq.length);
184 
185             System.err.println("->Publish to avalon ...");
186             PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
187             PublishQos avalon_pq = new PublishQos(glob);
188             MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
189             PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
190             assertEquals("oid changed", oid, avalon_prq.getKeyOid());
191 
192 
193             try { Thread.sleep(2000); } catch( InterruptedException i) {}
194             if (0 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
195             assertEquals("message from avalon", 0, updateCounterBilbo);
196             if (1 != updateCounterBilbo2) log.severe("Did not expect " + updateCounterBilbo2 + " updates");
197             assertEquals("message from avalon #2", 1, updateCounterBilbo2);
198             updateCounterBilbo = 0;
199             updateCounterBilbo2 = 0;
200          }
201 
202          System.err.println("->Trying to erase the message at the slave node ...");
203          EraseKey ek = new EraseKey(glob, oid);
204          ek.setDomain(domain);
205          EraseQos eq = new EraseQos(glob);
206          EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
207          assertEquals("Erase", 1, arr.length);
208       }
209       catch (XmlBlasterException e) {
210          e.printStackTrace();
211          fail("SubscribeToBilbo-Exception: " + e.toString());
212       }
213       finally {
214          if (bilboCon != null) {
215             bilboCon.disconnect(null);
216             bilboCon = null;
217          }   
218          if (bilboCon2 != null) {
219             bilboCon2.disconnect(null);
220             bilboCon2 = null;
221          }   
222          if (avalonCon != null) {
223             avalonCon.disconnect(null);
224             avalonCon = null;
225          }
226       }
227 
228       System.err.println("***SubscribeTest.testSubscribeTwice: testSubscribeTwice [SUCCESS]");
229    }
230 
231    /**
232     * We start all nodes as described in requirement
233     * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
234     * <p />
235     * 1. publish RUGBY messages to avalon (heron is the master)<br />
236     * 2. Subscribe those messages from bilbo<br />
237     * 3. Kill bilbo, restart bilbo and check if we still get them
238     */ 
239    public void testSubscribe() {
240       System.err.println("***SubscribeTest: Subscribe a message from a cluster slave ...");
241 
242       int num = 2;
243       I_XmlBlasterAccess[] bilboCons = new I_XmlBlasterAccess[num];
244 
245       try {
246          System.err.println("->Connect to avalon ...");
247          avalonCon = serverHelper.connect(serverHelper.getAvalonGlob(), null);
248          try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
249 
250          for (int ii=0; ii<num; ii++) {
251             System.err.println("->Connect to bilbo #" + ii + " ...");
252             final Global bilboGlobii = serverHelper.getBilboGlob().getClone(null);
253             bilboCons[ii] = serverHelper.connect(bilboGlobii, new I_Callback() {  // Login to xmlBlaster, register for updates
254                   public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
255                      log.info(
256                               "Receiving update '" + updateKey.getOid() + "' state=" + updateQos.getState() + ", " + updateCounterBilbo + " ...");
257                      if (updateQos.isErased()) {
258                         log.info("Ignoring erase message");
259                         return "";
260                      }
261                      updateCounterBilbo++;
262                      log.info(
263                               "Receiving update '" + updateKey.getOid() + "' " + updateCounterBilbo + " ...");
264                      assertEquals("Wrong message updated", oid, updateKey.getOid());
265                      return "";
266                   }
267                });
268 
269             System.err.println("->Publish to avalon #" + ii + " ...");
270             PublishKey avalon_pk = new PublishKey(glob, oid, "text/plain", "1.0", domain);
271             PublishQos avalon_pq = new PublishQos(glob);
272             MsgUnit avalon_msgUnit = new MsgUnit(avalon_pk, contentStr, avalon_pq);
273             PublishReturnQos avalon_prq = avalonCon.publish(avalon_msgUnit);
274             assertEquals("oid changed", oid, avalon_prq.getKeyOid());
275 
276             try { Thread.sleep(1000L); } catch( InterruptedException i) {}
277             
278             System.err.println("->Subscribe from bilbo #" + ii + ", the message from avalon should arrive ...");
279             SubscribeKey sk = new SubscribeKey(glob, oid);
280             sk.setDomain(domain);
281             SubscribeQos sq = new SubscribeQos(glob);
282             bilboCons[ii].subscribe(sk.toXml(), sq.toXml());
283 
284             waitOnUpdate(2000L, 1);
285             try { Thread.sleep(1000); } catch( InterruptedException i) {} // wait longer to check if too many arrive
286             if (1 != updateCounterBilbo) log.severe("Did not expect " + updateCounterBilbo + " updates");
287             assertEquals("message from avalon", 1, updateCounterBilbo);
288             updateCounterBilbo = 0;
289 
290             System.err.println("->Trying to erase the message at the slave node ...");
291             EraseKey ek = new EraseKey(glob, oid);
292             ek.setDomain(domain);
293             EraseQos eq = new EraseQos(glob);
294             EraseReturnQos[] arr = avalonCon.erase(ek.toXml(), eq.toXml());
295             assertEquals("Erase", 1, arr.length);
296 
297             // Wait on erase events
298             try { Thread.sleep(1000); } catch( InterruptedException i) {}
299             updateCounterBilbo = 0;
300             updateCounterBilbo2 = 0;
301 
302             // We stay logged in but kill over callback server ...
303             bilboCons[ii].getCbServer().shutdown();
304          }
305 
306          System.err.println("->testSubscribe done, SUCCESS.");
307       }
308       catch (XmlBlasterException e) {
309          e.printStackTrace();
310          fail("SubscribeToBilbo-Exception: " + e.toString());
311       }
312       finally {
313          for (int jj=0; jj<bilboCons.length; jj++) {
314             if (bilboCons[jj] != null) {
315                bilboCons[jj].disconnect(null);
316                bilboCons[jj] = null;
317             }
318          }
319          if (avalonCon != null) {
320             avalonCon.disconnect(null);
321             avalonCon = null;
322          }
323       }
324 
325       System.err.println("***SubscribeTest: testSubscribe [SUCCESS]");
326 
327    }
328 
329    private void waitOnUpdate(final long timeout, final int numWait) {
330       long pollingInterval = 50L;  // check every 0.05 seconds
331       if (timeout < 50)  pollingInterval = timeout / 10L;
332       long sum = 0L;
333       while (updateCounterBilbo < numWait) {
334          try {
335             Thread.sleep(pollingInterval);
336          }
337          catch( InterruptedException i)
338          {}
339          sum += pollingInterval;
340          if (sum > timeout) {
341             log.warning("Timeout of " + timeout + " occurred");
342             break;
343          }
344       }
345    }
346 }


syntax highlighted by Code2HTML, v. 0.9.1