1 /*------------------------------------------------------------------------------
  2 Name:      TestJmsSubscribe.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.jms;
  7 
  8 import javax.jms.Connection;
  9 import javax.jms.ConnectionFactory;
 10 import javax.jms.Destination;
 11 import javax.jms.JMSException;
 12 import javax.jms.Message;
 13 import javax.jms.MessageConsumer;
 14 import javax.jms.MessageListener;
 15 import javax.jms.MessageProducer;
 16 import javax.jms.Session;
 17 import javax.jms.TextMessage;
 18 import javax.naming.InitialContext;
 19 import javax.naming.NamingException;
 20 
 21 import org.apache.naming.NamingService;
 22 import java.util.logging.Logger;
 23 import java.util.logging.Level;
 24 import org.xmlBlaster.util.Global;
 25 
 26 import org.xmlBlaster.jms.XBConnectionFactory;
 27 import org.xmlBlaster.jms.XBDestination;
 28 
 29 import junit.framework.*;
 30 
 31 /**
 32  * Test JmsSubscribe. 
 33  * <p />
 34  * All methods starting with 'test' and without arguments are invoked automatically
 35  * <p />
 36  * Invoke: java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.classtest.TestJmsSubscribe
 37  * @see org.xmlBlaster.util.qos.ConnectQosData
 38  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/jms.html" target="others">the jms requirement</a>
 39  * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 40  */
 41 public class TestJmsSubscribe extends TestCase implements MessageListener {
 42    private final static String CONNECTION_FACTORY = "connectionFactory";
 43    private final static String TOPIC = "jms-test";
 44    protected Global glob;
 45    private static Logger log = Logger.getLogger(TestJmsSubscribe.class.getName());
 46    int counter = 0, nmax;
 47    private Throwable ex;
 48    
 49    private ConnectionFactory factory;
 50    private Destination topic;
 51    private Connection connection;
 52    private Message msg;
 53    
 54    private String[] args;
 55    private NamingService namingService;
 56 
 57    class PublisherThread extends Thread {
 58          
 59       private MessageProducer producer;
 60       private int numOfPublishes;
 61       private long delayBetweenPublishes;
 62       private Message msg;
 63       
 64       public PublisherThread(MessageProducer producer, Message msg, int numOfPublishes, long delayBetweenPublishes) {
 65          this.producer = producer;
 66          this.numOfPublishes = numOfPublishes;
 67          this.delayBetweenPublishes = delayBetweenPublishes;
 68          this.msg = msg;
 69       }
 70       
 71       public void run() {
 72          for (int i=0; i < this.numOfPublishes; i++) {
 73             try {
 74                Thread.sleep(this.delayBetweenPublishes);
 75                this.producer.send(this.msg);
 76             }
 77             catch (Exception ex) {
 78                ex.printStackTrace();
 79                assertTrue("Exception in publisher thread " + ex.getMessage() , false);
 80             }
 81          }
 82       }
 83    }
 84 
 85    
 86    public TestJmsSubscribe(String name) {
 87       super(name);
 88       try {
 89          this.namingService = new NamingService();
 90          this.namingService.start(); 
 91       }
 92       catch (Exception ex) {
 93          ex.printStackTrace();
 94          assertTrue("exception in constructor when starting naming service", false);
 95       }
 96    }
 97 
 98    public void finalize() {
 99       this.namingService.stop(); 
100    }
101 
102    public void prepare(String[] args) {
103       this.args = args;
104       this.glob = new Global(args);
105    }
106 
107    public void onMessage(Message message) {
108       try {
109          if (log.isLoggable(Level.FINER)) log.finer("onMessage start");
110          if (message instanceof TextMessage) {
111             this.counter++;
112             log.fine(((TextMessage)message).getText());
113             this.msg = message;
114             // message.acknowledge();
115          }
116          if (log.isLoggable(Level.FINER)) log.finer("onMessage stop");
117       }
118       catch (Throwable ex) {
119          ex.printStackTrace();
120          this.ex = ex;
121       }
122    }
123 
124 
125    protected void setUp() {
126       this.glob = Global.instance();
127 
128       try {
129          adminJmsStart();
130          this.ex = null;
131          try {
132             // TODO Re-introduce these. It seems that the serialization of Global is not
133             // working properly yet.
134             
135             //InitialContext ctx = new InitialContext();
136             //this.factory = (XBConnectionFactory)ctx.lookup(CONNECTION_FACTORY);
137             //this.topic = (XBTopic)ctx.lookup(TOPIC);
138          }
139          catch (Exception ex) {
140             ex.printStackTrace();
141             assertTrue("naming exception", false);
142          }
143 
144          this.connection = this.factory.createConnection();
145          this.connection.start();
146          this.nmax = 5;
147          this.counter = 0;
148 
149       }
150       catch (JMSException ex) {
151          ex.printStackTrace();
152          assertTrue(false);
153       }
154    }
155 
156    protected void tearDown() {
157       try {
158          this.connection.close();
159          InitialContext ctx = new InitialContext();
160          ctx.unbind(CONNECTION_FACTORY);
161          ctx.unbind(TOPIC);
162          this.connection = null;
163       }
164       catch (JMSException ex) {
165          ex.printStackTrace();
166          assertTrue(false);
167       }
168       catch (NamingException ex) {
169          ex.printStackTrace();
170          assertTrue("exception when unbinding", false);
171       }
172    }
173    
174    protected void adminJmsStart() {
175       try {
176          // System.setProperty("java.naming.factory.initial", "org.apache.naming.modules.memory.MemoryURLContextFactory");
177          // System.setProperty("java.naming.factory.url.pkgs", "org.apache.naming.modules");
178          InitialContext ctx = new InitialContext();
179          String connQosTxt = null;
180          boolean forQueues = false;
181          this.factory = new XBConnectionFactory(connQosTxt, this.args, forQueues);
182          this.topic = new XBDestination(TOPIC, null, false);
183          ctx.bind(CONNECTION_FACTORY, this.factory);            
184          ctx.bind(TOPIC, this.topic);
185       }
186       catch (NamingException ex) {
187          ex.printStackTrace();
188          assertTrue("exception occured in testJndi", false);
189       }
190       catch (Exception ex) {
191          ex.printStackTrace();
192          assertTrue("exception when starting naming service", false);
193       }
194    }
195    
196    private void async(int ackMode, String type) {
197       // Session.AUTO_ACKNOWLEDGE
198       try {
199          boolean transacted = false;
200          Session consumerSession = connection.createSession(transacted, ackMode);
201          MessageConsumer subscriber = consumerSession.createConsumer(this.topic);
202          subscriber.setMessageListener(this);
203          Session producerSession = connection.createSession(transacted, ackMode);
204          MessageProducer publisher = producerSession.createProducer(this.topic);
205          connection.start();
206 
207          for (int i=0; i < this.nmax; i++) {
208             TextMessage msg = producerSession.createTextMessage();
209             msg.setText("this is a " + type + " jms message nr. " + i);
210             publisher.send(this.topic, msg);
211          }
212          
213          if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
214             for (int i=0; i < this.nmax; i++) {
215                Thread.sleep(250L);
216                if (this.ex != null) {
217                   assertTrue("An exception occured in the onMessage method. It should not. " + this.ex.getMessage(), false);
218                }
219                assertEquals("wrong number of " + type + " messages arrived", i+1, this.counter);
220                this.msg.acknowledge(); // now it should continue
221             }
222          }
223          else {
224             Thread.sleep(1000L);
225             if (this.ex != null) {
226                assertTrue("An exception occured in the onMessage method. It should not. " + this.ex.getMessage(), false);
227             }   
228             assertEquals("wrong number of " + type + " messages arrived", this.nmax, this.counter);
229          }
230          this.counter = 0;
231       }
232       catch (Exception ex) {
233          ex.printStackTrace();
234          assertTrue("Exception occured when it should not. " + ex.getMessage(), false);
235       }
236    }
237 
238 
239    
240    
241    
242    public void dummy() {
243       // Session.AUTO_ACKNOWLEDGE
244       try {
245          boolean transacted = false;
246          
247          Session session = connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE);
248          // String topic = "jms-topic";
249          String topic = null;
250          String sessionName = "hello/1";
251          MessageProducer producer = session.createProducer(new XBDestination(topic, sessionName));
252          // producer.setPriority(PriorityEnum.HIGH_PRIORITY.getInt());
253          // producer.setDeliveryMode(DeliveryMode.PERSISTENT);
254          TextMessage msg = session.createTextMessage();
255          msg.setText("Hallo");
256          producer.send(msg);
257       }
258       catch (Exception ex) {
259          ex.printStackTrace();
260       }
261    }
262 
263    
264    
265    
266    
267    
268    public void testSubClientAck() {
269       async(Session.CLIENT_ACKNOWLEDGE, "clientAcknowledge");
270    }
271 
272    public void testSubAutoAck() {
273       async(Session.AUTO_ACKNOWLEDGE, "autoAcknowledge");
274    }
275    
276    public void testSubDupsOk() {
277       // TODO remove this comment once DUPS_OK_ACKNOWLEDGE works
278       // async(Session.DUPS_OK_ACKNOWLEDGE, "dupsOkAcknowledge");
279    }
280    
281    public void testSyncReceiver() {
282       try {
283          Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
284          MessageConsumer consumer = consumerSession.createConsumer(this.topic);
285          Session publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
286          MessageProducer publisher = publisherSession.createProducer(this.topic);
287          int nmax = 3;
288 
289          // test receiveNoWait()
290          TextMessage[] msgIn = new TextMessage[nmax];
291          Message msg2 = null;
292          for (int i=0; i < nmax; i++) {
293             msgIn[i] = publisherSession.createTextMessage();
294             msgIn[i].setText("msg " + i);
295             publisher.send(this.topic, msgIn[i]);
296          }
297          for (int i=0; i < nmax; i++) {
298             msg2 = consumer.receiveNoWait();
299             if (!(msg2 instanceof TextMessage)) {
300                assertTrue("received message if of wrong type, should be TextMessage but is '" + msg2.getClass().getName() + "'", false);
301             }
302             assertEquals("receive(): messages are not the same", msgIn[i].getText(), ((TextMessage)msg2).getText());         
303          }
304          msg2 = consumer.receiveNoWait();
305          if (msg2 != null) {
306             assertTrue("no message was sent, so null should have been returned here but it was " + msg.toString(), false);
307          }
308          
309          // test receive(long)
310          msgIn = new TextMessage[nmax];
311          for (int i=0; i < nmax; i++) {
312             msgIn[i] = publisherSession.createTextMessage();
313             msgIn[i].setText("msg " + i);
314             publisher.send(this.topic, msgIn[i]);
315          }
316          for (int i=0; i < nmax; i++) {
317             msg2 = consumer.receive(200L);
318             if (!(msg2 instanceof TextMessage)) {
319                assertTrue("received message if of wrong type, should be TextMessage but is '" + msg2.getClass().getName() + "'", false);
320             }
321             assertEquals("receive(): messages are not the same", msgIn[i].getText(), ((TextMessage)msg2).getText());         
322          }
323          msg2 = consumer.receive(200L);
324          if (msg2 != null) {
325             assertTrue("no message was sent, so null should have been returned here but it was " + msg.toString(), false);
326          }
327 
328          // test receive()
329          msgIn = new TextMessage[nmax];
330          for (int i=0; i < nmax; i++) {
331             msgIn[i] = publisherSession.createTextMessage();
332             msgIn[i].setText("msg " + i);
333             publisher.send(this.topic, msgIn[i]);
334          }
335          for (int i=0; i < nmax; i++) {
336             msg2 = consumer.receive();
337             if (!(msg2 instanceof TextMessage)) {
338                assertTrue("received message if of wrong type, should be TextMessage but is '" + msg2.getClass().getName() + "'", false);
339             }
340             assertEquals("receive(): messages are not the same", msgIn[i].getText(), ((TextMessage)msg2).getText());         
341          }
342          //PublisherThread pub = new PublisherThread(publisher, msg, 6, 100L);
343          //pub.start();
344       }
345       catch (Exception ex) {
346          ex.printStackTrace();
347          assertTrue(false);
348       }
349    }
350 
351    /**
352     * <pre>
353     *  java org.xmlBlaster.test.classtest.TestJmsSubscribe
354     * </pre>
355     */
356    public static void main(String args[])
357    {
358       TestJmsSubscribe test = new TestJmsSubscribe("TestJmsSubscribe");
359       test.prepare(args);
360 
361       test.setUp();
362       test.dummy();
363       test.tearDown();
364       
365       test.setUp();
366       test.testSubDupsOk();
367       test.tearDown();
368 
369       test.setUp();
370       test.testSubAutoAck();
371       test.tearDown();
372       
373       test.setUp();
374       test.testSubClientAck();
375       test.tearDown();
376       
377       test.setUp();
378       test.testSyncReceiver();
379       test.tearDown();
380    }
381 }


syntax highlighted by Code2HTML, v. 0.9.1