1 package org.xmlBlaster.test.classtest;

  2 
  3 import java.io.FileWriter;

  4 import java.util.logging.Logger;

  5 
  6 import org.xmlBlaster.client.I_Callback;

  7 import org.xmlBlaster.client.I_XmlBlasterAccess;

  8 import org.xmlBlaster.client.key.PublishKey;

  9 import org.xmlBlaster.client.key.SubscribeKey;

 10 import org.xmlBlaster.client.key.UpdateKey;

 11 import org.xmlBlaster.client.qos.ConnectQos;

 12 import org.xmlBlaster.client.qos.DisconnectQos;

 13 import org.xmlBlaster.client.qos.PublishQos;

 14 import org.xmlBlaster.client.qos.SubscribeQos;

 15 import org.xmlBlaster.client.qos.UpdateQos;

 16 import org.xmlBlaster.engine.EventPlugin;

 17 import org.xmlBlaster.test.MsgInterceptor;

 18 import org.xmlBlaster.util.EmbeddedXmlBlaster;

 19 import org.xmlBlaster.util.Global;

 20 import org.xmlBlaster.util.MsgUnit;

 21 import org.xmlBlaster.util.XmlBlasterException;

 22 
 23 import junit.framework.TestCase;

 24 
 25 /**
 26  * Test Timeout class (scheduling for timeouts). 
 27  * <p />
 28  * All methods starting with 'test' and without arguments are invoked automatically
 29  * <p />
 30  * Invoke: java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.classtest.EventPluginTest
 31  * @see org.xmlBlaster.util.Timeout
 32  */
 33 public class EventPluginTest extends TestCase implements I_Callback {
 34    private static Logger log = Logger.getLogger(EventPluginTest.class.getName());
 35 
 36    private EmbeddedXmlBlaster embeddedServer;
 37    
 38    public EventPluginTest(String name) {
 39       super(name);
 40    }
 41 
 42    private void writePluginsFile(String port, String eventTypes) {
 43       try {
 44          String filename = System.getProperty("user.home") + "/tmp/xmlBlasterPluginsTest.xml";
 45          FileWriter file = new FileWriter(filename);
 46          file.write("<xmlBlaster>\n");
 47          file.write("<plugin create='true' id='SOCKET' className='org.xmlBlaster.protocol.socket.SocketDriver'>\n");
 48          file.write("   <action do='LOAD' onStartupRunlevel='4' sequence='20' \n");
 49          file.write("           onFail='resource.configuration.pluginFailed'/>\n");
 50          file.write("   <action do='STOP' onShutdownRunlevel='3' sequence='50'/>   \n");
 51          file.write("   <attribute id='port'>" + port + "</attribute>\n");
 52          file.write("   <attribute id='compress/type'></attribute>\n");
 53          file.write("</plugin>\n");
 54          file.write("<plugin create='true' id='coreEvents' className='org.xmlBlaster.engine.EventPlugin'>\n");
 55          file.write("   <action do='LOAD' onStartupRunlevel='8' sequence='4'/>\n");
 56          file.write("   <action do='STOP' onShutdownRunlevel='7' sequence='4'/>\n");
 57          file.write("   <attribute id='eventTypes'>\n");
 58          file.write(eventTypes);
 59          file.write("   </attribute>\n");
 60          file.write("   <attribute id='destination.publish'>\n");
 61          file.write("      publish.content=$_{xml}\n");
 62          file.write("   </attribute>\n");
 63          file.write("</plugin>\n");
 64          file.write("</xmlBlaster>\n");
 65          file.close();
 66       }
 67       catch (Exception ex) {
 68          ex.printStackTrace();
 69          fail(ex.getMessage());
 70       }
 71    }
 72    
 73    private void startServer() {
 74       String dir = System.getProperty("user.home") + "/tmp/";
 75       String[] args = new String[] {
 76             "-pluginsFile",
 77             dir + "/xmlBlasterPluginsTest.xml",
 78             /* "-propertyFile", */
 79             /* dir + "/xmlBlasterTest.properties", */
 80             "-admin.remoteconsole.port", "0",
 81             "-queue/history/maxEntriesCache", "10",
 82             "-queue/history/maxEntries", "10",
 83             "-queue/callback/maxEntriesCache", "10",
 84             "-queue/callback/maxEntries", "10",
 85             "-queue/subject/maxEntriesCache", "10",
 86             "-queue/subject/maxEntries", "10",
 87             "-xmlBlaster/jmx/HtmlAdaptor", "true"
 88       };
 89       this.embeddedServer = EmbeddedXmlBlaster.startXmlBlaster(args);
 90       log.info("The XmlBlaster Server has been started");
 91       if (this.embeddedServer.getMain().isHalted())
 92          fail("The xmlBlaster is not running");
 93    }
 94 
 95 
 96    private void stopServer() {
 97       final boolean sync = true; // shutting down and waiting

 98       if (this.embeddedServer != null)
 99          this.embeddedServer.stopServer(sync);
100       this.embeddedServer = null;
101       log.info("The XmlBlaster Server has been stopped");
102    }
103 
104 
105    /**
106     * Test a simple timeout
107     */
108    public void testRegex() {
109       // regex used: ".*/queue/.*/event/threshold.*"

110       // positive tests

111       String txt = "client/*/session/[publicSessionId]/queue/callback/event/threshold.90%";
112       assertTrue("must be a callback queue plugin", EventPlugin.isQueueEvent(txt));
113       txt = "client/[subjectId]/session/[publicSessionId]/queue/callback/event/threshold.90%";
114       assertTrue("must be a specific callback queue plugin", EventPlugin.isQueueEvent(txt));
115       txt = "topic/[topicId]/queue/history/event/threshold.90%";
116       assertTrue("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
117       // negative tests

118       txt = "topic/[topicId]/quieue/history/event/threshold.90%"; // must fail

119       assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
120 
121       txt = "topic/[topicId]/quiue/history/event/threshold.90%"; // must fail

122       assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
123 
124       txt = "topic/[topicId]/queue/historyevent/threshold.90%"; // must fail

125       assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
126 
127       txt = "topicqueue/history/event/threshold"; // must fail

128       assertFalse("must be a topic queue plugin", EventPlugin.isQueueEvent(txt));
129       
130       log.info("***EventPluginTest: testTimeout [SUCCESS]");
131    }
132    
133    public void testQueueEventsWithoutWildcards() {
134       try {
135          String userName = "eventTester";
136          String topicName = "eventTest";
137          String sessionId = "1";
138          String port = "7617";
139          String eventTypes = "client/eventTester/session/1/queue/callback/event/threshold.70%,";
140          eventTypes +="client/" + userName + "/session/" + sessionId + "/queue/callback/event/threshold.70%,";
141          eventTypes +="topic/" + topicName + "/queue/history/event/threshold.4";
142          writePluginsFile(port, eventTypes);
143          startServer();
144          String[] args = new String[] {
145                "-dispatch/connection/plugin/socket/port", port,
146                "-dispatch/connection/retries", "-1",
147                "-dispatch/callback/retries", "-1"
148                };
149          {
150             Global global = new Global(args);
151             ConnectQos qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
152             qos.getSessionCbQueueProperty().setMaxEntries(10L);
153             qos.getSessionCbQueueProperty().setMaxEntriesCache(10L);
154             I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
155             conn.connect(qos, this);
156             SubscribeKey subKey = new SubscribeKey(global, topicName);
157             SubscribeQos subQos = new SubscribeQos(global);
158             conn.subscribe(subKey, subQos);
159             // conn.leaveServer(null);

160             DisconnectQos disconnectQos = new DisconnectQos(global);
161             disconnectQos.setLeaveServer(true);
162             conn.disconnect(disconnectQos);
163          }
164 
165          Global secondGlobal = new Global(args);
166          MsgInterceptor msgInterceptor = new MsgInterceptor(secondGlobal, log, null);
167          ConnectQos qos = new ConnectQos(secondGlobal, "tester/1", "secret");
168          I_XmlBlasterAccess conn2 = secondGlobal.getXmlBlasterAccess();
169          conn2.connect(qos, msgInterceptor);
170          SubscribeKey subKey = new SubscribeKey(secondGlobal, "__sys__Event");
171          SubscribeQos subQos = new SubscribeQos(secondGlobal);
172          conn2.subscribe(subKey, subQos);
173          msgInterceptor.clear();
174 
175          {
176             // publish now

177             Global global = new Global(args);
178             qos = new ConnectQos(global, "testPublisher/1", "secret");
179             I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
180             conn.connect(qos, this);
181             PublishKey pubKey = new PublishKey(global, topicName);
182             PublishQos pubQos = new PublishQos(global);
183             for (int i=0; i < 5; i++) {
184                String content = "This is test " + i;
185                conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
186             }
187             
188             int ret = msgInterceptor.waitOnUpdate(3000L, 1);
189             assertEquals("We expected one message for the excess of the history queue", 1, ret);
190             msgInterceptor.clear();
191             for (int i=5; i < 8; i++) {
192                String content = "This is test " + i;
193                conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
194             }
195             ret = msgInterceptor.waitOnUpdate(3000L, 1);
196             assertEquals("We expected one message", 1, ret);
197             msgInterceptor.clear();
198             conn.disconnect(new DisconnectQos(global));
199          }
200 
201          {
202             Global global = new Global(args);
203             qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
204             I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
205             conn.connect(qos, this);
206             Thread.sleep(1000L);
207             conn.disconnect(new DisconnectQos(global));
208          }
209          conn2.disconnect(new DisconnectQos(secondGlobal));
210       }
211       catch (Exception ex) {
212          ex.printStackTrace();
213          fail(ex.getMessage());
214       }
215       finally {
216          stopServer();
217       }
218    }
219 
220    /**
221     * We start an embedded server where we define an EventPlugin to fire on two events:
222     * <ul>
223     *    <li>on all callback queues (all users) 70 % of the maximum has been reached (maximum is 10 Entries)</li>
224     *    <li>on all topics when the history queue reaches 4</li>
225     * </ul>
226     * We then connect one failsafe client, make a subscription and leave the server (without logging out) to keep
227     * the entries in the callback queue (and in the history queue).
228     * <p/>
229     * The second client subscribes to the configured events (this is the client which will get 
230     * the events.
231     * <p/>
232     * A third client publishes 5 messages (which hit the subscription of the first client). 
233     * Such messages fill the callback queue and the history queue.
234     * This shall result in an event coming from the history queue. The callback queue shall not
235     * fire since it has not been exceeded, however the second history queue, the one for the __sys__Event
236     * shall fire since it has exceeded too, so two messages shall arrive.
237     * <p/>
238     */
239    public void testQueueEventsWithWildcards() {
240       try {
241          String userName = "eventTester";
242          String topicName = "eventTest";
243          String sessionId = "1";
244          String port = "7617";
245          String eventTypes = "";
246          eventTypes +="client/*/session/*/queue/callback/event/threshold.70%,";
247          eventTypes +="topic/*/queue/history/event/threshold.4";
248          writePluginsFile(port, eventTypes);
249          startServer();
250          String[] args = new String[] {
251                "-dispatch/connection/plugin/socket/port", port,
252                "-dispatch/connection/retries", "-1",
253                "-dispatch/callback/retries", "-1"
254                };
255          {
256             Global global = new Global(args);
257             ConnectQos qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
258             qos.getSessionCbQueueProperty().setMaxEntries(10L);
259             qos.getSessionCbQueueProperty().setMaxEntriesCache(10L);
260             I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
261             conn.connect(qos, this);
262             SubscribeKey subKey = new SubscribeKey(global, topicName);
263             SubscribeQos subQos = new SubscribeQos(global);
264             conn.subscribe(subKey, subQos);
265             // conn.leaveServer(null);

266             DisconnectQos disconnectQos = new DisconnectQos(global);
267             disconnectQos.setLeaveServer(true);
268             conn.disconnect(disconnectQos);
269          }
270 
271          Global secondGlobal = new Global(args);
272          MsgInterceptor msgInterceptor = new MsgInterceptor(secondGlobal, log, null);
273          ConnectQos qos = new ConnectQos(secondGlobal, "tester/2", "secret");
274          I_XmlBlasterAccess conn2 = secondGlobal.getXmlBlasterAccess();
275          conn2.connect(qos, msgInterceptor);
276          SubscribeKey subKey = new SubscribeKey(secondGlobal, "__sys__Event");
277          SubscribeQos subQos = new SubscribeQos(secondGlobal);
278          conn2.subscribe(subKey, subQos);
279          msgInterceptor.clear();
280 
281          {
282             // publish now

283             Global global = new Global(args);
284             qos = new ConnectQos(global, "testPublisher/1", "secret");
285             I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
286             conn.connect(qos, this);
287             PublishKey pubKey = new PublishKey(global, topicName);
288             PublishQos pubQos = new PublishQos(global);
289             for (int i=0; i < 5; i++) {
290                String content = "This is test " + i;
291                conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
292             }
293             
294             int ret = msgInterceptor.waitOnUpdate(3000L, 1);
295             assertEquals("We expected one message for the excess of the history queue", 1, ret);
296             msgInterceptor.clear();
297             for (int i=5; i < 8; i++) {
298                String content = "This is test " + i;
299                conn.publish(new MsgUnit(pubKey, content.getBytes(), pubQos));
300             }
301             ret = msgInterceptor.waitOnUpdate(3000L, 2);
302             assertEquals("We expected two messages: one for the excess of the callback queue and the other for the excess of the history queue of the __sys__Event topic", 2, ret);
303             msgInterceptor.clear();
304             conn.disconnect(new DisconnectQos(global));
305          }
306 
307          {
308             Global global = new Global(args);
309             qos = new ConnectQos(global, userName + "/" + sessionId, "secret");
310             I_XmlBlasterAccess conn = global.getXmlBlasterAccess();
311             conn.connect(qos, this);
312             Thread.sleep(1000L);
313             conn.disconnect(new DisconnectQos(global));
314          }
315          conn2.disconnect(new DisconnectQos(secondGlobal));
316       }
317       catch (Exception ex) {
318          ex.printStackTrace();
319          fail(ex.getMessage());
320       }
321       finally {
322          stopServer();
323       }
324    }
325 
326    /**
327     * here come the updates for the test client.
328     * @param cbSessionId
329     * @param updateKey
330     * @param content
331     * @param updateQos
332     * @return
333     * @throws XmlBlasterException
334     */
335    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
336       log.info(updateQos.toXml());
337       return "OK";
338    }
339    
340    
341    public static void main(String[] args) {
342       EventPluginTest test = new EventPluginTest("EventPluginTest");
343 
344       try {
345          test.setUp();
346          test.testRegex();
347          test.tearDown();
348 
349          test.setUp();
350          test.testQueueEventsWithWildcards();
351          test.tearDown();
352          
353          test.setUp();
354          test.testQueueEventsWithoutWildcards();
355          test.tearDown();
356          
357       }
358       catch (Exception ex) {
359          ex.printStackTrace();
360       }
361    }
362 
363 
364 }


syntax highlighted by Code2HTML, v. 0.9.1