1 /*------------------------------------------------------------------------------
2 Name: TestPtPPersistent.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.client;
7
8 import java.util.logging.Logger;
9 import java.util.logging.Level;
10 import org.xmlBlaster.util.Global;
11 import org.xmlBlaster.util.SessionName;
12 import org.xmlBlaster.client.key.EraseKey;
13 import org.xmlBlaster.client.key.GetKey;
14 import org.xmlBlaster.client.key.PublishKey;
15 import org.xmlBlaster.client.qos.ConnectQos;
16 import org.xmlBlaster.client.qos.EraseQos;
17 import org.xmlBlaster.client.qos.GetQos;
18 import org.xmlBlaster.util.XmlBlasterException;
19 import org.xmlBlaster.util.def.Constants;
20 import org.xmlBlaster.util.property.PropString;
21 import org.xmlBlaster.util.EmbeddedXmlBlaster;
22 import org.xmlBlaster.client.qos.PublishQos;
23 import org.xmlBlaster.client.I_XmlBlasterAccess;
24 import org.xmlBlaster.util.qos.address.Address;
25 import org.xmlBlaster.util.qos.address.Destination;
26 import org.xmlBlaster.util.MsgUnit;
27
28 import org.xmlBlaster.test.Msg;
29 import org.xmlBlaster.test.Util;
30 import org.xmlBlaster.test.MsgInterceptor;
31 import org.xmlBlaster.test.util.PtPDestination;
32
33 import junit.framework.*;
34
35
/**
 * Tests the sending of persistent PtP messages to a session
 * while resources are critical (swapping of all queues and callback
 * queue overflow) when both the server and the client crash.
 *
 * Invoke examples:<br />
 * <pre>
 *    java junit.textui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPPersistent
 *    java junit.swingui.TestRunner -noloading org.xmlBlaster.test.client.TestPtPPersistent
 * </pre>
 * @see org.xmlBlaster.client.I_XmlBlasterAccess
 */
48 public class TestPtPPersistent extends TestCase {
49 private static String ME = "TestPtPPersistent";
50 private static final long PUB_DELAY=250L;
51 private Global glob;
52 private static Logger log = Logger.getLogger(TestPtPPersistent.class.getName());
53
54 private int serverPort = 7674;
55 private EmbeddedXmlBlaster serverThread;
56
57 private MsgInterceptor updateInterceptor;
58 private String senderName;
59
60 private final long reconnectDelay = 500L;
61 private PtPDestination destination;
62
/**
 * Creates the test case without an explicit Global.
 * <p />
 * Delegates to the two-argument constructor with a null Global.
 *
 * @param testName the name of the test, also used as the publisher's login name
 */
public TestPtPPersistent(String testName) {
    this((Global) null, testName);
}
66
/**
 * Creates the test case for the given Global.
 *
 * @param glob     the Global to use, may be null
 * @param testName the name of the test, also stored as the publisher's login name
 */
public TestPtPPersistent(Global glob, String testName) {
    super(testName);
    this.senderName = testName;
    this.glob = glob;
}
72
73 /**
74 * Sets up the fixture.
75 * <p />
76 * Connect to xmlBlaster and login
77 */
78 protected void setUp() {
79 this.glob = (this.glob == null) ? Global.instance() : this.glob;
80
81 glob.init(Util.getOtherServerPorts(serverPort));
82
83 serverThread = EmbeddedXmlBlaster.startXmlBlaster(glob);
84 log.info("XmlBlaster is ready for testing on bootstrapPort " + serverPort);
85 try {
86 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess(); // Find orb
87 String passwd = "secret";
88 ConnectQos connectQos = new ConnectQos(glob, senderName, passwd); // == "<qos>...</qos>";
89 // Setup fail save handling for connection ...
90 Address addressProp = new Address(glob);
91 addressProp.setDelay(reconnectDelay); // retry connecting every 2 sec
92 addressProp.setRetries(-1); // -1 == forever
93 addressProp.setPingInterval(500L); // switched off
94 connectQos.setAddress(addressProp);
95 con.connect(connectQos, null); // Login to xmlBlaster, register for updates
96 }
97 catch (XmlBlasterException e) {
98 log.warning("setUp() - login failed: " + e.getMessage());
99 fail("setUp() - login fail: " + e.getMessage());
100 }
101 catch (Exception e) {
102 log.severe("setUp() - login failed: " + e.toString());
103 e.printStackTrace();
104 fail("setUp() - login fail: " + e.toString());
105 }
106 }
107
108 /**
109 * Tears down the fixture.
110 * <p />
111 * cleaning up .... erase() the previous message OID and logout
112 */
113 protected void tearDown() {
114 I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
115 try {
116 log.info("Entering tearDown(), test is finished");
117 PropString defaultPlugin = new PropString("CACHE,1.0");
118 String propName = defaultPlugin.setFromEnv(this.glob, glob.getStrippedId(), null, "persistence", Constants.RELATING_TOPICSTORE, "defaultPlugin");
119 log.info("Lookup of propName=" + propName + " defaultValue=" + defaultPlugin.getValue());
120 EraseKey eraseKey = new EraseKey(this.glob, "//airport", "XPATH");
121 EraseQos eraseQos = new EraseQos(this.glob);
122 con.erase(eraseKey, eraseQos);
123 }
124 catch (XmlBlasterException ex) {
125 ex.printStackTrace();
126 }
127 finally {
128 con.disconnect(null);
129 try {
130 Thread.sleep(1000L);
131 }
132 catch (Exception ex) {
133 }
134 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
135 this.serverThread = null;
136 // reset to default server bootstrapPort (necessary if other tests follow in the same JVM).
137 Util.resetPorts(glob);
138 this.glob = null;
139 con = null;
140 Global.instance().shutdown();
141 }
142 }
143
144 /**
145 * TEST: Construct a message and publish it.
146 *
147 * <p />
148 */
149 public void doPublish(int counter, String oid, boolean doGc, long sleep) throws XmlBlasterException {
150 String content = "" + counter;
151 log.info("Publishing message " + content);
152
153 PublishKey key = new PublishKey(this.glob);
154 if (oid != null) key.setOid(oid);
155 key.setClientTags("<airport/>");
156 PublishQos qos = new PublishQos(glob);
157 qos.setPersistent(true);
158 qos.setVolatile(true);
159 qos.addDestination(new Destination(new SessionName(this.glob, "joe")));
160 MsgUnit msgUnit = new MsgUnit(key, content, qos);
161
162 this.glob.getXmlBlasterAccess().publish(msgUnit);
163 if (doGc) Util.gc(2);
164 try {
165 Thread.sleep(sleep);
166 }
167 catch (Exception ex) {
168 }
169 log.info("Success: Publishing of " + content + " done");
170 }
171
172 public void testPersistentPtPOneOidWithGc() {
173 persistentPtP("persistentPtP", true);
174 }
175
176 public void testPersistentPtPOneOidNoGc() {
177 persistentPtP("persistentPtP", false);
178 }
179
180 public void testPersistentPtPNoOidWithGc() {
181 persistentPtP(null, true);
182 }
183
184 public void testPersistentPtPNoOidNoGc() {
185 persistentPtP(null, false);
186 }
187 /**
188 * TEST: <br />
189 * Sets up a PtP destination (a subject)
190 *
191 */
192 public void persistentPtP(String oid, boolean doGc) {
193 long cbMaxEntries = 3;
194 long cbMaxEntriesCache = 2;
195 long subjMaxEntries = 3;
196 long subjMaxEntriesCache = 2;
197
198 long exLimit = cbMaxEntries + subjMaxEntries + 2;
199
200
201 this.destination = new PtPDestination(this.glob, "joe/1");
202 /** wants PtP messages and does not shutdown */
203 boolean wantsPtP = true;
204 boolean shutdownCB = false;
205 try {
206 this.destination.init(wantsPtP, shutdownCB, cbMaxEntries, cbMaxEntriesCache, subjMaxEntries, subjMaxEntriesCache);
207 }
208 catch (XmlBlasterException ex) {
209 assertTrue("an exception while initing the destination should not occur " + ex.getMessage(), false);
210 }
211
212 for (int i=0; i < exLimit; i++) {
213 try {
214 doPublish(i, oid, doGc, PUB_DELAY);
215 }
216 catch (Exception ex) {
217 assertTrue("an exception on publish '" + i + "' should not occur " + ex.getMessage(), false);
218 }
219 }
220
221 int ret = this.destination.getUpdateInterceptor().waitOnUpdate(3000L*exLimit, (int)exLimit);
222 assertEquals("wrong number of entries arrived", (int)exLimit, ret);
223 // wait half a second more to see if more messages come
224 ret = this.destination.getUpdateInterceptor().waitOnUpdate(500L, (int)exLimit+1);
225 assertEquals("wrong number of entries arrived", (int)exLimit, ret);
226
227 Msg[] msg = this.destination.getUpdateInterceptor().getMsgs();
228 assertEquals("wrong number of messages", exLimit, msg.length);
229 for (int i=0; i < exLimit; i++) {
230 assertEquals("wrong message sequence at ", i, msg[i].getContentInt());
231 }
232 this.destination.getUpdateInterceptor().clear();
233
234 // now stop the receiver by shutting down its cbServer and fill cbQueue and subjQueue
235 this.destination.getConnection().leaveServer(null);
236
237 for (long i=exLimit; i < 2 * exLimit; i++) {
238 try {
239 doPublish((int)i, oid, doGc, PUB_DELAY);
240 }
241 catch (XmlBlasterException ex) {
242 assertTrue("an exception on publish '" + i + "' should not occur " + ex.getMessage(), false);
243 }
244 }
245 this.destination.check(250L, 0);
246
247 for (long i=2*exLimit; i < 2*exLimit + 2; i++) {
248 try {
249 doPublish((int)i, oid, doGc, PUB_DELAY);
250 assertTrue("an exception on publish '" + i + "' should have occurred ", false);
251 }
252 catch (XmlBlasterException ex) {
253 log.info("this is an allowed exception since queues are overflown");
254 }
255 }
256
257 this.destination.check(250L, 0);
258
259 // stop and restart the server
260 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
261 this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(Util.getOtherServerPorts(this.serverPort));
262
263 // reconnect to server (for the destination, the publisher never left)
264 this.destination = new PtPDestination(this.glob, "joe/1");
265 /** wants PtP messages and does not shutdown */
266
267 try {
268 // we pass -1 -1 for the subject queue to avoid reconviguration
269 // otherwise it will shut down the callback
270 this.destination.init(true, false, cbMaxEntries, cbMaxEntriesCache, subjMaxEntries, subjMaxEntriesCache);
271 Thread.sleep(1000L);
272 }
273 catch (Exception ex) {
274 assertTrue("an exception while initing the destination should not occur " + ex.getMessage(), false);
275 }
276
277 for (long i=2*exLimit; i < 3*exLimit; i++) {
278 try {
279 doPublish((int)i, oid, doGc, PUB_DELAY);
280 }
281 catch (Exception ex) {
282 assertTrue("an exception on publish '" + i + "' should not occur " + ex.getMessage(), false);
283 }
284 }
285
286 ret = this.destination.getUpdateInterceptor().waitOnUpdate(3000L*exLimit, (int)(2*exLimit));
287 assertEquals("wrong number of messages arrived", (int)2*exLimit, ret);
288 ret = this.destination.getUpdateInterceptor().waitOnUpdate(1000L, (int)(2*exLimit));
289 assertEquals("wrong number of messages arrived", (int)(2*exLimit), ret);
290
291 msg = this.destination.getUpdateInterceptor().getMsgs();
292 if (oid != null) { // if oid is different sequence is not garanteed
293 for (int i=0; i < msg.length; i++) {
294 assertEquals("wrong message sequence (number of entries arrived: " + msg.length + ") ", i+(int)exLimit, msg[i].getContentInt());
295 }
296 }
297 if ((long)msg.length != exLimit*2) {
298 try {
299 GetKey getKey = new GetKey(this.glob, "__cmd?dump");
300 GetQos getQos = new GetQos(this.glob);
301 MsgUnit[] tmp = this.glob.getXmlBlasterAccess().get(getKey, getQos);
302 if (tmp.length > 0)
303 log.info(tmp[0].getContentStr());
304 }
305 catch (XmlBlasterException ex) {
306 ex.printStackTrace();
307 }
308 }
309 assertEquals("wrong number of entries arrived", exLimit*2, (long)msg.length);
310 this.destination.getUpdateInterceptor().clear();
311 this.destination.shutdown(true);
312 this.destination = null;
313 }
314
315
316 /**
317 * Invoke: java org.xmlBlaster.test.client.TestPtPPersistent
318 * <p />
319 * @deprecated Use the TestRunner from the testsuite to run it:<p />
320 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.client.TestPtPPersistent</pre>
321 */
322 public static void main(String args[])
323 {
324 Global glob = new Global();
325 if (glob.init(args) != 0) {
326 System.out.println(ME + ": Init failed");
327 System.exit(1);
328 }
329
330 TestPtPPersistent test = new TestPtPPersistent(glob, "TestPtPPersistent/1");
331
332 test.setUp();
333 test.testPersistentPtPOneOidWithGc();
334 test.tearDown();
335
336 test.setUp();
337 test.testPersistentPtPOneOidNoGc();
338 test.tearDown();
339
340 test.setUp();
341 test.testPersistentPtPNoOidWithGc();
342 test.tearDown();
343
344 test.setUp();
345 test.testPersistentPtPNoOidNoGc();
346 test.tearDown();
347
348 }
349 }
syntax highlighted by Code2HTML, v. 0.9.1