1 /*------------------------------------------------------------------------------
2 Name: TestReferenceCount.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.topic;
7
8 import java.util.logging.Logger;
9 import java.util.logging.Level;
10 import org.xmlBlaster.util.Global;
11 import org.xmlBlaster.util.XmlBlasterException;
12 import org.xmlBlaster.util.EmbeddedXmlBlaster;
13 import org.xmlBlaster.client.I_Callback;
14 import org.xmlBlaster.client.I_XmlBlasterAccess;
15 import org.xmlBlaster.client.key.UpdateKey;
16 import org.xmlBlaster.client.key.PublishKey;
17 import org.xmlBlaster.client.key.SubscribeKey;
18 import org.xmlBlaster.client.key.EraseKey;
19 import org.xmlBlaster.client.qos.ConnectQos;
20 import org.xmlBlaster.client.qos.PublishQos;
21 import org.xmlBlaster.client.qos.UpdateQos;
22 import org.xmlBlaster.client.qos.SubscribeQos;
23 import org.xmlBlaster.client.qos.EraseQos;
24 import org.xmlBlaster.client.qos.EraseReturnQos;
25 import org.xmlBlaster.client.I_ConnectionStateListener;
26 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
27 import org.xmlBlaster.util.qos.address.CallbackAddress;
28 import org.xmlBlaster.util.MsgUnit;
29
30 import org.xmlBlaster.test.Util;
31 import org.xmlBlaster.test.MsgInterceptor;
32 import junit.framework.*;
33
34
35 /**
36 * Tests the correct reference counting for persistent messages
37 * after recovery.
38 * <pre>
39 * 1. Start xmlBlaster server
40 * 2. Publish two messages
41 * 3. Subscribe to message and block in callback
42 * 4. Kill server and clients
43 * 5. Restart server
44 * 6. Start same subscriber - it will receive the message from 3.
45 * 7. Start another subscriber: we expect an update
46 * If the reference counter is not properly set on recovery
47 * test 7. will fail.
48 * </pre>
49 * Invoke examples:<br />
50 * <pre>
51 * java junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCount
52 * java junit.swingui.TestRunner -noloading org.xmlBlaster.test.topic.TestReferenceCount
53 * </pre>
54 */
55 public class TestReferenceCount extends TestCase implements I_ConnectionStateListener
56 {
57 private static String ME = "TestReferenceCount";
58 private Global glob;
59 private static Logger log = Logger.getLogger(TestReferenceCount.class.getName());
60
61 private int serverPort = 7694;
62 private EmbeddedXmlBlaster serverThread;
63
64 private String oid = "referenceCountMsg";
65
66 class Client {
67 I_XmlBlasterAccess con;
68 MsgInterceptor updateInterceptor;
69 }
70
71 public TestReferenceCount(String testName) {
72 this(null, testName);
73 }
74
75 /**
76 * Constructs the TestReferenceCount object.
77 * <p />
78 * @param testName The name used in the test suite
79 */
80 public TestReferenceCount(Global glob, String testName) {
81 super(testName);
82 this.glob = glob;
83 }
84
85 /**
86 * Sets up the fixture.
87 * <p />
88 * Connect to xmlBlaster and login
89 */
90 protected void setUp() {
91 this.glob = (this.glob == null) ? new Global() : this.glob;
92
93 this.glob.init(Util.getOtherServerPorts(serverPort));
94 }
95
96 /**
97 * Tears down the fixture.
98 * <p />
99 * cleaning up .... erase() the previous message OID and logout
100 */
101 protected void tearDown() {
102 log.info("Entering tearDown(), test is finished");
103
104 if (this.serverThread != null) {
105 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
106 }
107
108 // reset to default server port (necessary if other tests follow in the same JVM).
109 Util.resetPorts(this.glob);
110 Global.instance().shutdown();
111 this.glob = null;
112
113 this.serverThread = null;
114 }
115
116 /**
117 * Create a new connection
118 * @param loginName The login name
119 * @param cb The callback handle or null
120 */
121 private Client doConnect(String loginName, I_Callback cb) {
122 try {
123 Client client = new Client();
124 Global gg = this.glob.getClone(null);
125 ConnectQos connectQos = new ConnectQos(gg, loginName, "secret");
126 CallbackAddress cbAddress = new CallbackAddress(glob);
127 cbAddress.setRetries(-1); // -1 == forever to avoid server side clearing of queue
128 connectQos.addCallbackAddress(cbAddress);
129 client.con = gg.getXmlBlasterAccess();
130 client.con.registerConnectionListener(this);
131 client.updateInterceptor = new MsgInterceptor(gg, log, cb); // Collect received msgs
132 client.con.connect(connectQos, client.updateInterceptor); // Login to xmlBlaster
133 return client;
134 }
135 catch (XmlBlasterException e) {
136 log.warning("doConnect() - login failed: " + e.getMessage());
137 fail(ME+".doConnect() failed: " + e.getMessage());
138 }
139 return null;
140 }
141
142 /**
143 * Subscribe to message.
144 */
145 private void doSubscribe(I_XmlBlasterAccess con) {
146 if (log.isLoggable(Level.FINE)) log.fine("Subscribing using EXACT oid syntax ...");
147 try {
148 SubscribeKey subscribeKey = new SubscribeKey(con.getGlobal(), this.oid);
149 SubscribeQos subscribeQos = new SubscribeQos(con.getGlobal());
150 String subscribeOid = con.subscribe(subscribeKey, subscribeQos).getSubscriptionId();
151 log.info("Success: Subscribe on " + subscribeOid + " done");
152 assertTrue("returned null subscribeOid", subscribeOid != null);
153 } catch(XmlBlasterException e) {
154 log.severe("XmlBlasterException: " + e.getMessage());
155 fail(ME+".doSubscribe() failed: " + e.getMessage());
156 }
157 }
158
159 /**
160 * Construct a message and publish it persistent.
161 */
162 private void doPublish(I_XmlBlasterAccess con) {
163 if (log.isLoggable(Level.FINE)) log.fine("Publishing a message");
164 try {
165 PublishKey publishKey = new PublishKey(con.getGlobal(), this.oid);
166 PublishQos publishQos = new PublishQos(con.getGlobal());
167 publishQos.setPersistent(true);
168 String content = "Hi";
169 MsgUnit msgUnit = new MsgUnit(publishKey, content.getBytes(), publishQos);
170 con.publish(msgUnit);
171 log.info("Success: Publishing of " + this.oid + " done");
172 } catch(XmlBlasterException e) {
173 log.severe("XmlBlasterException: " + e.getMessage());
174 fail(ME+".doPublish() failed: " + e.getMessage());
175 }
176 }
177
178 /**
179 * Erase the message.
180 */
181 private void doErase(I_XmlBlasterAccess con) {
182 if (log.isLoggable(Level.FINE)) log.fine("Erasing ...");
183 try {
184 EraseKey eraseKey = new EraseKey(con.getGlobal(), this.oid);
185 EraseQos eraseQos = new EraseQos(con.getGlobal());
186 eraseQos.setForceDestroy(true);
187 EraseReturnQos[] arr = con.erase(eraseKey, eraseQos);
188 }
189 catch(XmlBlasterException e) {
190 log.severe("XmlBlasterException: " + e.getMessage());
191 fail(ME+".doErase() failed: " + e.getMessage());
192 }
193 }
194
195 /**
196 * Test as described in class javadoc.
197 */
198 public void testReferenceCount() {
199 // long waitTime = 180000L;
200 long waitTime = 1000L;
201 log.info("testReferenceCount START");
202 log.info("STEP1: Start xmlBlaster server");
203 this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.glob);
204
205 log.info("STEP2: Publish a message twice");
206 Client pub = doConnect("publisher", null);
207 doPublish(pub.con);
208 doPublish(pub.con);
209
210 log.info("STEP3: Start subscriber and subscribe and block in callback");
211 Client sub1 = doConnect("subscribe/1", new I_Callback() {
212 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
213 log.info("Receiving update of a message oid=" + updateKey.getOid() +
214 " priority=" + updateQos.getPriority() +
215 " state=" + updateQos.getState() +
216 " we are going to sleep and don't return control to server");
217 try { Thread.sleep(1000000L); } catch( InterruptedException i) {}
218 log.severe("Waking up from sleep");
219 fail("Waking up from sleep");
220 return "";
221 }
222 });
223 doSubscribe(sub1.con);
224 assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(waitTime, 1));
225 sub1.updateInterceptor.clear();
226
227 log.info("STEP4: Kill server and thereafter the clients");
228 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
229 this.serverThread = null;
230 pub.con.disconnect(null);
231 sub1.con.leaveServer(null);
232
233 log.info("STEP5: Start server and recover message from persistence store");
234 this.serverThread = EmbeddedXmlBlaster.startXmlBlaster(this.glob);
235
236 log.info("STEP6: Start subscriber and expect the last not delivered message to be sent automatically");
237 sub1 = doConnect("subscribe/1", null);
238 assertEquals("", 1, sub1.updateInterceptor.waitOnUpdate(waitTime, 1));
239 sub1.updateInterceptor.clear();
240 sub1.con.disconnect(null);
241
242 log.info("STEP7: Start another subscriber and subscribe");
243 Client sub2 = doConnect("subscribe2", null);
244 doSubscribe(sub2.con);
245 assertEquals("", 1, sub2.updateInterceptor.waitOnUpdate(waitTime, 1));
246 sub2.updateInterceptor.clear();
247
248 log.info("testReferenceCount SUCCESS");
249
250 log.info("STEP8: Cleanup");
251 doErase(sub2.con);
252 sub2.con.disconnect(null);
253
254 EmbeddedXmlBlaster.stopXmlBlaster(this.serverThread);
255 this.serverThread = null;
256 }
257
258 /**
259 * This is the callback method invoked from I_XmlBlasterAccess
260 * informing the client in an asynchronous mode if the connection was established.
261 * <p />
262 * This method is enforced through interface I_ConnectionStateListener
263 */
264 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
265 log.info("I_ConnectionStateListener-"+connection.getId()+": We were lucky, reconnected to xmlBlaster");
266 }
267
268 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
269 }
270
271 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
272 if (log!=null) log.warning("I_ConnectionStateListener-"+connection.getId()+": Lost connection to xmlBlaster");
273 }
274
275 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
276 if (log!=null) log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
277 }
278
279 /**
280 * Method is used by TestRunner to load these tests
281 */
282 public static Test suite() {
283 TestSuite suite= new TestSuite();
284 suite.addTest(new TestReferenceCount(null, "testReferenceCount"));
285 return suite;
286 }
287
288 /**
289 * Invoke: java org.xmlBlaster.test.topic.TestReferenceCount
290 * @deprecated Use the TestRunner from the testsuite to run it:<p />
291 * <pre> java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.topic.TestReferenceCount</pre>
292 */
293 public static void main(String args[]) {
294 Global glob = new Global();
295 if (glob.init(args) != 0) {
296 System.err.println(ME + ": Init failed");
297 System.exit(1);
298 }
299 TestReferenceCount testSub = new TestReferenceCount(glob, "TestReferenceCount");
300 testSub.setUp();
301 testSub.testReferenceCount();
302 testSub.tearDown();
303 }
304 }
syntax highlighted by Code2HTML, v. 0.9.1