1 package org.xmlBlaster.test.classtest.queue;
2
3 import java.util.logging.Logger;
4 import java.util.logging.Level;
5 import org.xmlBlaster.util.StopWatch;
6 import org.xmlBlaster.util.Global;
7 import org.xmlBlaster.util.XmlBlasterException;
8 import org.xmlBlaster.util.def.ErrorCode;
9 import org.xmlBlaster.util.def.PriorityEnum;
10 import org.xmlBlaster.util.queue.StorageId;
11 import org.xmlBlaster.util.def.Constants;
12 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
13 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
14
15 import org.xmlBlaster.util.queuemsg.DummyEntry;
16
17 import java.sql.Connection;
18 import java.util.ArrayList;
19
20 import junit.framework.*;
21 import org.xmlBlaster.util.queue.I_Queue;
22 import org.xmlBlaster.util.queue.QueuePluginManager;
23 import org.xmlBlaster.util.queue.jdbc.JdbcConnectionPool;
24 import org.xmlBlaster.util.plugin.PluginInfo;
25
26 /**
27 * Test JdbcQueuePlugin failover when persistent store disappears.
28 * <p>
29 * Invoke: java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
30 * </p>
31 * <p>
32 * Test database with PostgreSQL:
33 * </p>
34 * <pre>
35 * initdb /tmp/postgres
36 * cp /var/lib/pgsql/data/pg_hba.conf /tmp/postgres (edit host access)
37 * createdb test
38 * postmaster -i -D /tmp/postgres
39 * </pre>
40 * @see org.xmlBlaster.util.queue.I_Queue
41 * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
42 */
43 public class JdbcQueueTest extends TestCase {
44
45
46 public class ConnectionConsumer extends Thread {
47 private JdbcConnectionPool pool;
48 private int count;
49
50 public ConnectionConsumer(JdbcConnectionPool pool, int count) {
51 this.pool = pool;
52 this.count = count;
53 start();
54 }
55
56 public void run() {
57 boolean success = true;
58 try {
59 log.info("connectionConsumer " + this.count + " starting");
60 Connection conn = this.pool.getConnection();
61 log.info("connectionConsumer " + this.count + " got the connection " + conn);
62 if (conn != null)
63 this.pool.releaseConnection(conn, success);
64 }
65 catch (XmlBlasterException ex) {
66 log.info("connectionConsumer exception " + ex.getMessage());
67 if (ex.getErrorCode().getErrorCode().equals(ErrorCode.RESOURCE_TOO_MANY_THREADS.getErrorCode())) {
68 synchronized(JdbcQueueTest.class) {
69 exceptionCount++;
70 }
71 }
72 }
73 }
74
75 }
76
77 int exceptionCount = 0;
78
79 private String ME = "JdbcQueueTest";
80 protected Global glob;
81 private static Logger log = Logger.getLogger(JdbcQueueTest.class.getName());
82 private StopWatch stopWatch = new StopWatch();
83
84 private int numOfQueues = 10;
85 private int numOfMsg = 10000;
86 private long sizeOfMsg = 100L;
87 private I_Queue queue = null;
88
89 public ArrayList queueList = null;
90 // public static String[] PLUGIN_TYPES = { new String("JDBC"), new String("CACHE") };
91 public static String[] PLUGIN_TYPES = { new String("JDBC") };
92 public int count = 0;
93 boolean suppressTest = false;
94 boolean doExecute = true;
95
96 /** Constructor for junit not possible since we need to run it 3 times
97 public JdbcQueueTest(String name) {
98 super(name);
99 for (int i=0; i < NUM_IMPL; i++)
100 initialize(new Global(), name, i);
101 }
102 */
103
104 public JdbcQueueTest(Global glob, String name, int currImpl, boolean doExecute) {
105 super(name);
106 this.doExecute = doExecute;
107 initialize(glob, name, currImpl);
108 }
109
110 private void initialize(Global glob, String name, int currImpl) {
111 this.glob = Global.instance();
112
113
114 this.numOfQueues = glob.getProperty().get("queues", 2);
115 this.numOfMsg = glob.getProperty().get("entries", 100);
116 this.sizeOfMsg = glob.getProperty().get("sizes", 10L);
117 this.suppressTest = false;
118 this.count = currImpl;
119
120 try {
121 String type = PLUGIN_TYPES[currImpl];
122 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
123 QueuePluginManager pluginManager = new QueuePluginManager(glob);
124 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
125 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
126 prop.put("tableNamePrefix", "TEST");
127 prop.put("entriesTableName", "_entries");
128
129 CbQueueProperty cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
130 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "SetupQueue");
131
132 this.queue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
133 this.queue.shutdown(); // to allow to initialize again
134 }
135 catch (Exception ex) {
136 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
137 }
138 }
139
140 protected void setUp() {
141
142 try {
143 glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
144 ME = "JdbcQueueTest with class: " + PLUGIN_TYPES[this.count];
145 }
146 catch (Exception ex) {
147 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'" + ex.getMessage());
148 }
149
150 // cleaning up the database from previous runs ...
151
152 try {
153 // test initialize()
154 // this.queue.destroy();
155 this.queue.shutdown();
156 }
157 catch (Exception ex) {
158 log.severe("could not propertly set up the database: " + ex.getMessage());
159 this.suppressTest = true;
160 }
161 }
162
163 public void tearDown() {
164 try {
165 this.queue.clear();
166 this.queue.shutdown();
167 }
168 catch (Exception ex) {
169 log.warning("error when tearing down " + ex.getMessage() + " this normally happens when invoquing multiple times cleanUp " + ex.getMessage());
170 }
171 }
172
173
174 public void testPutWithBreak() {
175 if (this.suppressTest) {
176 log.severe("JDBC test is not driven as no database was found");
177 return;
178 }
179 try {
180 if (this.doExecute) putWithBreak();
181 else {
182 log.warning("test desactivated since needs to be run manually");
183 log.warning("please invoke it as 'java org.xmlBlaster.test.classtest.queue.JdbcQueueTest'");
184 }
185 }
186 catch (XmlBlasterException ex) {
187 fail("Exception when testing PutWithBreak probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
188 ex.printStackTrace();
189 }
190 }
191
192 public void putWithBreak() throws XmlBlasterException {
193 String me = ME + ".putWithBreak";
194 // set up the queues ....
195 QueuePropertyBase prop = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
196 prop.setMaxEntries(10000);
197 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "putWithBreak");
198 queue.initialize(queueId, prop);
199 queue.clear();
200
201 int num = 30;
202 boolean success = false;
203 for (int i=0; i < num; i++) {
204 try {
205 log.info("put with break entry " + i + "/" + num + " please kill the DB manually to test reconnect");
206 DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
207 queue.put(entry, false);
208 try {
209 Thread.sleep(5000L);
210 }
211 catch (Exception ex) {
212 }
213 }
214 catch (XmlBlasterException ex) {
215 if (log.isLoggable(Level.FINE)) log.fine(ex.getMessage());
216 if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
217 log.info("the communication to the db has been lost");
218 success = true;
219 break;
220 }
221 else throw ex;
222 }
223 }
224
225 assertTrue(me + ": Timed out when waiting to loose the connection to the DB", success);
226 success = false; // reset the flag
227 log.info("preparing to reconnect again ...");
228
229 for (int i=0; i < num; i++) {
230 try {
231 log.info("put with break entry " + i + "/" + num + " please restart the the DB to test reconnect");
232 DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), sizeOfMsg, true);
233 queue.put(entry, false);
234 log.info("the communication to the db has been reestablished");
235 success = true;
236 break;
237 }
238 catch (XmlBlasterException ex) {
239 if (log.isLoggable(Level.FINE)) log.fine(ex.getMessage());
240 if ("resource.db.unavailable".equalsIgnoreCase(ex.getErrorCodeStr())) {
241 try {
242 Thread.sleep(5000L);
243 }
244 catch (Exception e) {
245 }
246 }
247 else throw ex;
248 }
249 }
250 assertTrue(me + ": Timed out when waiting to regain the connection to the DB", success);
251 log.info("successfully ended");
252 }
253
254 public void testInitialEntries() {
255 if (this.suppressTest) {
256 log.severe("JDBC test is not driven as no database was found");
257 return;
258 }
259 try {
260 initialEntries();
261 }
262 catch (XmlBlasterException ex) {
263 fail("Exception when testing InitialEntries probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
264 ex.printStackTrace();
265 }
266 }
267
268 public void initialEntries() throws XmlBlasterException {
269 // set up the queues ....
270 log.info("initialEntries test starts");
271 QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
272 cbProp.setMaxEntries(10000L);
273 cbProp.setMaxBytes(200000L);
274 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "initialEntries");
275
276 try {
277 String type = PLUGIN_TYPES[this.count];
278 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
279 QueuePluginManager pluginManager = new QueuePluginManager(glob);
280 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
281 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
282 prop.put("tableNamePrefix", "TEST");
283 prop.put("entriesTableName", "_entries");
284 I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
285 tmpQueue.clear();
286 // add some persistent entries and then shutdown ...
287 DummyEntry entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
288 tmpQueue.put(entry, false);
289 entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
290 tmpQueue.put(entry, false);
291 entry = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), 100, true);
292 tmpQueue.put(entry, false);
293 tmpQueue.shutdown(); // to allow to initialize again
294 I_Queue tmpQueue2 = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
295 long numOfEntries = tmpQueue2.getNumOfEntries();
296 assertEquals("Wrong number of entries in queue", 3L, numOfEntries);
297 ArrayList lst = tmpQueue2.peek(-1, -1L);
298 assertEquals("Wrong number of entries retrieved from queue", 3, lst.size());
299 queue.shutdown();
300 }
301 catch (Exception ex) {
302 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
303 ex.printStackTrace();
304 assertTrue("exception occured when testing initialEntries", false);
305 }
306 log.info("initialEntries test successfully ended");
307 }
308
309
310
311
312 public void testMultiplePut() {
313 try {
314 multiplePut();
315 }
316 catch (XmlBlasterException ex) {
317 fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
318 ex.printStackTrace();
319 }
320 }
321
322 public void multiplePut() throws XmlBlasterException {
323 // set up the queues ....
324 log.info("initialEntries test starts");
325 QueuePropertyBase cbProp = new CbQueueProperty(glob, Constants.RELATING_CALLBACK, "/node/test");
326 cbProp.setMaxEntries(10000L);
327 cbProp.setMaxBytes(200000L);
328 StorageId queueId = new StorageId(Constants.RELATING_CALLBACK, "initialEntries");
329
330 try {
331 String type = PLUGIN_TYPES[this.count];
332 this.glob.getProperty().set("cb.queue.persistent.tableNamePrefix", "TEST");
333 QueuePluginManager pluginManager = new QueuePluginManager(glob);
334 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
335 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
336 prop.put("tableNamePrefix", "TEST");
337 prop.put("entriesTableName", "_entries");
338 I_Queue tmpQueue = pluginManager.getPlugin(pluginInfo, queueId, cbProp);
339 tmpQueue.clear();
340 // add some persistent entries and then shutdown ...
341 int nmax = 1;
342 int size = 100;
343
344 for (int j=0; j < 4; j++) {
345 DummyEntry[] entries = new DummyEntry[nmax];
346 for (int i=0; i < nmax; i++) {
347 entries[i] = new DummyEntry(glob, PriorityEnum.NORM_PRIORITY, queue.getStorageId(), size, true);
348 }
349 long time1 = System.currentTimeMillis();
350 tmpQueue.put(entries, false);
351 long delta = System.currentTimeMillis() - time1;
352 log.info("multiple put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
353
354 ArrayList list = tmpQueue.peek(-1, -1L);
355 assertEquals("Wrong number of entries in queue", nmax, list.size());
356
357 time1 = System.currentTimeMillis();
358 tmpQueue.removeRandom(entries);
359 delta = System.currentTimeMillis() - time1;
360 log.info("multiple remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
361 tmpQueue.clear();
362
363 time1 = System.currentTimeMillis();
364 for (int i=0; i < nmax; i++) {
365 tmpQueue.put(entries[i], false);
366 }
367 delta = System.currentTimeMillis() - time1;
368 log.info("repeated single put '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
369
370 time1 = System.currentTimeMillis();
371 for (int i=0; i < nmax; i++) tmpQueue.removeRandom(entries[i]);
372 delta = System.currentTimeMillis() - time1;
373 log.info("repeated single remove '" + nmax + "' entries took '" + 0.001 * delta + "' seconds which is '" + 1.0 * delta / nmax + "' ms per entry");
374 nmax *= 10;
375 }
376 tmpQueue.shutdown(); // to allow to initialize again
377 }
378 catch (Exception ex) {
379 log.severe("setUp: error when setting the property 'cb.queue.persistent.tableNamePrefix' to 'TEST'");
380 ex.printStackTrace();
381 assertTrue("exception occured when testing initialEntries", false);
382 }
383 log.info("initialEntries test successfully ended");
384 }
385
386
387 public void testConnectionPool() {
388 try {
389 String me = ME + "-testConnectionPool";
390 log.info(" starting ");
391 int numConn = 3;
392 int maxWaitingThreads = 10;
393
394 Global ownGlobal = this.glob.getClone(null);
395
396 QueuePluginManager pluginManager = new QueuePluginManager(ownGlobal);
397 PluginInfo pluginInfo = new PluginInfo(ownGlobal, pluginManager, "JDBC", "1.0");
398
399 pluginInfo.getParameters().put("connectionBusyTimeout", "10000");
400 pluginInfo.getParameters().put("maxWaitingThreads", "" + maxWaitingThreads);
401 pluginInfo.getParameters().put("connectionPoolSize", "" + numConn);
402
403 JdbcConnectionPool pool = new JdbcConnectionPool();
404 pool.initialize(ownGlobal, pluginInfo.getParameters());
405
406 Connection[] conn = new Connection[numConn];
407 for (int i=0; i < numConn; i++) {
408 log.info(" getting connection " + i);
409 conn[i] = pool.getConnection();
410 assertNotNull("The connection " + i + " shall not be null", conn[i]);
411 }
412
413 log.info(" getting extra connection");
414
415 Connection extraConn = null;
416 try {
417 extraConn = pool.getConnection();
418 assertTrue("An Exception should have occured here: ", false);
419 }
420 catch (Exception ex) {
421 }
422 // should wait 10 seconds and then return null
423 assertNull("the extra connection should be null", extraConn);
424 boolean success = true;
425 pool.releaseConnection(conn[0], success);
426 extraConn = pool.getConnection();
427 assertNotNull("the extra connection should not be null", extraConn);
428 //pool.releaseConnection(extraConn);
429
430 this.exceptionCount = 0;
431 int expectedEx = 4;
432 for (int i=0; i < maxWaitingThreads + expectedEx; i++) {
433 ConnectionConsumer cc = new ConnectionConsumer(pool, i);
434 }
435
436 try {
437 Thread.sleep(15000L);
438 }
439 catch (InterruptedException ex) {
440 }
441
442 assertEquals("Number of exceptions due to too many waiting threads is wrong", expectedEx, this.exceptionCount);
443 log.info(" successfully ended ");
444 }
445 catch (Exception ex) {
446 fail("Exception when testing multiplePut probably due to failed initialization of the queue of type " + PLUGIN_TYPES[this.count] + " " + ex.getMessage() );
447 ex.printStackTrace();
448 }
449 }
450
451
452 /**
453 * Method is used by TestRunner to load these tests
454 */
455 public static Test suite() {
456 TestSuite suite= new TestSuite();
457 Global glob = new Global();
458 for (int i=0; i < PLUGIN_TYPES.length; i++) {
459 suite.addTest(new JdbcQueueTest(glob, "testConnectionPool", i, true));
460 suite.addTest(new JdbcQueueTest(glob, "testMultiplePut", i, true));
461 suite.addTest(new JdbcQueueTest(glob, "testPutWithBreak", i, false));
462 suite.addTest(new JdbcQueueTest(glob, "testInitialEntries", i, true));
463 }
464 return suite;
465 }
466
467 /**
468 * <pre>
469 * java org.xmlBlaster.test.classtest.queue.JdbcQueueTest
470 * </pre>
471 */
472 public static void main(String args[]) {
473 Global glob = new Global(args);
474
475 for (int i=0; i < PLUGIN_TYPES.length; i++) {
476 JdbcQueueTest testSub = new JdbcQueueTest(glob, "JdbcQueueTest", i, true);
477
478 testSub.setUp();
479 testSub.testConnectionPool();
480 testSub.tearDown();
481
482 testSub.setUp();
483 testSub.testMultiplePut();
484 testSub.tearDown();
485
486 testSub.setUp();
487 testSub.testPutWithBreak();
488 testSub.tearDown();
489
490 testSub.setUp();
491 testSub.testInitialEntries();
492 testSub.tearDown();
493 }
494 }
495 }
syntax highlighted by Code2HTML, v. 0.9.1