1 /*------------------------------------------------------------------------------
2 Name: TestPriorizedDispatchWithLostCallback.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.dispatch;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.util.Global;
10 import org.xmlBlaster.util.XmlBlasterException;
11 import org.xmlBlaster.client.qos.ConnectQos;
12 import org.xmlBlaster.util.def.PriorityEnum;
13 import org.xmlBlaster.client.I_XmlBlasterAccess;
14 import org.xmlBlaster.client.protocol.I_CallbackServer;
15 import org.xmlBlaster.authentication.plugins.I_ClientPlugin;
16 import org.xmlBlaster.client.protocol.AbstractCallbackExtended;
17 import org.xmlBlaster.client.qos.PublishQos;
18 import org.xmlBlaster.client.qos.PublishReturnQos;
19 import org.xmlBlaster.client.qos.UpdateQos;
20 import org.xmlBlaster.client.qos.SubscribeQos;
21 import org.xmlBlaster.client.qos.SubscribeReturnQos;
22 import org.xmlBlaster.client.key.UpdateKey;
23 import org.xmlBlaster.client.key.SubscribeKey;
24 import org.xmlBlaster.util.MsgUnit;
25 import org.xmlBlaster.util.def.Constants;
26 import org.xmlBlaster.util.qos.address.CallbackAddress;
27 import org.xmlBlaster.util.EmbeddedXmlBlaster;
28 import org.xmlBlaster.test.Util;
29 import org.xmlBlaster.test.Msg;
30 import org.xmlBlaster.test.MsgInterceptor;
31 import org.xmlBlaster.client.protocol.xmlrpc.XmlRpcCallbackServer;
32
33 import junit.framework.*;
34
35
36 /**
37 * This client tests the
38 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/dispatch.control.plugin.html">dispatch.control.plugin requirement</a>
39 * <p />
40 * We start our own xmlBlaster server in a thread.
41 * This client may be invoked multiple time on the same xmlBlaster server,
42 * as it cleans up everything after his tests are done.
43 * <p>
44 * <p>
45 * This tests runs only based on XmlRpc, as with xmlrpc we can easily start/stop the callback server
46 * independent from our connection
47 * </p>
48 * Invoke examples:<br />
49 * <pre>
50 * java junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
51 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
52 * </pre>
53 * @see org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin
54 */
55 public class TestPriorizedDispatchWithLostCallback extends TestCase
56 {
57 private Global glob;
58 private static Logger log = Logger.getLogger(TestPriorizedDispatchWithLostCallback.class.getName());
59
60 private ConnectQos connectQos;
61 private I_XmlBlasterAccess con = null;
62 private String name;
63 private String passwd = "secret";
64 private EmbeddedXmlBlaster serverThread;
65 private int serverPort = 9660;
66 private MsgInterceptor updateInterceptor;
67 private MsgInterceptor updateMsgs; // just used as message container, class scope to be usable in inner update class
68
69 private final String msgOid = "dispatchTestMessage";
70
71 private int msgSequenceNumber = 0;
72
73 private String statusOid = "_bandwidth.status";
74 private String NORMAL_LINE = "2M";
75 private String BACKUP_LINE = "64k";
76 private String DEAD_LINE = "DOWN";
77
78 /**
79 * Constructs the TestPriorizedDispatchWithLostCallback object.
80 * <p />
81 * @param testName The name used in the test suite
82 * @param name The name to login to the xmlBlaster
83 */
84 public TestPriorizedDispatchWithLostCallback(Global glob, String testName, String name) {
85 super(testName);
86 this.glob = glob;
87
88 this.name = name;
89 }
90
91 /**
92 * Sets up the fixture.
93 * <p />
94 * We start an own xmlBlaster server in a separate thread,
95 * it is configured to load our demo dispatch plugin.
96 * <p />
97 * Then we connect as an XmlRpc client in fail save mode.
98 * We need to shutdown and restart the callback server and this is buggy with CORBA.
99 */
100 protected void setUp() {
101 glob.init(Util.getOtherServerPorts(serverPort));
102 // We register here the demo plugin with xmlBlaster server, supplying an argument to the plugin
103 String[] args = {
104 "-ProtocolPlugin[XMLRPC][1.0]", "org.xmlBlaster.protocol.xmlrpc.XmlRpcDriver",
105 "-CbProtocolPlugin[XMLRPC][1.0]", "org.xmlBlaster.protocol.xmlrpc.CallbackXmlRpcDriver",
106 "-dispatch/connection/protocol", "XMLRPC",
107 "-dispatch/callback/protocol", "XMLRPC",
108 "-plugin/xmlrpc/port", ""+(serverPort+1),
109 "-dispatch/callback/plugin/xmlrpc/port", ""+(serverPort+1),
110 "-DispatchPlugin[Priority][1.0]", "org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin",
111 "-DispatchPlugin/defaultPlugin", "undef", // configure "Priority,1.0" below with CallbackAddress
112 "-PriorizedDispatchPlugin.user", "_PriorizedDispatchPlugin",
113 "-"+org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY, "<msgDispatch defaultStatus='" + BACKUP_LINE + "' defaultAction='send'/>\n"
114 // "PriorizedDispatchPlugin/config"
115 };
116 glob.init(args);
117
118 this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
119 log.info("XmlBlaster is ready for testing the priority dispatch plugin");
120
121 try {
122 // A testsuite helper to collect update messages
123 this.updateInterceptor = new MsgInterceptor(glob, log, null);
124
125 // Connecting to server
126 log.info("Connecting with XmlRpc ...");
127 this.con = glob.getXmlBlasterAccess();
128 this.connectQos = new ConnectQos(glob, name, passwd);
129
130 CallbackAddress cbAddress = new CallbackAddress(glob);
131 cbAddress.setDelay(1000L); // retry connecting every 4 sec
132 cbAddress.setRetries(-1); // -1 == forever
133 cbAddress.setPingInterval(5000L); // ping every 4 seconds
134 cbAddress.setDispatchPlugin("Priority,1.0"); // Activate plugin for callback only
135 this.connectQos.addCallbackAddress(cbAddress);
136
137 this.con.connect(this.connectQos, this.updateInterceptor);
138 }
139 catch (Exception e) {
140 e.printStackTrace();
141 log.severe("Can't connect to xmlBlaster: " + e.getMessage());
142 }
143
144 this.updateInterceptor.clear();
145 }
146
147 private void publish(String oid, int priority) {
148 PriorityEnum prio = PriorityEnum.toPriorityEnum(priority);
149 try {
150 msgSequenceNumber++;
151 String content = "" + msgSequenceNumber;
152 PublishQos pq = new PublishQos(glob);
153 pq.setPriority(prio);
154 PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", content.getBytes(), pq.toXml()));
155 log.info("SUCCESS publish '" + oid + "' with prio=" + prio.toString() + " returned state=" + rq.getState());
156 assertEquals("Returned oid wrong", oid, rq.getKeyOid());
157 assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
158 } catch(XmlBlasterException e) {
159 log.warning("XmlBlasterException: " + e.getMessage());
160 fail("publish prio=" + prio.toString() + " - XmlBlasterException: " + e.getMessage());
161 }
162 }
163
164 /**
165 * Change the configuration of the plugin
166 */
167 private void publishNewConfig(String config) {
168 String configKey = org.xmlBlaster.util.dispatch.plugins.prio.PriorizedDispatchPlugin.CONFIG_PROPERTY_KEY; // -PriorizedDispatchPlugin/config=
169 try {
170 String oid = "__cmd:sysprop/?" + configKey;
171 String contentStr = config;
172 PublishQos pq = new PublishQos(glob);
173 PublishReturnQos rq = con.publish(new MsgUnit("<key oid='"+oid+"'/>", contentStr.getBytes(), pq.toXml()));
174 log.info("SUCCESS publish '" + oid + "' " + pq.toXml() + ", returned state=" + rq.getState());
175 assertEquals("Returned oid wrong", oid, rq.getKeyOid());
176 assertEquals("Return not OK", Constants.STATE_OK, rq.getState());
177 } catch(XmlBlasterException e) {
178 log.warning("XmlBlasterException: " + e.getMessage());
179 e.printStackTrace();
180 fail("publish of configuration data - XmlBlasterException: " + e.getMessage());
181 }
182 }
183
184 private void subscribe(String oid) {
185 try {
186 SubscribeKey sk = new SubscribeKey(glob, oid);
187 SubscribeQos sq = new SubscribeQos(glob);
188 SubscribeReturnQos srq = con.subscribe(sk.toXml(), sq.toXml());
189 log.info("SUCCESS subscribe to '" + oid + "' returned state=" + srq.getState());
190 } catch(XmlBlasterException e) {
191 log.warning("XmlBlasterException: " + e.getMessage());
192 fail("subscribe - XmlBlasterException: " + e.getMessage());
193 }
194 }
195
196 /**
197 * Tests what happens if dispatcher frameworks looses the callback connection to us
198 * and starts polling
199 */
200 public void testPriorizedDispatchPluginConnectionState() {
201 log.info("testPriorizedDispatchPluginConnectionState() ...");
202 String config =
203 "<msgDispatch defaultStatus='" + NORMAL_LINE + "' defaultAction='send'>\n"+
204 " <onStatus oid='" + statusOid + "' content='" + NORMAL_LINE + "' defaultAction='send'>\n" +
205 " <action do='send' ifPriority='0-9'/>\n" +
206 " </onStatus>\n" +
207 " <onStatus oid='" + statusOid + "' content='" + DEAD_LINE + "' defaultAction='queue' connectionState='polling'>\n" +
208 " <action do='queue' ifPriority='4-9'/>\n" +
209 " <action do='destroy' ifPriority='0-3'/>\n" +
210 " </onStatus>\n" +
211 "</msgDispatch>\n";
212
213 publishNewConfig(config);
214
215
216 String text = "Testing configuration";
217
218 long sleep = 2000L;
219
220 subscribe(msgOid);
221
222 int maxPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
223
224 // First check normal operation
225 //changeStatus(statusOid, NORMAL_LINE);
226 publish(msgOid, 1);
227 assertEquals(text, 1, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
228 this.updateInterceptor.clear();
229
230 // Now kill our callback server
231 log.info("Shutdown callback, expecting messages to be queued or destroyed depending on the priority");
232 try {
233 con.getCbServer().shutdown();
234 }
235 catch (XmlBlasterException e) {
236 fail("ShutdownCB: " + e.getMessage());
237 }
238 this.updateInterceptor.clear();
239
240 // These messages are depending on the priority queued or destroyed
241 // as the callback connection is polling ...
242 for (int priority=0; priority < maxPrio; priority++) {
243 publish(msgOid, priority);
244 }
245 assertEquals(text, 0, this.updateInterceptor.waitOnUpdate(sleep, msgOid, Constants.STATE_OK));
246 this.updateInterceptor.clear();
247
248
249 // Now reestablish the callback server ...
250 I_CallbackServer cbServer = null;
251 try {
252 updateMsgs = new MsgInterceptor(glob, log, null); // just used as message container
253 this.updateInterceptor.clear();
254 try {
255 cbServer = new XmlRpcCallbackServer();
256 CallbackAddress cbAddress = new CallbackAddress(glob);
257 cbServer.initialize(this.glob, name, cbAddress, new AbstractCallbackExtended(glob) {
258 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
259 try {
260 String contentStr = new String(content);
261 String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
262 log.info("Receiving update of a message oid=" + updateKey.getOid() +
263 " priority=" + updateQos.getPriority() +
264 " state=" + updateQos.getState() +
265 " content=" + cont);
266 if (!updateQos.isErased()) {
267 updateMsgs.add(new Msg(cbSessionId, updateKey, content, updateQos));
268 }
269 }
270 catch (Throwable e) {
271 log.severe("Error in update method: " + e.toString());
272 e.printStackTrace();
273 }
274 return "";
275 }
276 public I_ClientPlugin getSecurityPlugin() { return null; }
277 public void lostConnection(XmlBlasterException xmlBlasterException) {}
278 }); // Establish new callback server
279 }
280 catch (Throwable e) {
281 log.severe("Can't restart callback server: " + e.toString());
282 fail("Can't restart callback server: " + e.toString());
283 }
284
285 log.info("Waiting long enough that xmlBlaster reconnected to us and expecting the 6 queued messages ...");
286 try { Thread.sleep(3000L); } catch( InterruptedException i) {}
287 assertEquals(text, 0, this.updateInterceptor.getMsgs().length);
288 assertEquals(text, 6, updateMsgs.getMsgs(msgOid, Constants.STATE_OK).length);
289 Msg[] msgArr = updateMsgs.getMsgs();
290 assertEquals(text, 6, msgArr.length);
291 int lastNum = -1;
292 int lastPrio = PriorityEnum.MAX_PRIORITY.getInt() + 1;
293 for (int i=0; i<msgArr.length; i++) {
294 int currPrio = msgArr[i].getUpdateQos().getPriority().getInt();
295 int currNum = msgArr[i].getContentInt();
296 if (lastPrio < currPrio || lastPrio == currPrio && lastNum >= currNum)
297 fail(text + " Sequence is not ascending: last=" + lastNum + " curr=" + currNum);
298 lastNum = currNum;
299 lastPrio = currPrio;
300 }
301 assertEquals("", PriorityEnum.MAX_PRIORITY, msgArr[0].getUpdateQos().getPriority());
302 assertEquals("", 4, msgArr[5].getUpdateQos().getPriority().getInt());
303 updateMsgs.clear();
304 this.updateInterceptor.clear();
305 }
306 finally {
307 if (cbServer != null) {
308 try { cbServer.shutdown(); } catch (Exception e) { log.severe(e.toString()); };
309 }
310 }
311
312 log.info("Success in testPriorizedDispatchPluginConnectionState()");
313 }
314
315 /**
316 * Tears down the fixture.
317 * <p />
318 * cleaning up .... erase() the previous message OID and logout
319 */
320 protected void tearDown() {
321 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait some time
322
323 con.disconnect(null);
324 con = null;
325
326 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
327 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
328 this.serverThread = null;
329
330 // reset to default server port (necessary if other tests follow in the same JVM).
331 Util.resetPorts(glob);
332 this.glob = null;
333
334 this.connectQos = null;
335 this.con = null;
336 this.updateInterceptor = null;
337 this.updateMsgs = null;
338 Global.instance().shutdown();
339 }
340
341 /**
342 * Method is used by TestRunner to load these tests
343 */
344 public static Test suite() {
345 TestSuite suite= new TestSuite();
346 suite.addTest(new TestPriorizedDispatchWithLostCallback(Global.instance(), "testPriorizedDispatchPluginConnectionState", "PriorizedDispatchPluginRecovery"));
347 return suite;
348 }
349
350 /**
351 * Invoke:
352 * <pre>
353 * java org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback -logging/org.xmlBlaster.engine.dispatch FINE -logging/org.xmlBlaster.util.dispatch FINE -logging/org.xmlBlaster.engine FINEST
354 * java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.dispatch.TestPriorizedDispatchWithLostCallback
355 * <pre>
356 */
357 public static void main(String args[]) {
358 Global glob = new Global();
359 if (glob.init(args) != 0) {
360 System.exit(0);
361 }
362 TestPriorizedDispatchWithLostCallback testSub = new TestPriorizedDispatchWithLostCallback(glob, "TestPriorizedDispatchWithLostCallback", "TestPriorizedDispatchWithLostCallback");
363 testSub.setUp();
364 testSub.testPriorizedDispatchPluginConnectionState();
365 testSub.tearDown();
366 }
367 }
syntax highlighted by Code2HTML, v. 0.9.1