1 /*------------------------------------------------------------------------------
2 Name: TestJmsSubscribe.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test.jms;
7
8 import javax.jms.Connection;
9 import javax.jms.ConnectionFactory;
10 import javax.jms.Destination;
11 import javax.jms.JMSException;
12 import javax.jms.Message;
13 import javax.jms.MessageConsumer;
14 import javax.jms.MessageListener;
15 import javax.jms.MessageProducer;
16 import javax.jms.Session;
17 import javax.jms.TextMessage;
18 import javax.naming.InitialContext;
19 import javax.naming.NamingException;
20
21 import org.apache.naming.NamingService;
22 import java.util.logging.Logger;
23 import java.util.logging.Level;
24 import org.xmlBlaster.util.Global;
25
26 import org.xmlBlaster.jms.XBConnectionFactory;
27 import org.xmlBlaster.jms.XBDestination;
28
29 import junit.framework.*;
30
31 /**
32 * Test JmsSubscribe.
33 * <p />
34 * All methods starting with 'test' and without arguments are invoked automatically
35 * <p />
36 * Invoke: java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.classtest.TestJmsSubscribe
37 * @see org.xmlBlaster.util.qos.ConnectQosData
38 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/jms.html" target="others">the jms requirement</a>
39 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
40 */
41 public class TestJmsSubscribe extends TestCase implements MessageListener {
42 private final static String CONNECTION_FACTORY = "connectionFactory";
43 private final static String TOPIC = "jms-test";
44 protected Global glob;
45 private static Logger log = Logger.getLogger(TestJmsSubscribe.class.getName());
46 int counter = 0, nmax;
47 private Throwable ex;
48
49 private ConnectionFactory factory;
50 private Destination topic;
51 private Connection connection;
52 private Message msg;
53
54 private String[] args;
55 private NamingService namingService;
56
57 class PublisherThread extends Thread {
58
59 private MessageProducer producer;
60 private int numOfPublishes;
61 private long delayBetweenPublishes;
62 private Message msg;
63
64 public PublisherThread(MessageProducer producer, Message msg, int numOfPublishes, long delayBetweenPublishes) {
65 this.producer = producer;
66 this.numOfPublishes = numOfPublishes;
67 this.delayBetweenPublishes = delayBetweenPublishes;
68 this.msg = msg;
69 }
70
71 public void run() {
72 for (int i=0; i < this.numOfPublishes; i++) {
73 try {
74 Thread.sleep(this.delayBetweenPublishes);
75 this.producer.send(this.msg);
76 }
77 catch (Exception ex) {
78 ex.printStackTrace();
79 assertTrue("Exception in publisher thread " + ex.getMessage() , false);
80 }
81 }
82 }
83 }
84
85
86 public TestJmsSubscribe(String name) {
87 super(name);
88 try {
89 this.namingService = new NamingService();
90 this.namingService.start();
91 }
92 catch (Exception ex) {
93 ex.printStackTrace();
94 assertTrue("exception in constructor when starting naming service", false);
95 }
96 }
97
98 public void finalize() {
99 this.namingService.stop();
100 }
101
102 public void prepare(String[] args) {
103 this.args = args;
104 this.glob = new Global(args);
105 }
106
107 public void onMessage(Message message) {
108 try {
109 if (log.isLoggable(Level.FINER)) log.finer("onMessage start");
110 if (message instanceof TextMessage) {
111 this.counter++;
112 log.fine(((TextMessage)message).getText());
113 this.msg = message;
114 // message.acknowledge();
115 }
116 if (log.isLoggable(Level.FINER)) log.finer("onMessage stop");
117 }
118 catch (Throwable ex) {
119 ex.printStackTrace();
120 this.ex = ex;
121 }
122 }
123
124
125 protected void setUp() {
126 this.glob = Global.instance();
127
128 try {
129 adminJmsStart();
130 this.ex = null;
131 try {
132 // TODO Re-introduce these. It seems that the serialization of Global is not
133 // working properly yet.
134
135 //InitialContext ctx = new InitialContext();
136 //this.factory = (XBConnectionFactory)ctx.lookup(CONNECTION_FACTORY);
137 //this.topic = (XBTopic)ctx.lookup(TOPIC);
138 }
139 catch (Exception ex) {
140 ex.printStackTrace();
141 assertTrue("naming exception", false);
142 }
143
144 this.connection = this.factory.createConnection();
145 this.connection.start();
146 this.nmax = 5;
147 this.counter = 0;
148
149 }
150 catch (JMSException ex) {
151 ex.printStackTrace();
152 assertTrue(false);
153 }
154 }
155
156 protected void tearDown() {
157 try {
158 this.connection.close();
159 InitialContext ctx = new InitialContext();
160 ctx.unbind(CONNECTION_FACTORY);
161 ctx.unbind(TOPIC);
162 this.connection = null;
163 }
164 catch (JMSException ex) {
165 ex.printStackTrace();
166 assertTrue(false);
167 }
168 catch (NamingException ex) {
169 ex.printStackTrace();
170 assertTrue("exception when unbinding", false);
171 }
172 }
173
174 protected void adminJmsStart() {
175 try {
176 // System.setProperty("java.naming.factory.initial", "org.apache.naming.modules.memory.MemoryURLContextFactory");
177 // System.setProperty("java.naming.factory.url.pkgs", "org.apache.naming.modules");
178 InitialContext ctx = new InitialContext();
179 String connQosTxt = null;
180 boolean forQueues = false;
181 this.factory = new XBConnectionFactory(connQosTxt, this.args, forQueues);
182 this.topic = new XBDestination(TOPIC, null, false);
183 ctx.bind(CONNECTION_FACTORY, this.factory);
184 ctx.bind(TOPIC, this.topic);
185 }
186 catch (NamingException ex) {
187 ex.printStackTrace();
188 assertTrue("exception occured in testJndi", false);
189 }
190 catch (Exception ex) {
191 ex.printStackTrace();
192 assertTrue("exception when starting naming service", false);
193 }
194 }
195
196 private void async(int ackMode, String type) {
197 // Session.AUTO_ACKNOWLEDGE
198 try {
199 boolean transacted = false;
200 Session consumerSession = connection.createSession(transacted, ackMode);
201 MessageConsumer subscriber = consumerSession.createConsumer(this.topic);
202 subscriber.setMessageListener(this);
203 Session producerSession = connection.createSession(transacted, ackMode);
204 MessageProducer publisher = producerSession.createProducer(this.topic);
205 connection.start();
206
207 for (int i=0; i < this.nmax; i++) {
208 TextMessage msg = producerSession.createTextMessage();
209 msg.setText("this is a " + type + " jms message nr. " + i);
210 publisher.send(this.topic, msg);
211 }
212
213 if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
214 for (int i=0; i < this.nmax; i++) {
215 Thread.sleep(250L);
216 if (this.ex != null) {
217 assertTrue("An exception occured in the onMessage method. It should not. " + this.ex.getMessage(), false);
218 }
219 assertEquals("wrong number of " + type + " messages arrived", i+1, this.counter);
220 this.msg.acknowledge(); // now it should continue
221 }
222 }
223 else {
224 Thread.sleep(1000L);
225 if (this.ex != null) {
226 assertTrue("An exception occured in the onMessage method. It should not. " + this.ex.getMessage(), false);
227 }
228 assertEquals("wrong number of " + type + " messages arrived", this.nmax, this.counter);
229 }
230 this.counter = 0;
231 }
232 catch (Exception ex) {
233 ex.printStackTrace();
234 assertTrue("Exception occured when it should not. " + ex.getMessage(), false);
235 }
236 }
237
238
239
240
241
242 public void dummy() {
243 // Session.AUTO_ACKNOWLEDGE
244 try {
245 boolean transacted = false;
246
247 Session session = connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE);
248 // String topic = "jms-topic";
249 String topic = null;
250 String sessionName = "hello/1";
251 MessageProducer producer = session.createProducer(new XBDestination(topic, sessionName));
252 // producer.setPriority(PriorityEnum.HIGH_PRIORITY.getInt());
253 // producer.setDeliveryMode(DeliveryMode.PERSISTENT);
254 TextMessage msg = session.createTextMessage();
255 msg.setText("Hallo");
256 producer.send(msg);
257 }
258 catch (Exception ex) {
259 ex.printStackTrace();
260 }
261 }
262
263
264
265
266
267
268 public void testSubClientAck() {
269 async(Session.CLIENT_ACKNOWLEDGE, "clientAcknowledge");
270 }
271
272 public void testSubAutoAck() {
273 async(Session.AUTO_ACKNOWLEDGE, "autoAcknowledge");
274 }
275
276 public void testSubDupsOk() {
277 // TODO remove this comment once DUPS_OK_ACKNOWLEDGE works
278 // async(Session.DUPS_OK_ACKNOWLEDGE, "dupsOkAcknowledge");
279 }
280
281 public void testSyncReceiver() {
282 try {
283 Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
284 MessageConsumer consumer = consumerSession.createConsumer(this.topic);
285 Session publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
286 MessageProducer publisher = publisherSession.createProducer(this.topic);
287 int nmax = 3;
288
289 // test receiveNoWait()
290 TextMessage[] msgIn = new TextMessage[nmax];
291 Message msg2 = null;
292 for (int i=0; i < nmax; i++) {
293 msgIn[i] = publisherSession.createTextMessage();
294 msgIn[i].setText("msg " + i);
295 publisher.send(this.topic, msgIn[i]);
296 }
297 for (int i=0; i < nmax; i++) {
298 msg2 = consumer.receiveNoWait();
299 if (!(msg2 instanceof TextMessage)) {
300 assertTrue("received message if of wrong type, should be TextMessage but is '" + msg2.getClass().getName() + "'", false);
301 }
302 assertEquals("receive(): messages are not the same", msgIn[i].getText(), ((TextMessage)msg2).getText());
303 }
304 msg2 = consumer.receiveNoWait();
305 if (msg2 != null) {
306 assertTrue("no message was sent, so null should have been returned here but it was " + msg.toString(), false);
307 }
308
309 // test receive(long)
310 msgIn = new TextMessage[nmax];
311 for (int i=0; i < nmax; i++) {
312 msgIn[i] = publisherSession.createTextMessage();
313 msgIn[i].setText("msg " + i);
314 publisher.send(this.topic, msgIn[i]);
315 }
316 for (int i=0; i < nmax; i++) {
317 msg2 = consumer.receive(200L);
318 if (!(msg2 instanceof TextMessage)) {
319 assertTrue("received message if of wrong type, should be TextMessage but is '" + msg2.getClass().getName() + "'", false);
320 }
321 assertEquals("receive(): messages are not the same", msgIn[i].getText(), ((TextMessage)msg2).getText());
322 }
323 msg2 = consumer.receive(200L);
324 if (msg2 != null) {
325 assertTrue("no message was sent, so null should have been returned here but it was " + msg.toString(), false);
326 }
327
328 // test receive()
329 msgIn = new TextMessage[nmax];
330 for (int i=0; i < nmax; i++) {
331 msgIn[i] = publisherSession.createTextMessage();
332 msgIn[i].setText("msg " + i);
333 publisher.send(this.topic, msgIn[i]);
334 }
335 for (int i=0; i < nmax; i++) {
336 msg2 = consumer.receive();
337 if (!(msg2 instanceof TextMessage)) {
338 assertTrue("received message if of wrong type, should be TextMessage but is '" + msg2.getClass().getName() + "'", false);
339 }
340 assertEquals("receive(): messages are not the same", msgIn[i].getText(), ((TextMessage)msg2).getText());
341 }
342 //PublisherThread pub = new PublisherThread(publisher, msg, 6, 100L);
343 //pub.start();
344 }
345 catch (Exception ex) {
346 ex.printStackTrace();
347 assertTrue(false);
348 }
349 }
350
351 /**
352 * <pre>
353 * java org.xmlBlaster.test.classtest.TestJmsSubscribe
354 * </pre>
355 */
356 public static void main(String args[])
357 {
358 TestJmsSubscribe test = new TestJmsSubscribe("TestJmsSubscribe");
359 test.prepare(args);
360
361 test.setUp();
362 test.dummy();
363 test.tearDown();
364
365 test.setUp();
366 test.testSubDupsOk();
367 test.tearDown();
368
369 test.setUp();
370 test.testSubAutoAck();
371 test.tearDown();
372
373 test.setUp();
374 test.testSubClientAck();
375 test.tearDown();
376
377 test.setUp();
378 test.testSyncReceiver();
379 test.tearDown();
380 }
381 }
syntax highlighted by Code2HTML, v. 0.9.1