1 package org.xmlBlaster.test.classtest.queue;
  2 
  3 import java.util.logging.Logger;
  4 import java.util.logging.Level;
  5 import org.xmlBlaster.util.StopWatch;
  6 import org.xmlBlaster.util.Global;
  7 import org.xmlBlaster.util.XmlBlasterException;
  8 import org.xmlBlaster.util.def.ErrorCode;
  9 import org.xmlBlaster.util.def.PriorityEnum;
 10 import org.xmlBlaster.util.queue.StorageId;
 11 import org.xmlBlaster.util.def.Constants;
 12 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
 13 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
 14 
 15 import org.xmlBlaster.util.queuemsg.DummyEntry;
 16 
 17 import java.sql.Connection;
 18 import java.util.ArrayList;
 19 
 20 import junit.framework.*;
 21 import org.xmlBlaster.util.queue.I_Queue;
 22 import org.xmlBlaster.util.queue.QueuePluginManager;
 23 import org.xmlBlaster.util.queue.jdbc.JdbcConnectionPool;
 24 import org.xmlBlaster.util.plugin.PluginInfo;
 25 
 26 /**
 27  * Test JdbcQueuePlugin failover when persistent store disappears. 
 28  * <p>
 29  * Invoke: java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
 30  * </p>
 31  * <p>
 32  * Test database with PostgreSQL:
 33  * </p>
 34  * <pre>
 35  * initdb /tmp/postgres
 36  * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres    (edit host access)
 37  * createdb test
 38  * postmaster -i -D /tmp/postgres
 39  * </pre>
 40  * @see org.xmlBlaster.util.queue.I_Queue
 41  * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
 42  */
 43 public class JdbcQueueTest extends TestCase {
 44    
 45    
 46    public class ConnectionConsumer extends Thread {
 47       private JdbcConnectionPool pool;
 48       private int count;
 49       
 50       public ConnectionConsumer(JdbcConnectionPool pool, int count) {
 51          this.pool = pool;
 52          this.count = count;
 53          start();
 54       }
 55       
 56       public void run() {
 57          boolean success = true;
 58          try {
 59             log.info("connectionConsumer " + this.count + " starting");
 60             Connection conn = this.pool.getConnection();
 61             log.info("connectionConsumer " + this.count + " got the connection " + conn);
 62             if (conn != null) 
 63                this.pool.releaseConnection(conn, success);
 64          }
 65          catch (XmlBlasterException ex) {
 66             log.info("connectionConsumer exception " + ex.getMessage());
 67             if (ex.getErrorCode().getErrorCode().equals(ErrorCode.RESOURCE_TOO_MANY_THREADS.getErrorCode())) {
 68                synchronized(JdbcQueueTest.class) {
 69                   exceptionCount++;
 70                }
 71             }
 72          }
 73       }
 74       
 75    }
 76    
 77    int exceptionCount = 0;
 78    
 79    private String ME = "JdbcQueueTest";
 80    protected Global glob;
 81    private static Logger log = Logger.getLogger(JdbcQueueTest.class.getName());
 82    private StopWatch stopWatch = new StopWatch();
 83 
 84    private int numOfQueues = 10;
 85    private int numOfMsg    = 10000;
 86    private long sizeOfMsg  = 100L;
 87    private I_Queue queue   = null;
 88 
 89    public ArrayList queueList = null;
 90 //   public static String[] PLUGIN_TYPES = { new String("JDBC"), new String("CACHE") };
 91    public static String[] PLUGIN_TYPES = { new String("JDBC") };
 92    public int count = 0;
 93    boolean suppressTest = false;
 94    boolean doExecute = true;
 95 
 96    /** Constructor for junit not possible since we need to run it 3 times
 97    public JdbcQueueTest(String name) {
 98       super(name);
 99       for (int i=0; i < NUM_IMPL; i++)
100          initialize(new Global(), name, i);
101    }
102    */
103 
104    public JdbcQueueTest(Global glob, String name, int currImpl, boolean doExecute) {
105       super(name);
106       this.doExecute = doExecute;
107       initialize(glob, name, currImpl);
108    }
109 
110    private void initialize(Global glob, String name, int currImpl) {
111       this.glob = Global.instance();
112 
113 
114       this.numOfQueues = glob.getProperty().get("queues", 2);
115       this.numOfMsg = glob.getProperty().get("entries", 100);
116       this.sizeOfMsg = glob.getProperty().get("sizes", 10L);
117       this.suppressTest = false;
118       this.count = currImpl;
119 
120       try {
121          String type = PLUGIN_TYPES[currImpl];
122          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
123          QueuePluginManager pluginManager = new QueuePluginManager(glob);
124          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
125          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
126          prop.put("tableNamePrefix", "TEST");
127          prop.put("entriesTableName", "_entries");
128 
129          CbQueueProperty cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
130          StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SetupQueue");
131 
132          this.queue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
133          this.queue.shutdown(); // to allow to initialize again
134       }
135       catch (Exception ex) {
136          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
137       }
138    }
139 
140    protected void setUp() {
141 
142       try {
143          glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
144          ME = "JdbcQueueTest with class: " + PLUGIN_TYPES[this.count];
145       }
146       catch (Exception ex) {
147          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'" + ex.getMessage());
148       }
149 
150       // cleaning up the database from previous runs ...
151 
152       try {
153          // test initialize()
154 //         this.queue.destroy();
155          this.queue.shutdown();
156       }
157       catch (Exception ex) {
158          log.severe("could not propertly set up the database: " + ex.getMessage());
159          this.suppressTest = true;
160       }
161    }
162 
163    public void tearDown() {
164       try {
165          this.queue.clear();
166          this.queue.shutdown();
167       }
168       catch (Exception ex) {
169          log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp " + ex.getMessage());
170       }
171    }
172 
173    
174    public void testPutWithBreak() {
175       if (this.suppressTest) {
176          log.severe("JDBC test is not driven as no database was found");
177          return;
178       }
179       try {
180          if (this.doExecute) putWithBreak();
181          else {
182             log.warning("test desactivated since needs to be run manually");
183             log.warning("please invoke it as 'java org.xmlBlaster.test.classtest.queue.JdbcQueueTest'");
184          }
185       }
186       catch (XmlBlasterException ex) {
187          fail("Exception when testing PutWithBreak probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
188          ex.printStackTrace();
189       }
190    }
191 
192    public void putWithBreak() throws XmlBlasterException {
193       String me = ME + ".putWithBreak";
194       // set up the queues ....
195       QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
196       prop.setMaxEntries(10000);
197       StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "putWithBreak");
198       queue.initialize(queueId, prop);
199       queue.clear();
200 
201       int num = 30;
202       boolean success = false;
203       for (int i=0; i < num; i++) {
204          try {
205             log.info("put with break entry " + i + "/" + num + " please kill the DB manually to test reconnect");
206             DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
207             queue.put(entry, false);
208             try {
209                Thread.sleep(5000L);
210             }
211             catch (Exception ex) {
212             }
213          }
214          catch (XmlBlasterException ex) {
215             if (log.isLoggable(Level.FINE))  log.fine(ex.getMessage());
216             if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
217                log.info("the communication to the db has been lost");
218                success = true;
219                break;
220             }
221             else throw ex;
222          }
223       }
224       
225       assertTrue(me + ": Timed out when waiting to loose the connection to the DB", success);
226       success = false; // reset the flag
227       log.info("preparing to reconnect again ...");
228 
229       for (int i=0; i < num; i++) {
230          try {
231             log.info("put with break entry " + i + "/" + num + " please restart the the DB to test reconnect");
232             DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
233             queue.put(entry, false);
234             log.info("the communication to the db has been reestablished");
235             success = true;
236             break;
237          }
238          catch (XmlBlasterException ex) {
239             if (log.isLoggable(Level.FINE))  log.fine(ex.getMessage());
240             if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
241                try {
242                   Thread.sleep(5000L);
243                }
244                catch (Exception e) {
245                }
246             }
247             else throw ex;
248          }
249       }
250       assertTrue(me + ": Timed out when waiting to regain the connection to the DB", success);
251       log.info("successfully ended");
252    }
253 
254    public void testInitialEntries() {
255       if (this.suppressTest) {
256          log.severe("JDBC test is not driven as no database was found");
257          return;
258       }
259       try {
260          initialEntries();
261       }
262       catch (XmlBlasterException ex) {
263          fail("Exception when testing InitialEntries probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
264          ex.printStackTrace();
265       }
266    }
267 
268    public void initialEntries() throws XmlBlasterException {
269       // set up the queues ....
270       log.info("initialEntries test starts");
271       QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
272       cbProp.setMaxEntries(10000L);
273       cbProp.setMaxBytes(200000L);
274       StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "initialEntries");
275 
276       try {
277          String type = PLUGIN_TYPES[this.count];
278          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
279          QueuePluginManager pluginManager = new QueuePluginManager(glob);
280          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
281          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
282          prop.put("tableNamePrefix", "TEST");
283          prop.put("entriesTableName", "_entries");
284          I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
285          tmpQueue.clear();
286          // add some persistent entries and then shutdown ...
287          DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
288          tmpQueue.put(entry, false);
289          entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
290          tmpQueue.put(entry, false);
291          entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
292          tmpQueue.put(entry, false);
293          tmpQueue.shutdown(); // to allow to initialize again
294          I_Queue tmpQueue2 = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
295          long numOfEntries = tmpQueue2.getNumOfEntries();
296          assertEquals("Wrong number of entries in queue", 3L, numOfEntries);
297          ArrayList lst = tmpQueue2.peek(-1, -1L);
298          assertEquals("Wrong number of entries retrieved from queue", 3, lst.size());
299          queue.shutdown();
300       }
301       catch (Exception ex) {
302          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
303          ex.printStackTrace();
304          assertTrue("exception occured when testing initialEntries", false);
305       }
306       log.info("initialEntries test successfully ended");
307    }
308 
309 
310 
311 
312    public void testMultiplePut() {
313       try {
314          multiplePut();
315       }
316       catch (XmlBlasterException ex) {
317          fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
318          ex.printStackTrace();
319       }
320    }
321 
322    public void multiplePut() throws XmlBlasterException {
323       // set up the queues ....
324       log.info("initialEntries test starts");
325       QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
326       cbProp.setMaxEntries(10000L);
327       cbProp.setMaxBytes(200000L);
328       StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "initialEntries");
329 
330       try {
331          String type = PLUGIN_TYPES[this.count];
332          this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
333          QueuePluginManager pluginManager = new QueuePluginManager(glob);
334          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
335          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
336          prop.put("tableNamePrefix", "TEST");
337          prop.put("entriesTableName", "_entries");
338          I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
339          tmpQueue.clear();
340          // add some persistent entries and then shutdown ...
341          int nmax = 1;
342          int size = 100;
343 
344          for (int j=0; j < 4; j++) {
345             DummyEntry[] entries = new DummyEntry[nmax];
346             for (int i=0; i < nmax; i++) {
347                entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), size, true);
348             }
349             long time1 = System.currentTimeMillis();
350             tmpQueue.put(entries, false);
351             long delta = System.currentTimeMillis() - time1;
352             log.info("multiple put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
353            
354             ArrayList list = tmpQueue.peek(-1, -1L);
355             assertEquals("Wrong number of entries in queue", nmax, list.size());
356            
357             time1 = System.currentTimeMillis();
358             tmpQueue.removeRandom(entries);
359             delta = System.currentTimeMillis() - time1;
360             log.info("multiple remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
361             tmpQueue.clear();
362            
363             time1 = System.currentTimeMillis();
364             for (int i=0; i < nmax; i++) {
365                tmpQueue.put(entries[i], false);
366             }
367             delta = System.currentTimeMillis() - time1;
368             log.info("repeated single put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
369            
370             time1 = System.currentTimeMillis();
371             for (int i=0; i < nmax; i++) tmpQueue.removeRandom(entries[i]);
372             delta = System.currentTimeMillis() - time1;
373             log.info("repeated single remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
374             nmax *= 10;
375          }
376          tmpQueue.shutdown(); // to allow to initialize again
377       }
378       catch (Exception ex) {
379          log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
380          ex.printStackTrace();
381          assertTrue("exception occured when testing initialEntries", false);
382       }
383       log.info("initialEntries test successfully ended");
384    }
385 
386 
387    public void testConnectionPool() {
388       try {
389          String me = ME + "-testConnectionPool";
390          log.info(" starting ");
391          int numConn = 3;
392          int maxWaitingThreads = 10;
393 
394          Global ownGlobal = this.glob.getClone(null);
395 
396          QueuePluginManager pluginManager = new QueuePluginManager(ownGlobal);
397          PluginInfo pluginInfo = new PluginInfo(ownGlobal, pluginManager, "JDBC", "1.0");
398 
399          pluginInfo.getParameters().put("connectionBusyTimeout", "10000");
400          pluginInfo.getParameters().put("maxWaitingThreads", "" + maxWaitingThreads);
401          pluginInfo.getParameters().put("connectionPoolSize", "" + numConn);
402 
403          JdbcConnectionPool pool = new JdbcConnectionPool();
404          pool.initialize(ownGlobal, pluginInfo.getParameters());
405 
406          Connection[] conn = new Connection[numConn];         
407          for (int i=0; i < numConn; i++) {
408             log.info(" getting connection " + i);
409             conn[i] = pool.getConnection();
410             assertNotNull("The connection " + i + " shall not be null", conn[i]);
411          }
412          
413          log.info(" getting extra connection");
414          
415          Connection extraConn = null;
416          try {
417             extraConn = pool.getConnection();
418             assertTrue("An Exception should have occured here: ", false);
419          }
420          catch (Exception ex) {
421          }
422          // should wait 10 seconds and then return null
423          assertNull("the extra connection should be null", extraConn);
424          boolean success = true;
425          pool.releaseConnection(conn[0], success);
426          extraConn = pool.getConnection();
427          assertNotNull("the extra connection should not be null", extraConn);
428          //pool.releaseConnection(extraConn);
429 
430          this.exceptionCount = 0;         
431          int expectedEx = 4;
432          for (int i=0; i < maxWaitingThreads + expectedEx; i++) {
433             ConnectionConsumer cc = new ConnectionConsumer(pool, i);
434          }
435  
436          try {
437             Thread.sleep(15000L);
438          }
439          catch (InterruptedException ex) {
440          }
441  
442          assertEquals("Number of exceptions due to too many waiting threads is wrong", expectedEx, this.exceptionCount);
443          log.info(" successfully ended ");
444       }
445       catch (Exception ex) {
446          fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
447          ex.printStackTrace();
448       }
449    }
450 
451 
452    /**
453     * Method is used by TestRunner to load these tests
454     */
455    public static Test suite() {
456       TestSuite suite= new TestSuite();
457       Global glob = new Global();
458       for (int i=0; i < PLUGIN_TYPES.length; i++) {
459          suite.addTest(new JdbcQueueTest(glob, "testConnectionPool", i, true));
460          suite.addTest(new JdbcQueueTest(glob, "testMultiplePut", i, true));
461          suite.addTest(new JdbcQueueTest(glob, "testPutWithBreak", i, false));
462          suite.addTest(new JdbcQueueTest(glob, "testInitialEntries", i, true));
463       }
464       return suite;
465    }
466 
467    /**
468     * <pre>
469     *  java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
470     * </pre>
471     */
472    public static void main(String args[]) {
473       Global glob = new Global(args);
474 
475       for (int i=0; i < PLUGIN_TYPES.length; i++) {
476          JdbcQueueTest testSub = new JdbcQueueTest(glob, "JdbcQueueTest", i, true);
477 
478          testSub.setUp();
479          testSub.testConnectionPool();
480          testSub.tearDown();
481 
482          testSub.setUp();
483          testSub.testMultiplePut();
484          testSub.tearDown();
485 
486          testSub.setUp();
487          testSub.testPutWithBreak();
488          testSub.tearDown();
489 
490          testSub.setUp();
491          testSub.testInitialEntries();
492          testSub.tearDown();
493       }
494    }
495 }


syntax highlighted by Code2HTML, v. 0.9.1