1 /*------------------------------------------------------------------------------
2 Name: TestSub.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Demo code for a client using xmlBlaster
6 Version: $Id: TestSub.java 14833 2006-03-06 21:38:58Z laghi $
7 ------------------------------------------------------------------------------*/
8 package org.xmlBlaster.test.qos;
9
10 import java.util.logging.Logger;
11
12 import junit.framework.Test;
13 import junit.framework.TestCase;
14 import junit.framework.TestSuite;
15
16 import org.xmlBlaster.client.I_Callback;
17 import org.xmlBlaster.client.I_XmlBlasterAccess;
18 import org.xmlBlaster.client.key.PublishKey;
19 import org.xmlBlaster.client.key.SubscribeKey;
20 import org.xmlBlaster.client.key.UpdateKey;
21 import org.xmlBlaster.client.qos.ConnectQos;
22 import org.xmlBlaster.client.qos.EraseReturnQos;
23 import org.xmlBlaster.client.qos.PublishQos;
24 import org.xmlBlaster.client.qos.PublishReturnQos;
25 import org.xmlBlaster.client.qos.SubscribeQos;
26 import org.xmlBlaster.client.qos.SubscribeReturnQos;
27 import org.xmlBlaster.client.qos.UpdateQos;
28 import org.xmlBlaster.util.Global;
29 import org.xmlBlaster.util.MsgUnit;
30 import org.xmlBlaster.util.XmlBlasterException;
31 import org.xmlBlaster.util.qos.address.CallbackAddress;
32
33 /**
34 * This client tests the method subscribe() with a later publish() with XPath
35 * query. <br />
36 * The subscribe() should be recognized for this later arriving publish()
37 * <p>
38 * This client may be invoked multiple time on the same xmlBlaster server, as it
39 * cleans up everything after his tests are done.
40 * <p>
41 * Invoke examples:<br />
42 *
43 * <pre>
44 * java junit.textui.TestRunner org.xmlBlaster.test.qos.TestSub
45 * java junit.swingui.TestRunner org.xmlBlaster.test.qos.TestSub
46 * </pre>
47 */
48 public class TestSubNewestOnly extends TestCase implements I_Callback {
49 private static String ME = "TestSubNewestOnly";
50 private final Global glob;
51 private static Logger log = Logger.getLogger(TestSubNewestOnly.class
52 .getName());
53 private boolean messageArrived;
54 private String subscribeOid;
55 private String publishOid = "topic_TestSubNewestOnly";
56 private I_XmlBlasterAccess senderConnection;
57 private String senderName = "Publisher";
58 private String senderContent = "";
59 private final String receiverName = "Subscriber/session/1";
60 private I_XmlBlasterAccess receiverConnection;
61 private int numReceived = 0;
62
63 /**
64 * Constructs the TestSub object.
65 * <p />
66 *
67 * @param testName
68 * The name used in the test suite
69 */
70 public TestSubNewestOnly(Global glob, String testName) {
71 super(testName);
72 this.glob = glob;
73 }
74
75 /**
76 * Connect to xmlBlaster and login
77 */
78 protected void setUp() {
79 String passwd = "secret";
80 try {
81 Global senderGlob = glob.getClone(null);
82 senderConnection = senderGlob.getXmlBlasterAccess();
83 senderConnection.connect(new ConnectQos(senderGlob, senderName,
84 passwd), this);
85 erase(false);
86 } catch (Exception e) {
87 log.severe("Login failed: " + e.toString());
88 e.printStackTrace();
89 assertTrue("Login failed: " + e.toString(), false);
90 }
91
92 connectSubscriber();
93 }
94
95 private void connectSubscriber() {
96 String passwd = "secret";
97 try {
98 Global receiverGlob = glob.getClone(null);
99 receiverConnection = receiverGlob.getXmlBlasterAccess();
100 ConnectQos connectQos = new ConnectQos(receiverGlob, receiverName,
101 passwd);
102 CallbackAddress cbProps = new CallbackAddress(new Global());
103 cbProps.setRetries(-1);
104 connectQos.addCallbackAddress(cbProps);
105 receiverConnection.connect(connectQos, this);
106 } catch (Exception e) {
107 log.severe("Login failed: " + e.toString());
108 e.printStackTrace();
109 fail("Login failed: " + e.toString());
110 }
111 }
112
113 private void erase(boolean check) {
114 String xmlKey = "<key oid='" + publishOid + "' queryType='EXACT'/>";
115 String qos = "<qos></qos>";
116 try {
117 EraseReturnQos[] arr = senderConnection.erase(xmlKey, qos);
118 if (check)
119 assertEquals("Erase", 1, arr.length);
120 } catch (XmlBlasterException e) {
121 fail("Erase XmlBlasterException: " + e.getMessage());
122 }
123 }
124
125 /**
126 * Tears down the fixture.
127 * <p />
128 * cleaning up .... erase() the previous message OID and logout
129 */
130 protected void tearDown() {
131 erase(true);
132 receiverConnection.disconnect(null);
133 senderConnection.disconnect(null);
134 }
135
136 public void subscribe() {
137 SubscribeKey key = new SubscribeKey(receiverConnection.getGlobal(),
138 publishOid);
139 SubscribeQos qos = new SubscribeQos(receiverConnection.getGlobal());
140 qos.setNewestOnly(true);
141 numReceived = 0;
142 subscribeOid = null;
143 try {
144 SubscribeReturnQos subscribeReturnQos = receiverConnection
145 .subscribe(key, qos);
146 subscribeOid = subscribeReturnQos.getSubscriptionId();
147 log.info("Success: Subscribe subscription-id=" + subscribeOid
148 + " done: " + subscribeReturnQos.toXml());
149 } catch (XmlBlasterException e) {
150 log.warning("XmlBlasterException: " + e.getMessage());
151 assertTrue("subscribe - XmlBlasterException: " + e.getMessage(),
152 false);
153 }
154 assertTrue("returned null subscribeOid", subscribeOid != null);
155 assertTrue("returned subscribeOid is empty", 0 != subscribeOid.length());
156 }
157
158 public void publishThree() {
159 int count = 3;
160 for (int i = 0; i < count; i++) {
161 PublishKey publishKey = new PublishKey(
162 senderConnection.getGlobal(), publishOid);
163 PublishQos publishQos = new PublishQos(
164 senderConnection.getGlobal(), publishOid);
165 senderContent = "" + (i + 1);
166 try {
167 MsgUnit msgUnit = new MsgUnit(publishKey,
168 senderContent.getBytes(), publishQos);
169 PublishReturnQos tmp = senderConnection.publish(msgUnit);
170 assertEquals("Wrong publishOid", publishOid, tmp.getKeyOid());
171 log.info("Success: Publishing " + senderContent + " done, returned oid=" + publishOid);
172 } catch (XmlBlasterException e) {
173 log.warning("XmlBlasterException: " + e.getMessage());
174 assertTrue("publish - XmlBlasterException: " + e.getMessage(),
175 false);
176 }
177 }
178 }
179
180 /**
181 * TEST: Construct a message and publish it,<br />
182 * the previous XPath subscription should match and send an update.
183 */
184 public void testNewestOnly() throws InterruptedException {
185 subscribe();
186 receiverConnection.leaveServer(null);
187 Thread.sleep(1000L);
188 assertEquals("numReceived after subscribe", 0, numReceived);
189
190 publishThree();
191 Thread.sleep(1000L);
192 assertEquals("numReceived after subscribe", 0, numReceived);
193
194 connectSubscriber();
195 waitOnUpdate(1000L);
196 assertEquals("numReceived after subscribe", 1, numReceived);
197 }
198
199 /**
200 * This is the callback method invoked from xmlBlaster delivering us a new
201 * asynchronous message.
202 *
203 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[],
204 * UpdateQos)
205 */
206 public String update(String cbSessionId_, UpdateKey updateKey,
207 byte[] content, UpdateQos updateQos) {
208 String contentStr = new String(content);
209
210 if (updateQos.isErased()) {
211 return "";
212 }
213
214 log.info("Receiving update of message " + updateKey.getOid() + " with content="
215 + contentStr);
216 // log.info("subscribeOid=" + subscribeOid + ":" + updateQos.toXml());
217
218 numReceived += 1;
219
220 // wait that the subscribe() has returned as well
221 for (int ii = 0; ii < 5; ii++) {
222 if (subscribeOid != null)
223 break;
224 try {
225 Thread.sleep(1000L);
226 } catch (InterruptedException i) {
227 }
228 log.info("waiting ...");
229 }
230
231 assertEquals("Wrong sender", senderName, updateQos.getSender()
232 .getLoginName());
233 assertEquals("engine.qos.update.subscriptionId: Wrong subscriptionId",
234 subscribeOid, updateQos.getSubscriptionId());
235 assertEquals("Wrong oid of message returned", publishOid,
236 updateKey.getOid());
237
238 assertEquals("Message content is corrupted", senderContent,
239 contentStr);
240
241 messageArrived = true;
242 return "";
243 }
244
245 /**
246 * Little helper, waits until the variable 'messageArrive' is set to true,
247 * or returns when the given timeout occurs.
248 *
249 * @param timeout
250 * in milliseconds
251 */
252 private void waitOnUpdate(final long timeout) {
253 long pollingInterval = 50L; // check every 0.05 seconds
254 if (timeout < 50)
255 pollingInterval = timeout / 10L;
256 long sum = 0L;
257 while (!messageArrived) {
258 try {
259 Thread.sleep(pollingInterval);
260 } catch (InterruptedException i) {
261 }
262 sum += pollingInterval;
263 if (sum > timeout) {
264 log.warning("Timeout of " + timeout + " occurred");
265 break;
266 }
267 }
268 messageArrived = false;
269 }
270
271 /**
272 * Method is used by TestRunner to load these tests
273 *
274 * <pre>
275 * java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.qos.TestSubNewestOnly
276 * </pre>
277 */
278 public static Test suite() {
279 TestSuite suite = new TestSuite();
280 suite.addTest(new TestSubNewestOnly(new Global(), "testNewestOnly"));
281 return suite;
282 }
283
284 /**
285 * Invoke: java org.xmlBlaster.test.qos.TestSubNewestOnly
286 *
287 * @throws InterruptedException
288 */
289 public static void main(String args[]) throws InterruptedException {
290 Global glob = new Global();
291 if (glob.init(args) != 0) {
292 System.err.println(ME + ": Init failed");
293 System.exit(1);
294 }
295 TestSubNewestOnly testSub = new TestSubNewestOnly(glob,
296 "testNewestOnly");
297 testSub.setUp();
298 testSub.testNewestOnly();
299 testSub.tearDown();
300 }
301 }
syntax highlighted by Code2HTML, v. 0.9.1