1 /*------------------------------------------------------------------------------
  2 Name:      TestPriorizedDispatchWithLostCallback.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.dispatch;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.util.Global;
 10 import org.xmlBlaster.util.XmlBlasterException;
 11 import org.xmlBlaster.client.qos.ConnectQos;
 12 import org.xmlBlaster.util.def.PriorityEnum;
 13 import org.xmlBlaster.client.I_XmlBlasterAccess;
 14 import org.xmlBlaster.client.protocol.I_CallbackServer;
 15 import org.xmlBlaster.authentication.plugins.I_ClientPlugin;
 16 import org.xmlBlaster.client.protocol.AbstractCallbackExtended;
 17 import org.xmlBlaster.client.qos.PublishQos;
 18 import org.xmlBlaster.client.qos.PublishReturnQos;
 19 import org.xmlBlaster.client.qos.UpdateQos;
 20 import org.xmlBlaster.client.qos.SubscribeQos;
 21 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 22 import org.xmlBlaster.client.key.UpdateKey;
 23 import org.xmlBlaster.client.key.SubscribeKey;
 24 import org.xmlBlaster.util.MsgUnit;
 25 import org.xmlBlaster.util.def.Constants;
 26 import org.xmlBlaster.util.qos.address.CallbackAddress;
 27 import org.xmlBlaster.util.EmbeddedXmlBlaster;
 28 import org.xmlBlaster.test.Util;
 29 import org.xmlBlaster.test.Msg;
 30 import org.xmlBlaster.test.MsgInterceptor;
 31 import org.xmlBlaster.test.util.Client;
 32 //import org.xmlBlaster.test.util.Client;
 33 import org.xmlBlaster.client.protocol.xmlrpc.XmlRpcCallbackServer;
 34 
 35 import junit.framework.*;
 36 
 37 
 38 /**
 39  * This client tests the
 40  * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">dispatch.control.plugin requirement</a>
 41  * <p />
 42  * We start our own xmlBlaster server in a thread.
 43  * This client may be invoked multiple time on the same xmlBlaster server,
 44  * as it cleans up everything after his tests are done.
 45  * <p>
 46  * <p>
 47  * This tests runs only based on XmlRpc, as with xmlrpc we can easily start/stop the callback server
 48  * independent from our connection
 49  * </p>
 50  * Invoke examples:<br />
 51  * <pre>
 52  *    java junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
 53  *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
 54  * </pre>
 55  * @see org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
 56  */
 57 public class TestPriorizedDispatchWithLostCallback extends TestCase
 58 {
 59    private Global glob;
 60    private static Logger log = Logger.getLogger(TestPriorizedDispatchWithLostCallback.class.getName());
 61 
 62    private ConnectQos connectQos;
 63    private I_XmlBlasterAccess con = null;
 64    private String name;
 65    private String passwd = "secret";
 66    private EmbeddedXmlBlaster serverThread;
 67    private int serverPort = 9660;
 68    private MsgInterceptor updateInterceptor;
 69    private MsgInterceptor updateMsgs; // just used as message container, class scope to be usable in inner update class
 70 
 71    private final String msgOid = "dispatchTestMessage";
 72 
 73    private int msgSequenceNumber = 0;
 74 
 75    private String statusOid = "_bandwidth.status";
 76    private String NORMAL_LINE = "2M";
 77    private String BACKUP_LINE = "64k";
 78    private String DEAD_LINE = "DOWN";
 79 
 80    /**
 81     * Constructs the TestPriorizedDispatchWithLostCallback object.
 82     * <p />
 83     * @param testName   The name used in the test suite
 84     * @param name       The name to login to the xmlBlaster
 85     */
 86    public TestPriorizedDispatchWithLostCallback(Global glob, String testName, String name) {
 87       super(testName);
 88       this.glob = glob;
 89 
 90       this.name = name;
 91    }
 92 
 93    /**
 94     * Sets up the fixture.
 95     * <p />
 96     * We start an own xmlBlaster server in a separate thread,
 97     * it is configured to load our demo dispatch plugin.
 98     * <p />
 99     * Then we connect as an XmlRpc client in fail save mode.
100     * We need to shutdown and restart the callback server and this is buggy with CORBA.
101     */
102    protected void setUp() {  
103       glob.init(Util.getOtherServerPorts(serverPort));
104       // We register here the demo plugin with xmlBlaster server, supplying an argument to the plugin
105       String[] args = {
106         "-ProtocolPlugin[XMLRPC][1.0]", "org.xmlBlaster.protocol.xmlrpc.XmlRpcDriver",
107         "-CbProtocolPlugin[XMLRPC][1.0]", "org.xmlBlaster.protocol.xmlrpc.CallbackXmlRpcDriver",
108         "-dispatch/connection/protocol", "XMLRPC",
109         "-dispatch/callback/protocol", "XMLRPC",
110         "-plugin/xmlrpc/singleChannel", "false",
111         "-plugin/xmlrpc/port", ""+(serverPort+1),
112         "-dispatch/callback/plugin/xmlrpc/port", ""+(serverPort+1),
113         "-DispatchPlugin[Priority][1.0]", "org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin",
114         "-DispatchPlugin/defaultPlugin", "undef",  // configure "Priority,1.0" below with CallbackAddress
115         "-PriorizedDispatchPlugin.user", "_PriorizedDispatchPlugin",
116         "-"+org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY, "<msgDispatch defaultStatus='" + BACKUP_LINE + "' defaultAction='send'/>\n"
117         // "PriorizedDispatchPlugin/config"
118          };
119       glob.init(args);
120 
121       this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
122       log.info("XmlBlaster is ready for testing the priority dispatch plugin");
123 
124       try {
125          // A testsuite helper to collect update messages
126          this.updateInterceptor = new MsgInterceptor(glob, log, null);
127 
128          // Connecting to server
129          log.info("Connecting with XmlRpc ...");
130          this.con = glob.getXmlBlasterAccess();
131          this.connectQos = new ConnectQos(glob, name, passwd);
132 
133          CallbackAddress cbAddress = new CallbackAddress(glob);
134          cbAddress.setDelay(1000L);      // retry connecting every 4 sec
135          cbAddress.setRetries(-1);       // -1 == forever
136          cbAddress.setPingInterval(5000L); // ping every 4 seconds
137          cbAddress.setDispatchPlugin("Priority,1.0");  // Activate plugin for callback only
138          this.connectQos.addCallbackAddress(cbAddress);
139 
140          this.con.connect(this.connectQos, this.updateInterceptor);
141       }
142       catch (Exception e) {
143          e.printStackTrace();
144          log.severe("Can't connect to xmlBlaster: " + e.getMessage());
145       }
146 
147       this.updateInterceptor.clear();
148    }
149 
150    private void publish(String oid, int priority) {
151       PriorityEnum prio = PriorityEnum.toPriorityEnum(priority);
152       try {
153          msgSequenceNumber++;
154          String content = "" + msgSequenceNumber;
155          PublishQos pq = new PublishQos(glob);
156          pq.setPriority(prio);
157          PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", content.getBytes(), pq.toXml()));
158          log.info("SUCCESS publish '" + oid + "' with prio=" + prio.toString() + " returned state=" + rq.getState());
159          assertEquals("Returned oid wrong", oid, rq.getKeyOid());
160          assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
161       } catch(XmlBlasterException e) {
162          log.warning("XmlBlasterException: " + e.getMessage());
163          fail("publish prio=" + prio.toString() + " - XmlBlasterException: " + e.getMessage());
164       }
165    }
166 
167    /**
168     * Change the configuration of the plugin
169     */
170    private void publishNewConfig(String config) {
171       String configKey = org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY; // -PriorizedDispatchPlugin/config=
172       try {
173          String oid = "__cmd:sysprop/?" + configKey;
174          String contentStr = config;
175          PublishQos pq = new PublishQos(glob);
176          PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", contentStr.getBytes(), pq.toXml()));
177          log.info("SUCCESS publish '" + oid + "' " + pq.toXml() + ", returned state=" + rq.getState());
178          assertEquals("Returned oid wrong", oid, rq.getKeyOid());
179          assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
180       } catch(XmlBlasterException e) {
181          log.warning("XmlBlasterException: " + e.getMessage());
182          e.printStackTrace();
183          fail("publish of configuration data - XmlBlasterException: " + e.getMessage());
184       }
185    }
186 
187    private void subscribe(String oid) {
188       try {
189          SubscribeKey sk = new SubscribeKey(glob, oid);
190          SubscribeQos sq = new SubscribeQos(glob);
191          SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml());
192          log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState());
193       } catch(XmlBlasterException e) {
194          log.warning("XmlBlasterException: " + e.getMessage());
195          fail("subscribe - XmlBlasterException: " + e.getMessage());
196       }
197    }
198 
199    /**
200     * Tests what happens if dispatcher frameworks looses the callback connection to us
201     * and starts polling
202     */
203    public void testPriorizedDispatchPluginConnectionState() {
204       log.info("testPriorizedDispatchPluginConnectionState() ...");
205       String config = 
206             "<msgDispatch defaultStatus='" + NORMAL_LINE + "' defaultAction='send'>\n"+
207             "  <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" +
208             "    <action do='send'  ifPriority='0-9'/>\n" +
209             "  </onStatus>\n" +
210             "  <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue' connectionState='polling'>\n" +
211             "    <action do='queue'  ifPriority='4-9'/>\n" +
212             "    <action do='destroy'  ifPriority='0-3'/>\n" +
213             "  </onStatus>\n" +
214             "</msgDispatch>\n";
215 
216       publishNewConfig(config);
217 
218 
219       String text = "Testing configuration";
220 
221       long sleep = 2000L;
222 
223       subscribe(msgOid);
224 
225       int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
226 
227       // First check normal operation
228       //changeStatus(statusOid, NORMAL_LINE);
229       publish(msgOid, 1);
230       assertEquals(text, 1, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
231       this.updateInterceptor.clear();
232 
233       // Now kill our callback server
234       log.info("Shutdown callback, expecting messages to be queued or destroyed depending on the priority");
235       try {
236          Client.shutdownCb(con, Client.Shutdown.KEEP_LOGGED_IN);
237       }
238       catch (XmlBlasterException ex) {
239          ex.printStackTrace();
240          assertTrue("Exception " + ex.getMessage() + " should not occur", false);
241       }
242       this.updateInterceptor.clear();
243 
244       // These messages are depending on the priority queued or destroyed
245       // as the callback connection is polling ...
246       for (int priority=0; priority < maxPrio; priority++) {
247          publish(msgOid, priority);
248       }
249       assertEquals(text, 0, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
250       this.updateInterceptor.clear();
251 
252 
253       // Now reestablish the callback server ...
254       I_CallbackServer cbServer = null;
255       try {
256          updateMsgs = new MsgInterceptor(glob, log, null); // just used as message container
257          this.updateInterceptor.clear();
258          try {
259             // TODO change this since it can not work when using xmlrpc with singleChannel=true
260             cbServer = new XmlRpcCallbackServer();
261             CallbackAddress cbAddress = new CallbackAddress(glob);
262             cbServer.initialize(this.glob, name, cbAddress, new AbstractCallbackExtended(glob) {
263                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
264                   try {
265                      String contentStr = new String(content);
266                      String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
267                      log.info("Receiving update of a message oid=" + updateKey.getOid() +
268                          " priority=" + updateQos.getPriority() +
269                          " state=" + updateQos.getState() +
270                          " content=" + cont);
271                      if (!updateQos.isErased()) {
272                         updateMsgs.add(new Msg(cbSessionId, updateKey, content, updateQos));
273                      }
274                   }
275                   catch (Throwable e) {
276                      log.severe("Error in update method: " + e.toString());
277                      e.printStackTrace();
278                   }
279                   return "";
280                }
281                public I_ClientPlugin getSecurityPlugin() { return null; }
282                public void lostConnection(XmlBlasterException xmlBlasterException) {}
283             }); // Establish new callback server
284          }
285          catch (Throwable e) {
286             log.severe("Can't restart callback server: " + e.toString());
287             fail("Can't restart callback server: " + e.toString());
288          }
289 
290          log.info("Waiting long enough that xmlBlaster reconnected to us and expecting the 6 queued messages ...");
291          try { Thread.sleep(3000L); } catch( InterruptedException i) {}
292          assertEquals(text, 0, this.updateInterceptor.getMsgs().length);
293          assertEquals(text, 6, updateMsgs.getMsgs(msgOid, Constants.STATE_OK).length);
294          Msg[] msgArr = updateMsgs.getMsgs();
295          assertEquals(text, 6, msgArr.length);
296          int lastNum = -1;
297          int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
298          for (int i=0; i<msgArr.length; i++) {
299             int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();
300             int currNum = msgArr[i].getContentInt();
301             if (lastPrio < currPrio || lastPrio == currPrio && lastNum >= currNum)
302                fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);
303             lastNum = currNum;
304             lastPrio = currPrio;
305          }
306          assertEquals("", PriorityEnum.MAX_PRIORITY, msgArr[0].getUpdateQos().getPriority());
307          assertEquals("", 4, msgArr[5].getUpdateQos().getPriority().getInt());
308          updateMsgs.clear();
309          this.updateInterceptor.clear();
310       }
311       finally {
312          if (cbServer != null) {
313             try { cbServer.shutdown(); } catch (Exception e) { log.severe(e.toString()); };
314          }
315       }
316 
317       log.info("Success in testPriorizedDispatchPluginConnectionState()");
318    }
319 
320    /**
321     * Tears down the fixture.
322     * <p />
323     * cleaning up .... erase() the previous message OID and logout
324     */
325    protected void tearDown() {
326       try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait some time
327 
328       con.disconnect(null);
329       con = null;
330       
331       try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
332       EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
333       this.serverThread = null;
334 
335       // reset to default server port (necessary if other tests follow in the same JVM).
336       Util.resetPorts(glob);
337       this.glob = null;
338      
339       this.connectQos = null;
340       this.con = null;
341       this.updateInterceptor = null;
342       this.updateMsgs = null;
343       Global.instance().shutdown();
344    }
345 
346    /**
347     * Method is used by TestRunner to load these tests
348     */
349    public static Test suite() {
350        TestSuite suite= new TestSuite();
351        suite.addTest(new TestPriorizedDispatchWithLostCallback(Global.instance(), "testPriorizedDispatchPluginConnectionState", "PriorizedDispatchPluginRecovery"));
352        return suite;
353    }
354 
355    /**
356     * Invoke: 
357     * <pre>
358     *  java org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback  -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST
359     *  java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
360     * <pre>
361     */
362    public static void main(String args[]) {
363       Global glob = new Global();
364       if (glob.init(args) != 0) {
365          System.exit(0);
366       }
367       TestPriorizedDispatchWithLostCallback testSub = new TestPriorizedDispatchWithLostCallback(glob, "TestPriorizedDispatchWithLostCallback", "TestPriorizedDispatchWithLostCallback");
368       testSub.setUp();
369       testSub.testPriorizedDispatchPluginConnectionState();
370       testSub.tearDown();
371    }
372 }


syntax highlighted by Code2HTML, v. 0.9.1