1 package org.xmlBlaster.test.classtest;
2
3 import java.io.FileWriter;
4 import java.util.logging.Logger;
5
6 import org.xmlBlaster.client.I_Callback;
7 import org.xmlBlaster.client.I_XmlBlasterAccess;
8 import org.xmlBlaster.client.key.PublishKey;
9 import org.xmlBlaster.client.key.SubscribeKey;
10 import org.xmlBlaster.client.key.UpdateKey;
11 import org.xmlBlaster.client.qos.ConnectQos;
12 import org.xmlBlaster.client.qos.DisconnectQos;
13 import org.xmlBlaster.client.qos.PublishQos;
14 import org.xmlBlaster.client.qos.SubscribeQos;
15 import org.xmlBlaster.client.qos.UpdateQos;
16 import org.xmlBlaster.engine.EventPlugin;
17 import org.xmlBlaster.test.MsgInterceptor;
18 import org.xmlBlaster.util.EmbeddedXmlBlaster;
19 import org.xmlBlaster.util.Global;
20 import org.xmlBlaster.util.MsgUnit;
21 import org.xmlBlaster.util.XmlBlasterException;
22
23 import junit.framework.TestCase;
24
25 /**
26 * Test Timeout class (scheduling for timeouts).
27 * <p />
28 * All methods starting with 'test' and without arguments are invoked automatically
29 * <p />
30 * Invoke: java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.classtest.EventPluginTest
31 * @see org.xmlBlaster.util.Timeout
32 */
33 public class EventPluginTest extends TestCase implements I_Callback {
34 private static Logger log = Logger.getLogger(EventPluginTest.class.getName());
35
36 private EmbeddedXmlBlaster embeddedServer;
37
38 public EventPluginTest(String name) {
39 super(name);
40 }
41
42 private void writePluginsFile(String port, String eventTypes) {
43 try {
44 String filename = System.getProperty("user.home") + "/tmp/xmlBlasterPluginsTest.xml";
45 FileWriter file = new FileWriter(filename);
46 file.write("<xmlBlaster>\n");
47 file.write("<plugin create='true' id='SOCKET' className='org.xmlBlaster.protocol.socket.SocketDriver'>\n");
48 file.write(" <action do='LOAD' onStartupRunlevel='4' sequence='20' \n");
49 file.write(" onFail='resource.configuration.pluginFailed'/>\n");
50 file.write(" <action do='STOP' onShutdownRunlevel='3' sequence='50'/> \n");
51 file.write(" <attribute id='port'>" + port + "</attribute>\n");
52 file.write(" <attribute id='compress/type'></attribute>\n");
53 file.write("</plugin>\n");
54 file.write("<plugin create='true' id='coreEvents' className='org.xmlBlaster.engine.EventPlugin'>\n");
55 file.write(" <action do='LOAD' onStartupRunlevel='8' sequence='4'/>\n");
56 file.write(" <action do='STOP' onShutdownRunlevel='7' sequence='4'/>\n");
57 file.write(" <attribute id='eventTypes'>\n");
58 file.write(eventTypes);
59 file.write(" </attribute>\n");
60 file.write(" <attribute id='destination.publish'>\n");
61 file.write(" publish.content=$_{xml}\n");
62 file.write(" </attribute>\n");
63 file.write("</plugin>\n");
64 file.write("</xmlBlaster>\n");
65 file.close();
66 }
67 catch (Exception ex) {
68 ex.printStackTrace();
69 fail(ex.getMessage());
70 }
71 }
72
73 private void startServer() {
74 String dir = System.getProperty("user.home") + "/tmp/";
75 String[] args = new String[] {
76 "-pluginsFile",
77 dir + "/xmlBlasterPluginsTest.xml",
78 /* "-propertyFile", */
79 /* dir + "/xmlBlasterTest.properties", */
80 "-admin.remoteconsole.port", "0",
81 "-queue/history/maxEntriesCache", "10",
82 "-queue/history/maxEntries", "10",
83 "-queue/callback/maxEntriesCache", "10",
84 "-queue/callback/maxEntries", "10",
85 "-queue/subject/maxEntriesCache", "10",
86 "-queue/subject/maxEntries", "10",
87 "-xmlBlaster/jmx/HtmlAdaptor", "true"
88 };
89 this.embeddedServer = EmbeddedXmlBlaster.startXmlBlaster(args);
90 log.info("The XmlBlaster Server has been started");
91 if (this.embeddedServer.getMain().isHalted())
92 fail("The xmlBlaster is not running");
93 }
94
95
96 private void stopServer() {
97 final boolean sync = true; // shutting down and waiting
98 if (this.embeddedServer != null)
99 this.embeddedServer.stopServer(sync);
100 this.embeddedServer = null;
101 log.info("The XmlBlaster Server has been stopped");
102 }
103
104
105 /**
106 * Test a simple timeout
107 */
108 public void testRegex() {
109 // regex used: ".*/queue/.*/event/threshold.*"
110 // positive tests
111 String txt = "client/*/session/[publicSessionId]/queue/callback/event/threshold.90%";
112 assertTrue("must be a callback queue plugin", EventPlugin.isQueueEvent(txt));
113 txt = "client/[subjectId]/session/[publicSessionId]/queue/callback/event/threshold.90%";
114 assertTrue("must be a specific callback queue plugin", EventPlugin.isQueueEvent(txt));
115 txt = "topic/[topicId]/queue/history/event/threshold.90%";
116 assertTrue("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
117 // negative tests
118 txt = "topic/[topicId]/quieue/history/event/threshold.90%"; // must fail
119 assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
120
121 txt = "topic/[topicId]/quiue/history/event/threshold.90%"; // must fail
122 assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
123
124 txt = "topic/[topicId]/queue/historyevent/threshold.90%"; // must fail
125 assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
126
127 txt = "topicqueue/history/event/threshold"; // must fail
128 assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
129
130 log.info("***EventPluginTest: testTimeout [SUCCESS]");
131 }
132
133 public void testQueueEventsWithoutWildcards() {
134 try {
135 String userName = "eventTester";
136 String topicName = "eventTest";
137 String sessionId = "1";
138 String port = "7617";
139 String eventTypes = "client/eventTester/session/1/queue/callback/event/threshold.70%,";
140 eventTypes +="client/" + userName + "/session/" + sessionId + "/queue/callback/event/threshold.70%,";
141 eventTypes +="topic/" + topicName + "/queue/history/event/threshold.4";
142 writePluginsFile(port, eventTypes);
143 startServer();
144 String[] args = new String[] {
145 "-dispatch/connection/plugin/socket/port", port,
146 "-dispatch/connection/retries", "-1",
147 "-dispatch/callback/retries", "-1"
148 };
149 {
150 Global global = new Global(args);
151 ConnectQos qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
152 qos.getSessionCbQueueProperty().setMaxEntries(10L);
153 qos.getSessionCbQueueProperty().setMaxEntriesCache(10L);
154 I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
155 conn.connect(qos, this);
156 SubscribeKey subKey = new SubscribeKey(global, topicName);
157 SubscribeQos subQos = new SubscribeQos(global);
158 conn.subscribe(subKey, subQos);
159 // conn.leaveServer(null);
160 DisconnectQos disconnectQos = new DisconnectQos(global);
161 disconnectQos.setLeaveServer(true);
162 conn.disconnect(disconnectQos);
163 }
164
165 Global secondGlobal = new Global(args);
166 MsgInterceptor msgInterceptor = new MsgInterceptor(secondGlobal, log, null);
167 ConnectQos qos = new ConnectQos(secondGlobal, "tester/1", "secret");
168 I_XmlBlasterAccess conn2 = secondGlobal.getXmlBlasterAccess();
169 conn2.connect(qos, msgInterceptor);
170 SubscribeKey subKey = new SubscribeKey(secondGlobal, "__sys__Event");
171 SubscribeQos subQos = new SubscribeQos(secondGlobal);
172 conn2.subscribe(subKey, subQos);
173 msgInterceptor.clear();
174
175 {
176 // publish now
177 Global global = new Global(args);
178 qos = new ConnectQos(global, "testPublisher/1", "secret");
179 I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
180 conn.connect(qos, this);
181 PublishKey pubKey = new PublishKey(global, topicName);
182 PublishQos pubQos = new PublishQos(global);
183 for (int i=0; i < 5; i++) {
184 String content = "This is test " + i;
185 conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
186 }
187
188 int ret = msgInterceptor.waitOnUpdate(3000L, 1);
189 assertEquals("We expected one message for the excess of the history queue", 1, ret);
190 msgInterceptor.clear();
191 for (int i=5; i < 8; i++) {
192 String content = "This is test " + i;
193 conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
194 }
195 ret = msgInterceptor.waitOnUpdate(3000L, 1);
196 assertEquals("We expected one message", 1, ret);
197 msgInterceptor.clear();
198 conn.disconnect(new DisconnectQos(global));
199 }
200
201 {
202 Global global = new Global(args);
203 qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
204 I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
205 conn.connect(qos, this);
206 Thread.sleep(1000L);
207 conn.disconnect(new DisconnectQos(global));
208 }
209 conn2.disconnect(new DisconnectQos(secondGlobal));
210 }
211 catch (Exception ex) {
212 ex.printStackTrace();
213 fail(ex.getMessage());
214 }
215 finally {
216 stopServer();
217 }
218 }
219
220 /**
221 * We start an embedded server where we define an EventPlugin to fire on two events:
222 * <ul>
223 * <li>on all callback queues (all users) 70 % of the maximum has been reached (maximum is 10 Entries)</li>
224 * <li>on all topics when the history queue reaches 4</li>
225 * </ul>
226 * We then connect one failsafe client, make a subscription and leave the server (without logging out) to keep
227 * the entries in the callback queue (and in the history queue).
228 * <p/>
229 * The second client subscribes to the configured events (this is the client which will get
230 * the events.
231 * <p/>
232 * A third client publishes 5 messages (which hit the subscription of the first client).
233 * Such messages fill the callback queue and the history queue.
234 * This shall result in an event coming from the history queue. The callback queue shall not
235 * fire since it has not been exceeded, however the second history queue, the one for the __sys__Event
236 * shall fire since it has exceeded too, so two messages shall arrive.
237 * <p/>
238 */
239 public void testQueueEventsWithWildcards() {
240 try {
241 String userName = "eventTester";
242 String topicName = "eventTest";
243 String sessionId = "1";
244 String port = "7617";
245 String eventTypes = "";
246 eventTypes +="client/*/session/*/queue/callback/event/threshold.70%,";
247 eventTypes +="topic/*/queue/history/event/threshold.4";
248 writePluginsFile(port, eventTypes);
249 startServer();
250 String[] args = new String[] {
251 "-dispatch/connection/plugin/socket/port", port,
252 "-dispatch/connection/retries", "-1",
253 "-dispatch/callback/retries", "-1"
254 };
255 {
256 Global global = new Global(args);
257 ConnectQos qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
258 qos.getSessionCbQueueProperty().setMaxEntries(10L);
259 qos.getSessionCbQueueProperty().setMaxEntriesCache(10L);
260 I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
261 conn.connect(qos, this);
262 SubscribeKey subKey = new SubscribeKey(global, topicName);
263 SubscribeQos subQos = new SubscribeQos(global);
264 conn.subscribe(subKey, subQos);
265 // conn.leaveServer(null);
266 DisconnectQos disconnectQos = new DisconnectQos(global);
267 disconnectQos.setLeaveServer(true);
268 conn.disconnect(disconnectQos);
269 }
270
271 Global secondGlobal = new Global(args);
272 MsgInterceptor msgInterceptor = new MsgInterceptor(secondGlobal, log, null);
273 ConnectQos qos = new ConnectQos(secondGlobal, "tester/2", "secret");
274 I_XmlBlasterAccess conn2 = secondGlobal.getXmlBlasterAccess();
275 conn2.connect(qos, msgInterceptor);
276 SubscribeKey subKey = new SubscribeKey(secondGlobal, "__sys__Event");
277 SubscribeQos subQos = new SubscribeQos(secondGlobal);
278 conn2.subscribe(subKey, subQos);
279 msgInterceptor.clear();
280
281 {
282 // publish now
283 Global global = new Global(args);
284 qos = new ConnectQos(global, "testPublisher/1", "secret");
285 I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
286 conn.connect(qos, this);
287 PublishKey pubKey = new PublishKey(global, topicName);
288 PublishQos pubQos = new PublishQos(global);
289 for (int i=0; i < 5; i++) {
290 String content = "This is test " + i;
291 conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
292 }
293
294 int ret = msgInterceptor.waitOnUpdate(3000L, 1);
295 assertEquals("We expected one message for the excess of the history queue", 1, ret);
296 msgInterceptor.clear();
297 for (int i=5; i < 8; i++) {
298 String content = "This is test " + i;
299 conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
300 }
301 ret = msgInterceptor.waitOnUpdate(3000L, 2);
302 assertEquals("We expected two messages: one for the excess of the callback queue and the other for the excess of the history queue of the __sys__Event topic", 2, ret);
303 msgInterceptor.clear();
304 conn.disconnect(new DisconnectQos(global));
305 }
306
307 {
308 Global global = new Global(args);
309 qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
310 I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
311 conn.connect(qos, this);
312 Thread.sleep(1000L);
313 conn.disconnect(new DisconnectQos(global));
314 }
315 conn2.disconnect(new DisconnectQos(secondGlobal));
316 }
317 catch (Exception ex) {
318 ex.printStackTrace();
319 fail(ex.getMessage());
320 }
321 finally {
322 stopServer();
323 }
324 }
325
326 /**
327 * here come the updates for the test client.
328 * @param cbSessionId
329 * @param updateKey
330 * @param content
331 * @param updateQos
332 * @return
333 * @throws XmlBlasterException
334 */
335 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
336 log.info(updateQos.toXml());
337 return "OK";
338 }
339
340
341 public static void main(String[] args) {
342 EventPluginTest test = new EventPluginTest("EventPluginTest");
343
344 try {
345 test.setUp();
346 test.testRegex();
347 test.tearDown();
348
349 test.setUp();
350 test.testQueueEventsWithWildcards();
351 test.tearDown();
352
353 test.setUp();
354 test.testQueueEventsWithoutWildcards();
355 test.tearDown();
356
357 }
358 catch (Exception ex) {
359 ex.printStackTrace();
360 }
361 }
362
363
364 }
// syntax highlighted by Code2HTML, v. 0.9.1