1 /*------------------------------------------------------------------------------
2 Name: TestSubscribeFilter.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.mime;
7
8 import java.util.logging.Logger;
9
10 import junit.framework.Test;
11 import junit.framework.TestCase;
12 import junit.framework.TestSuite;
13
14 import org.xmlBlaster.client.I_Callback;
15 import org.xmlBlaster.client.I_XmlBlasterAccess;
16 import org.xmlBlaster.client.key.UpdateKey;
17 import org.xmlBlaster.client.qos.ConnectQos;
18 import org.xmlBlaster.client.qos.EraseReturnQos;
19 import org.xmlBlaster.client.qos.SubscribeQos;
20 import org.xmlBlaster.client.qos.UpdateQos;
21 import org.xmlBlaster.test.Util;
22 import org.xmlBlaster.util.EmbeddedXmlBlaster;
23 import org.xmlBlaster.util.Global;
24 import org.xmlBlaster.util.MsgUnit;
25 import org.xmlBlaster.util.XmlBlasterException;
26 import org.xmlBlaster.util.def.Constants;
27 import org.xmlBlaster.util.qos.AccessFilterQos;
28
29
30 /**
31 * This client does test login sessions.<br />
32 * login/logout combinations are checked with subscribe()/publish() calls
33 * <p />
34 * This client may be invoked multiple time on the same xmlBlaster server,
35 * as it cleans up everything after his tests are done.
36 * <p>
37 * Invoke examples:<br />
38 * <pre>
39 * java junit.textui.TestRunner -noloading org.xmlBlaster.test.mime.TestSubscribeFilter
40 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.mime.TestSubscribeFilter
41 * </pre>
42 */
43 public class TestSubscribeFilter extends TestCase implements I_Callback
44 {
45 private static String ME = "TestSubscribeFilter";
46 private final Global glob;
47 private static Logger log = Logger.getLogger(TestSubscribeFilter.class.getName());
48
49 private I_XmlBlasterAccess con = null;
50 private String name;
51 private String passwd = "secret";
52 private int numReceived = 0; // error checking
53 private String updateOid;
54 private EmbeddedXmlBlaster serverThread;
55 private int serverPort = 7624;
56 private int filterMessageContentBiggerAs = 10;
57
58 /**
59 * Constructs the TestSubscribeFilter object.
60 * <p />
61 * @param testName The name used in the test suite
62 * @param name The name to login to the xmlBlaster
63 */
64 public TestSubscribeFilter(Global glob, String testName, String name)
65 {
66 super(testName);
67 this.glob = glob;
68
69 this.name = name;
70 }
71
72 /**
73 * Sets up the fixture.
74 * <p />
75 * We start an own xmlBlaster server in a separate thread,
76 * it has configured to load our simple demo MIME filter plugin.
77 * <p />
78 * Then we connect as a client
79 */
80 protected void setUp()
81 {
82 // We register here the demo plugin with xmlBlaster server, supplying an argument to the plugin
83 String[] args = {
84 "-bootstrapPort", // For all protocol we may use set an alternate server port
85 "" + serverPort,
86 "-plugin/socket/port",
87 "" + (serverPort-1),
88 "-plugin/rmi/registryPort",
89 "" + (serverPort-2),
90 "-plugin/xmlrpc/port",
91 "" + (serverPort-3),
92 "-MimeAccessPlugin[ContentLenFilter][1.0]",
93 "org.xmlBlaster.engine.mime.demo.ContentLenFilter,DEFAULT_MAX_LEN=200,THROW_EXCEPTION_FOR_LEN=3",
94 "-admin.remoteconsole.port",
95 "0"
96 };
97 glob.init(args);
98
99 serverThread = EmbeddedXmlBlaster.startXmlBlaster(args);
100 log.info("XmlBlaster is ready for testing subscribe MIME filter");
101
102 try {
103 log.info("Connecting ...");
104 con = glob.getXmlBlasterAccess();
105 ConnectQos qos = new ConnectQos(glob, name, passwd);
106 con.connect(qos, this); // Login to xmlBlaster
107 }
108 catch (Exception e) {
109 Thread.currentThread().dumpStack();
110 log.severe("Can't connect to xmlBlaster: " + e.toString());
111 }
112
113 // Subscribe to a message with a supplied filter
114 try {
115 SubscribeQos qos = new SubscribeQos(glob);
116 qos.addAccessFilter(new AccessFilterQos(glob, "ContentLenFilter", "1.0", ""+filterMessageContentBiggerAs));
117
118 String subscribeOid = con.subscribe("<key oid='MSG'/>", qos.toXml()).getSubscriptionId();
119 log.info("Success: Subscribe subscription-id=" + subscribeOid + " done");
120
121 con.subscribe("<key oid='" + Constants.OID_DEAD_LETTER + "'/>", "<qos/>");
122
123 } catch(XmlBlasterException e) {
124 log.warning("XmlBlasterException: " + e.getMessage());
125 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(), false);
126 }
127 }
128
129 /**
130 * Tears down the fixture.
131 * <p />
132 * cleaning up .... erase() the previous message OID and logout
133 */
134 protected void tearDown()
135 {
136 try { Thread.sleep(200L); } catch( InterruptedException i) {} // Wait 200 milli seconds, until all updates are processed ...
137
138 try {
139 EraseReturnQos[] arr = con.erase("<key oid='MSG'/>", "<qos/>");
140 assertEquals("Erase", 1, arr.length);
141 } catch(XmlBlasterException e) { fail("Erase XmlBlasterException: " + e.getMessage()); }
142
143 con.disconnect(null);
144 con=null;
145
146 try { Thread.sleep(500L); } catch( InterruptedException i) {} // Wait some time
147 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
148 this.serverThread = null;
149
150 // reset to default server port (necessary if other tests follow in the same JVM).
151 Util.resetPorts();
152 }
153
154 /**
155 * First we send a message <= 10 content length which should be updated to us,
156 * then we send a message with 11 bytes in the content which should be filtered
157 */
158 public void testFilter()
159 {
160 log.info("testFilter() with filterMessageContentBiggerAs=" + filterMessageContentBiggerAs + " ...");
161
162 log.info("TEST 1: Testing unfiltered message");
163 try {
164 con.publish(new MsgUnit("<key oid='MSG'/>", "1234567890".getBytes(), null));
165 } catch(XmlBlasterException e) {
166 log.warning("XmlBlasterException: " + e.getMessage());
167 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
168 }
169 waitOnUpdate(2000L, 1); // message should come back as it is only 10 bytes
170
171
172 log.info("TEST 2: Testing filtered message");
173 try {
174 con.publish(new MsgUnit("<key oid='MSG'/>", "12345678901".getBytes(), null));
175 } catch(XmlBlasterException e) {
176 log.warning("XmlBlasterException: " + e.getMessage());
177 assertTrue("publish - XmlBlasterException: " + e.getMessage(), false);
178 }
179 waitOnUpdate(2000L, 0); // message should be filtered as it is longer 10 bytes
180
181
182 log.info("TEST 3: Test what happens if the plugin throws an exception");
183 try { // see THROW_EXCEPTION_FOR_LEN=3
184 con.publish(new MsgUnit("<key oid='MSG'/>", "123".getBytes(), null));
185 waitOnUpdate(2000L, 1); // a dead message should come
186 assertEquals("", Constants.OID_DEAD_LETTER, updateOid);
187 log.info("SUCCESS: Dead message arrived");
188 } catch(XmlBlasterException e) {
189 fail("publish forced the plugin to throw an XmlBlasterException, but it should not reach the publisher: " + e.toString());
190 }
191 waitOnUpdate(2000L, 0); // no message expected on exception
192
193 log.info("Success in testFilter()");
194 }
195
196 /**
197 * This is the callback method invoked from xmlBlaster
198 * delivering us a new asynchronous message.
199 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
200 */
201 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos)
202 {
203 log.info("Receiving update of a message '" + updateKey.getOid() + "' state=" + updateQos.getState());
204 updateOid = updateKey.getOid();
205 numReceived++;
206 return "";
207 }
208
209 /**
210 * Little helper, waits until the wanted number of messages are arrived
211 * or returns when the given timeout occurs.
212 * <p />
213 * @param timeout in milliseconds
214 * @param numWait how many messages to wait
215 */
216 private void waitOnUpdate(final long timeout, final int numWait)
217 {
218 long pollingInterval = 50L; // check every 0.05 seconds
219 if (timeout < 50) pollingInterval = timeout / 10L;
220 long sum = 0L;
221 // check if too few are arriving
222 while (numReceived < numWait) {
223 try { Thread.sleep(pollingInterval); } catch( InterruptedException i) {}
224 sum += pollingInterval;
225 assertTrue("Timeout of " + timeout + " occurred without update", sum <= timeout);
226 }
227
228 // check if too many are arriving
229 try { Thread.sleep(timeout); } catch( InterruptedException i) {}
230 assertEquals("Wrong number of messages arrived", numWait, numReceived);
231
232 numReceived = 0;
233 }
234
235 /**
236 * Method is used by TestRunner to load these tests
237 */
238 public static Test suite()
239 {
240 TestSuite suite= new TestSuite();
241 String loginName = "Tim";
242 suite.addTest(new TestSubscribeFilter(new Global(), "testFilter", "Tim"));
243 return suite;
244 }
245
246 /**
247 * Invoke:
248 * <pre>
249 * java org.xmlBlaster.test.mime.TestSubscribeFilter
250 * java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.mime.TestSubscribeFilter
251 * <pre>
252 */
253 public static void main(String args[])
254 {
255 Global glob = new Global();
256 if (glob.init(args) != 0) {
257 System.err.println(ME + ": Init failed");
258 System.exit(1);
259 }
260 TestSubscribeFilter testSub = new TestSubscribeFilter(glob, "TestSubscribeFilter", "Tim");
261 testSub.setUp();
262 testSub.testFilter();
263 }
264 }
syntax highlighted by Code2HTML, v. 0.9.1