1 /*------------------------------------------------------------------------------
2 Name: TestReferenceCountSwap.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.topic;
7
8
9 import java.util.logging.Logger;
10 import java.util.logging.Level;
11 import org.xmlBlaster.util.Global;
12 import org.xmlBlaster.util.XmlBlasterException;
13 import org.xmlBlaster.util.def.ErrorCode;
14 import org.xmlBlaster.util.EmbeddedXmlBlaster;
15 import org.xmlBlaster.client.I_Callback;
16 import org.xmlBlaster.client.I_XmlBlasterAccess;
17 import org.xmlBlaster.client.key.UpdateKey;
18 import org.xmlBlaster.client.key.PublishKey;
19 import org.xmlBlaster.client.key.GetKey;
20 import org.xmlBlaster.client.key.SubscribeKey;
21 import org.xmlBlaster.client.key.UnSubscribeKey;
22 import org.xmlBlaster.client.key.EraseKey;
23 import org.xmlBlaster.client.qos.ConnectQos;
24 import org.xmlBlaster.client.qos.GetQos;
25 import org.xmlBlaster.client.qos.GetReturnQos;
26 import org.xmlBlaster.client.qos.PublishQos;
27 import org.xmlBlaster.client.qos.PublishReturnQos;
28 import org.xmlBlaster.client.qos.UpdateQos;
29 import org.xmlBlaster.client.qos.UpdateReturnQos;
30 import org.xmlBlaster.client.qos.SubscribeQos;
31 import org.xmlBlaster.client.qos.SubscribeReturnQos;
32 import org.xmlBlaster.client.qos.EraseQos;
33 import org.xmlBlaster.client.qos.EraseReturnQos;
34 import org.xmlBlaster.client.qos.UnSubscribeQos;
35 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
36 import org.xmlBlaster.client.I_ConnectionStateListener;
37 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
38 import org.xmlBlaster.util.property.Property;
39 import org.xmlBlaster.util.qos.address.Address;
40 import org.xmlBlaster.util.qos.address.CallbackAddress;
41 import org.xmlBlaster.util.MsgUnit;
42
43 import org.xmlBlaster.test.Util;
44 import org.xmlBlaster.test.Msg;
45 import org.xmlBlaster.test.MsgInterceptor;
46 import junit.framework.*;
47
48
49 /**
50 * Tests the correct reference counting for persistent messages
51 * after recovery.
52 * <pre>
53 * 1. Start xmlBlaster server, configure cache size to 2
54 * 2. Publish two messages
55 * 3. Subscribe to message in fail save mode and block in callback
56 * 4. Kill client subscriber
57 * 5. Publish two more message to swap the first two
58 * 6. Start same subscriber without initial update - it will receive the messages from 3.
59 * 7. Start another subscriber with history = 10: we expect 4 updates
60 * If the reference counter is not properly swapped and resored test 6. will fail.
61 * </pre>
62 * Invoke examples:<br />
63 * <pre>
64 * java junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCountSwap
65 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.topic.TestReferenceCountSwap
66 * </pre>
67 */
68 public class TestReferenceCountSwap extends TestCase implements I_ConnectionStateListener
69 {
70 private static String ME = "TestReferenceCountSwap";
71 private Global glob;
72 private static Logger log = Logger.getLogger(TestReferenceCountSwap.class.getName());
73
74 private int serverPort = 7694;
75 private EmbeddedXmlBlaster serverThread;
76
77 private String oid = "referenceCountMsg";
78
79 class Client {
80 I_XmlBlasterAccess con;
81 MsgInterceptor updateInterceptor;
82 }
83
84 public TestReferenceCountSwap(String testName) {
85 this(null, testName);
86 }
87
88 /**
89 * Constructs the TestReferenceCountSwap object.
90 * <p />
91 * @param testName The name used in the test suite
92 */
93 public TestReferenceCountSwap(Global glob, String testName) {
94 super(testName);
95 this.glob = glob;
96 }
97
98 /**
99 * Sets up the fixture.
100 * <p />
101 * Connect to xmlBlaster and login
102 */
103 protected void setUp() {
104 this.glob = (this.glob == null) ? new Global() : this.glob;
105
106 this.glob.init(Util.getOtherServerPorts(serverPort));
107 }
108
109 /**
110 * Tears down the fixture.
111 * <p />
112 * cleaning up .... erase() the previous message OID and logout
113 */
114 protected void tearDown() {
115 log.info("Entering tearDown(), test is finished");
116
117 if (this.serverThread != null) {
118 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
119 }
120
121 // reset to default server port (necessary if other tests follow in the same JVM).
122 Util.resetPorts(this.glob);
123 Global.instance().shutdown();
124 this.glob = null;
125
126 this.serverThread = null;
127 }
128
129 /**
130 * Create a new connection
131 * @param loginName The login name
132 * @param cb The callback handle or null
133 */
134 private Client doConnect(String loginName, I_Callback cb) {
135 try {
136 Client client = new Client();
137 Global gg = this.glob.getClone(null);
138 ConnectQos connectQos = new ConnectQos(gg, loginName, "secret");
139 CallbackAddress cbAddress = new CallbackAddress(glob);
140 cbAddress.setRetries(-1); // -1 == forever to avoid server side clearing of queue
141 connectQos.addCallbackAddress(cbAddress);
142 client.con = gg.getXmlBlasterAccess();
143 client.con.registerConnectionListener(this);
144 client.updateInterceptor = new MsgInterceptor(gg, log, cb); // Collect received msgs
145 client.con.connect(connectQos, client.updateInterceptor); // Login to xmlBlaster
146 return client;
147 }
148 catch (XmlBlasterException e) {
149 log.warning("doConnect() - login failed: " + e.getMessage());
150 fail(ME+".doConnect() failed: " + e.getMessage());
151 }
152 return null;
153 }
154
155 /**
156 * Subscribe to message.
157 */
158 private void doSubscribe(I_XmlBlasterAccess con) {
159 if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
160 try {
161 SubscribeKey subscribeKey = new SubscribeKey(con.getGlobal(), this.oid);
162 SubscribeQos subscribeQos = new SubscribeQos(con.getGlobal());
163 String subscribeOid = con.subscribe(subscribeKey, subscribeQos).getSubscriptionId();
164 log.info("Success: Subscribe on " + subscribeOid + " done");
165 assertTrue("returned null subscribeOid", subscribeOid != null);
166 } catch(XmlBlasterException e) {
167 log.severe("XmlBlasterException: " + e.getMessage());
168 fail(ME+".doSubscribe() failed: " + e.getMessage());
169 }
170 }
171
172 /**
173 * Construct a message and publish it persistent.
174 */
175 private void doPublish(I_XmlBlasterAccess con) {
176 if (log.isLoggable(Level.FINE)) log.fine("Publishing a message");
177 try {
178 PublishKey publishKey = new PublishKey(con.getGlobal(), this.oid);
179 PublishQos publishQos = new PublishQos(con.getGlobal());
180 publishQos.setPersistent(true);
181 String content = "Hi";
182 MsgUnit msgUnit = new MsgUnit(publishKey, content.getBytes(), publishQos);
183 con.publish(msgUnit);
184 log.info("Success: Publishing of " + this.oid + " done");
185 } catch(XmlBlasterException e) {
186 log.severe("XmlBlasterException: " + e.getMessage());
187 fail(ME+".doPublish() failed: " + e.getMessage());
188 }
189 }
190
191 /**
192 * Erase the message.
193 */
194 private void doErase(I_XmlBlasterAccess con) {
195 if (log.isLoggable(Level.FINE)) log.fine("Erasing ...");
196 try {
197 EraseKey eraseKey = new EraseKey(con.getGlobal(), this.oid);
198 EraseQos eraseQos = new EraseQos(con.getGlobal());
199 eraseQos.setForceDestroy(true);
200 EraseReturnQos[] arr = con.erase(eraseKey, eraseQos);
201 }
202 catch(XmlBlasterException e) {
203 log.severe("XmlBlasterException: " + e.getMessage());
204 fail(ME+".doErase() failed: " + e.getMessage());
205 }
206 }
207
208 /**
209 * Test as described in class javadoc.
210 */
211 public void testReferenceCount() {
212 log.info("testReferenceCount START");
213 log.info("STEP1: Start xmlBlaster server");
214 this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
215
216 log.info("STEP2: Publish a message twice");
217 Client pub = doConnect("publisher", null);
218 doPublish(pub.con);
219 doPublish(pub.con);
220
221 log.info("STEP3: Start subscriber and subscribe and block in callback");
222 Client sub1 = doConnect("subscribe/1", new I_Callback() {
223 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
224 log.info("Receiving update of a message oid=" + updateKey.getOid() +
225 " priority=" + updateQos.getPriority() +
226 " state=" + updateQos.getState() +
227 " we going to sleep and don't return control to server");
228 try { Thread.sleep(1000000L); } catch( InterruptedException i) {}
229 log.severe("Waiking up from sleep");
230 fail("Waiking up from sleep");
231 return "";
232 }
233 });
234 doSubscribe(sub1.con);
235 assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(1000L, 1));
236 sub1.updateInterceptor.clear();
237
238 log.info("STEP4: Kill server and thereafter the clients");
239 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
240 this.serverThread = null;
241 pub.con.disconnect(null);
242 sub1.con.disconnect(null);
243
244 log.info("STEP5: Start server and recover message from persistence store");
245 this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(serverPort);
246
247 log.info("STEP6: Start subscriber and expect the last not delivered message to be sent automatically");
248 sub1 = doConnect("subscribe/1", null);
249 assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(1000L, 1));
250 sub1.updateInterceptor.clear();
251 sub1.con.disconnect(null);
252
253 log.info("STEP7: Start another subscriber and subscribe");
254 Client sub2 = doConnect("subscribe2", null);
255 doSubscribe(sub2.con);
256 assertEquals("", 1, sub2.updateInterceptor.waitOnUpdate(1000L, 1));
257 sub2.updateInterceptor.clear();
258
259 log.info("testReferenceCount SUCCESS");
260
261 log.info("STEP8: Cleanup");
262 doErase(sub2.con);
263 sub2.con.disconnect(null);
264
265 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
266 this.serverThread = null;
267 }
268
269 /**
270 * This is the callback method invoked from I_XmlBlasterAccess
271 * informing the client in an asynchronous mode if the connection was established.
272 * <p />
273 * This method is enforced through interface I_ConnectionStateListener
274 */
275 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
276 log.info("I_ConnectionStateListener-"+connection.getId()+": We were lucky, reconnected to xmlBlaster");
277 }
278
279 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
280 }
281
282 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
283 if (log!=null) log.warning("I_ConnectionStateListener-"+connection.getId()+": Lost connection to xmlBlaster");
284 }
285
286 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
287 if (log!=null) log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
288 }
289
290 /**
291 * Method is used by TestRunner to load these tests
292 */
293 public static Test suite() {
294 TestSuite suite= new TestSuite();
295 suite.addTest(new TestReferenceCountSwap(null, "testReferenceCount"));
296 return suite;
297 }
298
299 /**
300 * Invoke: java org.xmlBlaster.test.topic.TestReferenceCountSwap
301 * @deprecated Use the TestRunner from the testsuite to run it:<p />
302 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCountSwap</pre>
303 */
304 public static void main(String args[]) {
305 Global glob = new Global();
306 if (glob.init(args) != 0) {
307 System.err.println(ME + ": Init failed");
308 System.exit(1);
309 }
310 TestReferenceCountSwap testSub = new TestReferenceCountSwap(glob, "TestReferenceCountSwap");
311 testSub.setUp();
312 testSub.testReferenceCount();
313 testSub.tearDown();
314 }
315 }
syntax highlighted by Code2HTML, v. 0.9.1