1 /*------------------------------------------------------------------------------
  2 Name:      TestConsumableQueue.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test.distributor;
  7 
  8 
  9 import java.util.ArrayList;
 10 
 11 import org.xmlBlaster.test.util.Client;
 12 import org.xmlBlaster.util.Global;
 13 import org.xmlBlaster.util.XmlBlasterException;
 14 import org.xmlBlaster.util.def.ErrorCode;
 15 
 16 import junit.framework.*;
 17 
 18 /**
 19  * Test JmsSubscribe. 
 20  * <p />
 21  * All methods starting with 'test' and without arguments are invoked automatically
 22  * <p />
 23  * Invoke:
 24  *   java -Djava.compiler= junit.swingui.TestRunner -noloading org.xmlBlaster.test.distributor.TestConsumableQueue
 25  * @see org.xmlBlaster.util.qos.ConnectQosData
 26  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/jms.html" target="others">the jms requirement</a>
 27  * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 28  */
 29 public class TestConsumableQueue extends TestCase {
 30    private Global global;
 31    // as a container and as a latch
 32    static ArrayList<String> responses = new ArrayList<String>();
 33    private static long WAIT_DELAY = 1000L;
 34 
 35    public TestConsumableQueue(String name) {
 36       super(name);
 37    }
 38 
 39    public void prepare(String[] args) {
 40       this.global = new Global(args);
 41    }
 42 
 43    protected void setUp() {
 44       this.global = Global.instance();
 45 
 46       responses.clear();
 47    }
 48 
 49    protected void tearDown() {
 50    }
 51 
 52    
 53    /**
 54     * Two subscribers log subscribe and then a publisher publishes
 55     * Only one of the subscribers should get the message.
 56     * This should test synchronous distribution
 57     */
 58    public void testSubSubPub() {
 59       try {
 60          boolean consumable = true;
 61          int session = 1;
 62          Client pub1 = new Client(this.global, "pub1", responses);
 63          pub1.init("testConsumableQueue", null, consumable, session);
 64 
 65          Client sub1 = new Client(this.global, "sub1", responses);
 66          sub1.init(null, "testConsumableQueue", consumable, session);
 67          Client sub2 = new Client(this.global, "sub2", responses);
 68          sub2.init(null, "testConsumableQueue", consumable, session);
 69          Client deadMsg = new Client(this.global, "deadMsg", responses);
 70          deadMsg.init(null, "__sys__deadMessage", !consumable, session);
 71          
 72          assertEquals("wrong number of initial responses", 0, responses.size());
 73 
 74          synchronized(responses) {
 75             pub1.publish("firstMessage");
 76             for (int i=0; i < 1; i++) responses.wait(WAIT_DELAY);
 77             Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
 78             assertEquals("wrong number of updates", 1, responses.size());
 79          }
 80          responses.clear();         
 81 
 82          synchronized(responses) {
 83             sub1.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
 84             pub1.publish("firstMessage");
 85             for (int i=0; i < 2; i++) responses.wait(WAIT_DELAY);
 86             Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
 87             assertEquals("wrong number of updates", 2, responses.size());
 88             assertEquals("update should be a dead message", "deadMsg", responses.get(1));
 89          }
 90 
 91          responses.clear();         
 92          synchronized(responses) {
 93             sub1.setUpdateException(null);
 94             sub2.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
 95             pub1.publish("firstMessage");
 96             for (int i=0; i < 1; i++) responses.wait(WAIT_DELAY);
 97             Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
 98             assertEquals("wrong number of updates, since the first sub receives, so it should not even try the second", 1, responses.size());
 99          }
100 
101          /** only one dead message here since the first gives up delivery */         
102          responses.clear();         
103          synchronized(responses) {
104             sub1.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
105             sub2.setUpdateException(new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_ERROR, "testSubSubPub"));
106             pub1.publish("firstMessage");
107             for (int i=0; i < 2; i++) responses.wait(WAIT_DELAY);
108             Thread.sleep(200L); // wait in case an unexpected update comes in betweeen
109             assertEquals("wrong number of updates", 2, responses.size());
110             assertEquals("update should be a dead message", "deadMsg", responses.get(1));
111          }
112          
113          sub1.shutdown(false);
114          sub2.shutdown(false);
115          pub1.shutdown(true);
116          deadMsg.shutdown(false);
117       }
118       catch (Exception ex) {
119          ex.printStackTrace();
120          assertTrue(false);
121       }
122    }
123 
124    /**
125     * A publisher publishes and then one subscriber subscribe 
126     * The subscriber should get the message.
127     * This should test asynchronous distribution
128     */
129    public void testPubSub() {
130       try {
131          boolean consumable = true;
132          int session = 1; 
133          Client pub1 = new Client(this.global, "pub1", responses);
134          pub1.init("testConsumableQueue", null, consumable, session);
135 
136          pub1.publish("firstMessage");
137 
138          Client sub1 = new Client(this.global, "sub1", responses);
139          assertEquals("wrong number of initial responses", 0, responses.size());
140 
141          synchronized(responses) {
142             sub1.init(null, "testConsumableQueue", consumable, session);
143 
144             responses.wait(WAIT_DELAY);
145             Thread.sleep(1000L); // wait in case an unexpected update comes in betweeen
146             assertEquals("wrong number of updates", 1, responses.size());
147          }
148          
149          sub1.shutdown(false);
150          pub1.shutdown(true);
151       }
152       catch (Exception ex) {
153          ex.printStackTrace();
154          assertTrue(false);
155       }
156    }
157 
158    /**
159     * A publisher publishes and then two subscribers log subscribe 
160     * Only one of the subscribers should get the message.
161     * This should test asynchronous distribution
162     */
163    public void testPubSubSub() {
164       try {
165          boolean consumable = true; 
166          int session = 1;
167          Client pub1 = new Client(this.global, "pub1", responses);
168          pub1.init("testConsumableQueue", null, consumable, session);
169 
170          pub1.publish("firstMessage");
171 
172          Client sub1 = new Client(this.global, "sub1", responses);
173          Client sub2 = new Client(this.global, "sub2", responses);
174          assertEquals("wrong number of initial responses", 0, responses.size());
175 
176          synchronized(responses) {
177             sub1.init(null, "testConsumableQueue", consumable, session);
178             sub2.init(null, "testConsumableQueue", consumable, session);
179 
180             responses.wait(WAIT_DELAY);
181             Thread.sleep(1000L); // wait in case an unexpected update comes in betweeen
182             assertEquals("wrong number of updates", 1, responses.size());
183          }
184          
185          sub1.shutdown(false);
186          sub2.shutdown(false);
187          pub1.shutdown(true);
188       }
189       catch (Exception ex) {
190          ex.printStackTrace();
191          assertTrue(false);
192       }
193    }
194 
195    /**
196     * <pre>
197     *  java org.xmlBlaster.test.classtest.TestConsumableQueue
198     * </pre>
199     */
200    public static void main(String args[]) {
201       TestConsumableQueue test = new TestConsumableQueue("TestConsumableQueue");
202       test.prepare(args);
203 
204       test.setUp();
205       test.testSubSubPub();
206       test.tearDown();
207 
208       test.setUp();
209       test.testPubSub();
210       test.tearDown();
211 
212       test.setUp();
213       test.testPubSubSub();
214       test.tearDown();
215 
216    }
217 }


syntax highlighted by Code2HTML, v. 0.9.1