1 /*------------------------------------------------------------------------------
2 Name: TestPriorizedDispatchWithLostCallback.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.dispatch;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10 import org.xmlBlaster.util.XmlBlasterException;
11 import org.xmlBlaster.client.qos.ConnectQos;
12 import org.xmlBlaster.util.def.PriorityEnum;
13 import org.xmlBlaster.client.I_XmlBlasterAccess;
14 import org.xmlBlaster.client.protocol.I_CallbackServer;
15 import org.xmlBlaster.authentication.plugins.I_ClientPlugin;
16 import org.xmlBlaster.client.protocol.AbstractCallbackExtended;
17 import org.xmlBlaster.client.qos.PublishQos;
18 import org.xmlBlaster.client.qos.PublishReturnQos;
19 import org.xmlBlaster.client.qos.UpdateQos;
20 import org.xmlBlaster.client.qos.SubscribeQos;
21 import org.xmlBlaster.client.qos.SubscribeReturnQos;
22 import org.xmlBlaster.client.key.UpdateKey;
23 import org.xmlBlaster.client.key.SubscribeKey;
24 import org.xmlBlaster.util.MsgUnit;
25 import org.xmlBlaster.util.def.Constants;
26 import org.xmlBlaster.util.qos.address.CallbackAddress;
27 import org.xmlBlaster.util.EmbeddedXmlBlaster;
28 import org.xmlBlaster.test.Util;
29 import org.xmlBlaster.test.Msg;
30 import org.xmlBlaster.test.MsgInterceptor;
31 import org.xmlBlaster.test.util.Client;
32 //import org.xmlBlaster.test.util.Client;
33 import org.xmlBlaster.client.protocol.xmlrpc.XmlRpcCallbackServer;
34
35 import junit.framework.*;
36
37
38 /**
39 * This client tests the
40 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">dispatch.control.plugin requirement</a>
41 * <p />
42 * We start our own xmlBlaster server in a thread.
43 * This client may be invoked multiple time on the same xmlBlaster server,
44 * as it cleans up everything after his tests are done.
45 * <p>
46 * <p>
47 * This tests runs only based on XmlRpc, as with xmlrpc we can easily start/stop the callback server
48 * independent from our connection
49 * </p>
50 * Invoke examples:<br />
51 * <pre>
52 * java junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
53 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
54 * </pre>
55 * @see org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
56 */
57 public class TestPriorizedDispatchWithLostCallback extends TestCase
58 {
59 private Global glob;
60 private static Logger log = Logger.getLogger(TestPriorizedDispatchWithLostCallback.class.getName());
61
62 private ConnectQos connectQos;
63 private I_XmlBlasterAccess con = null;
64 private String name;
65 private String passwd = "secret";
66 private EmbeddedXmlBlaster serverThread;
67 private int serverPort = 9660;
68 private MsgInterceptor updateInterceptor;
69 private MsgInterceptor updateMsgs; // just used as message container, class scope to be usable in inner update class
70
71 private final String msgOid = "dispatchTestMessage";
72
73 private int msgSequenceNumber = 0;
74
75 private String statusOid = "_bandwidth.status";
76 private String NORMAL_LINE = "2M";
77 private String BACKUP_LINE = "64k";
78 private String DEAD_LINE = "DOWN";
79
80 /**
81 * Constructs the TestPriorizedDispatchWithLostCallback object.
82 * <p />
83 * @param testName The name used in the test suite
84 * @param name The name to login to the xmlBlaster
85 */
86 public TestPriorizedDispatchWithLostCallback(Global glob, String testName, String name) {
87 super(testName);
88 this.glob = glob;
89
90 this.name = name;
91 }
92
93 /**
94 * Sets up the fixture.
95 * <p />
96 * We start an own xmlBlaster server in a separate thread,
97 * it is configured to load our demo dispatch plugin.
98 * <p />
99 * Then we connect as an XmlRpc client in fail save mode.
100 * We need to shutdown and restart the callback server and this is buggy with CORBA.
101 */
102 protected void setUp() {
103 glob.init(Util.getOtherServerPorts(serverPort));
104 // We register here the demo plugin with xmlBlaster server, supplying an argument to the plugin
105 String[] args = {
106 "-ProtocolPlugin[XMLRPC][1.0]", "org.xmlBlaster.protocol.xmlrpc.XmlRpcDriver",
107 "-CbProtocolPlugin[XMLRPC][1.0]", "org.xmlBlaster.protocol.xmlrpc.CallbackXmlRpcDriver",
108 "-dispatch/connection/protocol", "XMLRPC",
109 "-dispatch/callback/protocol", "XMLRPC",
110 "-plugin/xmlrpc/singleChannel", "false",
111 "-plugin/xmlrpc/port", ""+(serverPort+1),
112 "-dispatch/callback/plugin/xmlrpc/port", ""+(serverPort+1),
113 "-DispatchPlugin[Priority][1.0]", "org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin",
114 "-DispatchPlugin/defaultPlugin", "undef", // configure "Priority,1.0" below with CallbackAddress
115 "-PriorizedDispatchPlugin.user", "_PriorizedDispatchPlugin",
116 "-"+org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY, "<msgDispatch defaultStatus='" + BACKUP_LINE + "' defaultAction='send'/>\n"
117 // "PriorizedDispatchPlugin/config"
118 };
119 glob.init(args);
120
121 this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
122 log.info("XmlBlaster is ready for testing the priority dispatch plugin");
123
124 try {
125 // A testsuite helper to collect update messages
126 this.updateInterceptor = new MsgInterceptor(glob, log, null);
127
128 // Connecting to server
129 log.info("Connecting with XmlRpc ...");
130 this.con = glob.getXmlBlasterAccess();
131 this.connectQos = new ConnectQos(glob, name, passwd);
132
133 CallbackAddress cbAddress = new CallbackAddress(glob);
134 cbAddress.setDelay(1000L); // retry connecting every 4 sec
135 cbAddress.setRetries(-1); // -1 == forever
136 cbAddress.setPingInterval(5000L); // ping every 4 seconds
137 cbAddress.setDispatchPlugin("Priority,1.0"); // Activate plugin for callback only
138 this.connectQos.addCallbackAddress(cbAddress);
139
140 this.con.connect(this.connectQos, this.updateInterceptor);
141 }
142 catch (Exception e) {
143 e.printStackTrace();
144 log.severe("Can't connect to xmlBlaster: " + e.getMessage());
145 }
146
147 this.updateInterceptor.clear();
148 }
149
150 private void publish(String oid, int priority) {
151 PriorityEnum prio = PriorityEnum.toPriorityEnum(priority);
152 try {
153 msgSequenceNumber++;
154 String content = "" + msgSequenceNumber;
155 PublishQos pq = new PublishQos(glob);
156 pq.setPriority(prio);
157 PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", content.getBytes(), pq.toXml()));
158 log.info("SUCCESS publish '" + oid + "' with prio=" + prio.toString() + " returned state=" + rq.getState());
159 assertEquals("Returned oid wrong", oid, rq.getKeyOid());
160 assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
161 } catch(XmlBlasterException e) {
162 log.warning("XmlBlasterException: " + e.getMessage());
163 fail("publish prio=" + prio.toString() + " - XmlBlasterException: " + e.getMessage());
164 }
165 }
166
167 /**
168 * Change the configuration of the plugin
169 */
170 private void publishNewConfig(String config) {
171 String configKey = org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY; // -PriorizedDispatchPlugin/config=
172 try {
173 String oid = "__cmd:sysprop/?" + configKey;
174 String contentStr = config;
175 PublishQos pq = new PublishQos(glob);
176 PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", contentStr.getBytes(), pq.toXml()));
177 log.info("SUCCESS publish '" + oid + "' " + pq.toXml() + ", returned state=" + rq.getState());
178 assertEquals("Returned oid wrong", oid, rq.getKeyOid());
179 assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
180 } catch(XmlBlasterException e) {
181 log.warning("XmlBlasterException: " + e.getMessage());
182 e.printStackTrace();
183 fail("publish of configuration data - XmlBlasterException: " + e.getMessage());
184 }
185 }
186
187 private void subscribe(String oid) {
188 try {
189 SubscribeKey sk = new SubscribeKey(glob, oid);
190 SubscribeQos sq = new SubscribeQos(glob);
191 SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml());
192 log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState());
193 } catch(XmlBlasterException e) {
194 log.warning("XmlBlasterException: " + e.getMessage());
195 fail("subscribe - XmlBlasterException: " + e.getMessage());
196 }
197 }
198
199 /**
200 * Tests what happens if dispatcher frameworks looses the callback connection to us
201 * and starts polling
202 */
203 public void testPriorizedDispatchPluginConnectionState() {
204 log.info("testPriorizedDispatchPluginConnectionState() ...");
205 String config =
206 "<msgDispatch defaultStatus='" + NORMAL_LINE + "' defaultAction='send'>\n"+
207 " <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" +
208 " <action do='send' ifPriority='0-9'/>\n" +
209 " </onStatus>\n" +
210 " <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue' connectionState='polling'>\n" +
211 " <action do='queue' ifPriority='4-9'/>\n" +
212 " <action do='destroy' ifPriority='0-3'/>\n" +
213 " </onStatus>\n" +
214 "</msgDispatch>\n";
215
216 publishNewConfig(config);
217
218
219 String text = "Testing configuration";
220
221 long sleep = 2000L;
222
223 subscribe(msgOid);
224
225 int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
226
227 // First check normal operation
228 //changeStatus(statusOid, NORMAL_LINE);
229 publish(msgOid, 1);
230 assertEquals(text, 1, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
231 this.updateInterceptor.clear();
232
233 // Now kill our callback server
234 log.info("Shutdown callback, expecting messages to be queued or destroyed depending on the priority");
235 try {
236 Client.shutdownCb(con, Client.Shutdown.KEEP_LOGGED_IN);
237 }
238 catch (XmlBlasterException ex) {
239 ex.printStackTrace();
240 assertTrue("Exception " + ex.getMessage() + " should not occur", false);
241 }
242 this.updateInterceptor.clear();
243
244 // These messages are depending on the priority queued or destroyed
245 // as the callback connection is polling ...
246 for (int priority=0; priority < maxPrio; priority++) {
247 publish(msgOid, priority);
248 }
249 assertEquals(text, 0, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
250 this.updateInterceptor.clear();
251
252
253 // Now reestablish the callback server ...
254 I_CallbackServer cbServer = null;
255 try {
256 updateMsgs = new MsgInterceptor(glob, log, null); // just used as message container
257 this.updateInterceptor.clear();
258 try {
259 // TODO change this since it can not work when using xmlrpc with singleChannel=true
260 cbServer = new XmlRpcCallbackServer();
261 CallbackAddress cbAddress = new CallbackAddress(glob);
262 cbServer.initialize(this.glob, name, cbAddress, new AbstractCallbackExtended(glob) {
263 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
264 try {
265 String contentStr = new String(content);
266 String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
267 log.info("Receiving update of a message oid=" + updateKey.getOid() +
268 " priority=" + updateQos.getPriority() +
269 " state=" + updateQos.getState() +
270 " content=" + cont);
271 if (!updateQos.isErased()) {
272 updateMsgs.add(new Msg(cbSessionId, updateKey, content, updateQos));
273 }
274 }
275 catch (Throwable e) {
276 log.severe("Error in update method: " + e.toString());
277 e.printStackTrace();
278 }
279 return "";
280 }
281 public I_ClientPlugin getSecurityPlugin() { return null; }
282 public void lostConnection(XmlBlasterException xmlBlasterException) {}
283 }); // Establish new callback server
284 }
285 catch (Throwable e) {
286 log.severe("Can't restart callback server: " + e.toString());
287 fail("Can't restart callback server: " + e.toString());
288 }
289
290 log.info("Waiting long enough that xmlBlaster reconnected to us and expecting the 6 queued messages ...");
291 try { Thread.sleep(3000L); } catch( InterruptedException i) {}
292 assertEquals(text, 0, this.updateInterceptor.getMsgs().length);
293 assertEquals(text, 6, updateMsgs.getMsgs(msgOid, Constants.STATE_OK).length);
294 Msg[] msgArr = updateMsgs.getMsgs();
295 assertEquals(text, 6, msgArr.length);
296 int lastNum = -1;
297 int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
298 for (int i=0; i<msgArr.length; i++) {
299 int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();
300 int currNum = msgArr[i].getContentInt();
301 if (lastPrio < currPrio || lastPrio == currPrio && lastNum >= currNum)
302 fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);
303 lastNum = currNum;
304 lastPrio = currPrio;
305 }
306 assertEquals("", PriorityEnum.MAX_PRIORITY, msgArr[0].getUpdateQos().getPriority());
307 assertEquals("", 4, msgArr[5].getUpdateQos().getPriority().getInt());
308 updateMsgs.clear();
309 this.updateInterceptor.clear();
310 }
311 finally {
312 if (cbServer != null) {
313 try { cbServer.shutdown(); } catch (Exception e) { log.severe(e.toString()); };
314 }
315 }
316
317 log.info("Success in testPriorizedDispatchPluginConnectionState()");
318 }
319
320 /**
321 * Tears down the fixture.
322 * <p />
323 * cleaning up .... erase() the previous message OID and logout
324 */
325 protected void tearDown() {
326 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait some time
327
328 con.disconnect(null);
329 con = null;
330
331 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
332 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
333 this.serverThread = null;
334
335 // reset to default server port (necessary if other tests follow in the same JVM).
336 Util.resetPorts(glob);
337 this.glob = null;
338
339 this.connectQos = null;
340 this.con = null;
341 this.updateInterceptor = null;
342 this.updateMsgs = null;
343 Global.instance().shutdown();
344 }
345
346 /**
347 * Method is used by TestRunner to load these tests
348 */
349 public static Test suite() {
350 TestSuite suite= new TestSuite();
351 suite.addTest(new TestPriorizedDispatchWithLostCallback(Global.instance(), "testPriorizedDispatchPluginConnectionState", "PriorizedDispatchPluginRecovery"));
352 return suite;
353 }
354
355 /**
356 * Invoke:
357 * <pre>
358 * java org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST
359 * java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
360 * <pre>
361 */
362 public static void main(String args[]) {
363 Global glob = new Global();
364 if (glob.init(args) != 0) {
365 System.exit(0);
366 }
367 TestPriorizedDispatchWithLostCallback testSub = new TestPriorizedDispatchWithLostCallback(glob, "TestPriorizedDispatchWithLostCallback", "TestPriorizedDispatchWithLostCallback");
368 testSub.setUp();
369 testSub.testPriorizedDispatchPluginConnectionState();
370 testSub.tearDown();
371 }
372 }
syntax highlighted by Code2HTML, v. 0.9.1