1 /*------------------------------------------------------------------------------
2 Name: TestConsumableQueue.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.distributor;
7
8
9 import java.util.ArrayList;
10
11 import org.xmlBlaster.test.util.Client;
12 import org.xmlBlaster.util.Global;
13 import org.xmlBlaster.util.XmlBlasterException;
14 import org.xmlBlaster.util.def.ErrorCode;
15
16 import junit.framework.*;
17
18 /**
19 * Test JmsSubscribe.
20 * <p />
21 * All methods starting with 'test' and without arguments are invoked automatically
22 * <p />
23 * Invoke:
24 * java -Djava.compiler= junit.swingui.TestRunner -noloading org.xmlBlaster.test.distributor.TestConsumableQueue
25 * @see org.xmlBlaster.util.qos.ConnectQosData
26 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/jms.html" target="others">the jms requirement</a>
27 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
28 */
29 public class TestConsumableQueue extends TestCase {
30 private Global global;
31 // as a container and as a latch
32 static ArrayList<String> responses = new ArrayList<String>();
33 private static long WAIT_DELAY = 1000L;
34
35 public TestConsumableQueue(String name) {
36 super(name);
37 }
38
39 public void prepare(String[] args) {
40 this.global = new Global(args);
41 }
42
43 protected void setUp() {
44 this.global = Global.instance();
45
46 responses.clear();
47 }
48
49 protected void tearDown() {
50 }
51
52
53 /**
54 * Two subscribers log subscribe and then a publisher publishes
55 * Only one of the subscribers should get the message.
56 * This should test synchronous distribution
57 */
58 public void testSubSubPub() {
59 try {
60 boolean consumable = true;
61 int session = 1;
62 Client pub1 = new Client(this.global, "pub1", responses);
63 pub1.init("testConsumableQueue", null, consumable, session);
64
65 Client sub1 = new Client(this.global, "sub1", responses);
66 sub1.init(null, "testConsumableQueue", consumable, session);
67 Client sub2 = new Client(this.global, "sub2", responses);
68 sub2.init(null, "testConsumableQueue", consumable, session);
69 Client deadMsg = new Client(this.global, "deadMsg", responses);
70 deadMsg.init(null, "__sys__deadMessage", !consumable, session);
71
72 assertEquals("wrong number of initial responses", 0, responses.size());
73
74 synchronized(responses) {
75 pub1.publish("firstMessage");
76 for (int i=0; i < 1; i++) responses.wait(WAIT_DELAY);
77 Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
78 assertEquals("wrong number of updates", 1, responses.size());
79 }
80 responses.clear();
81
82 synchronized(responses) {
83 sub1.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
84 pub1.publish("firstMessage");
85 for (int i=0; i < 2; i++) responses.wait(WAIT_DELAY);
86 Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
87 assertEquals("wrong number of updates", 2, responses.size());
88 assertEquals("update should be a dead message", "deadMsg", responses.get(1));
89 }
90
91 responses.clear();
92 synchronized(responses) {
93 sub1.setUpdateException(null);
94 sub2.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
95 pub1.publish("firstMessage");
96 for (int i=0; i < 1; i++) responses.wait(WAIT_DELAY);
97 Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
98 assertEquals("wrong number of updates, since the first sub receives, so it should not even try the second", 1, responses.size());
99 }
100
101 /** only one dead message here since the first gives up delivery */
102 responses.clear();
103 synchronized(responses) {
104 sub1.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
105 sub2.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
106 pub1.publish("firstMessage");
107 for (int i=0; i < 2; i++) responses.wait(WAIT_DELAY);
108 Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
109 assertEquals("wrong number of updates", 2, responses.size());
110 assertEquals("update should be a dead message", "deadMsg", responses.get(1));
111 }
112
113 sub1.shutdown(false);
114 sub2.shutdown(false);
115 pub1.shutdown(true);
116 deadMsg.shutdown(false);
117 }
118 catch (Exception ex) {
119 ex.printStackTrace();
120 assertTrue(false);
121 }
122 }
123
124 /**
125 * A publisher publishes and then one subscriber subscribe
126 * The subscriber should get the message.
127 * This should test asynchronous distribution
128 */
129 public void testPubSub() {
130 try {
131 boolean consumable = true;
132 int session = 1;
133 Client pub1 = new Client(this.global, "pub1", responses);
134 pub1.init("testConsumableQueue", null, consumable, session);
135
136 pub1.publish("firstMessage");
137
138 Client sub1 = new Client(this.global, "sub1", responses);
139 assertEquals("wrong number of initial responses", 0, responses.size());
140
141 synchronized(responses) {
142 sub1.init(null, "testConsumableQueue", consumable, session);
143
144 responses.wait(WAIT_DELAY);
145 Thread.sleep(1000L); // wait in case an unexpected update comes in betweeen
146 assertEquals("wrong number of updates", 1, responses.size());
147 }
148
149 sub1.shutdown(false);
150 pub1.shutdown(true);
151 }
152 catch (Exception ex) {
153 ex.printStackTrace();
154 assertTrue(false);
155 }
156 }
157
158 /**
159 * A publisher publishes and then two subscribers log subscribe
160 * Only one of the subscribers should get the message.
161 * This should test asynchronous distribution
162 */
163 public void testPubSubSub() {
164 try {
165 boolean consumable = true;
166 int session = 1;
167 Client pub1 = new Client(this.global, "pub1", responses);
168 pub1.init("testConsumableQueue", null, consumable, session);
169
170 pub1.publish("firstMessage");
171
172 Client sub1 = new Client(this.global, "sub1", responses);
173 Client sub2 = new Client(this.global, "sub2", responses);
174 assertEquals("wrong number of initial responses", 0, responses.size());
175
176 synchronized(responses) {
177 sub1.init(null, "testConsumableQueue", consumable, session);
178 sub2.init(null, "testConsumableQueue", consumable, session);
179
180 responses.wait(WAIT_DELAY);
181 Thread.sleep(1000L); // wait in case an unexpected update comes in betweeen
182 assertEquals("wrong number of updates", 1, responses.size());
183 }
184
185 sub1.shutdown(false);
186 sub2.shutdown(false);
187 pub1.shutdown(true);
188 }
189 catch (Exception ex) {
190 ex.printStackTrace();
191 assertTrue(false);
192 }
193 }
194
195 /**
196 * <pre>
197 * java org.xmlBlaster.test.classtest.TestConsumableQueue
198 * </pre>
199 */
200 public static void main(String args[]) {
201 TestConsumableQueue test = new TestConsumableQueue("TestConsumableQueue");
202 test.prepare(args);
203
204 test.setUp();
205 test.testSubSubPub();
206 test.tearDown();
207
208 test.setUp();
209 test.testPubSub();
210 test.tearDown();
211
212 test.setUp();
213 test.testPubSubSub();
214 test.tearDown();
215
216 }
217 }
// syntax highlighted by Code2HTML, v. 0.9.1