1 package org.xmlBlaster.test.classtest.msgstore;
  2 
  3 import java.util.logging.Logger;
  4 import java.util.logging.Level;
  5 import org.xmlBlaster.engine.ServerScope;
  6 import org.xmlBlaster.util.XmlBlasterException;
  7 import org.xmlBlaster.util.MsgUnit;
  8 import org.xmlBlaster.util.queue.StorageId;
  9 import org.xmlBlaster.engine.msgstore.I_MapEntry;
 10 import org.xmlBlaster.engine.msgstore.I_Map;
 11 import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;
 12 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
 13 import org.xmlBlaster.engine.qos.PublishQosServer;
 14 import org.xmlBlaster.engine.MsgUnitWrapper;
 15 
 16 import java.util.ArrayList;
 17 
 18 import junit.framework.*;
 19 import org.xmlBlaster.engine.msgstore.StoragePluginManager;
 20 import org.xmlBlaster.util.plugin.PluginInfo;
 21 
 22 /**
 23  * Test I_Map e.g. MapPlugin which allows to store randomly messages. 
 24  * <p>
 25  * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.msgstore.I_MapTest
 26  * </p>
 27  * @see org.xmlBlaster.engine.msgstore.I_Map
 28  * @see org.xmlBlaster.engine.msgstore.ram.MapPlugin
 29  * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
 30  */
 31 public class I_MapTest extends TestCase {
 32    private String ME = "I_MapTest";
 33    protected ServerScope glob;
 34    private static Logger log = Logger.getLogger(I_MapTest.class.getName());
 35 
 36    private final boolean IS_DURABLE = true;
 37    private final boolean IS_TRANSIENT = false;
 38 
 39    private I_Map currMap;
 40    private int currImpl;
 41 /*
 42    static I_Map[] IMPL = {
 43                    new org.xmlBlaster.engine.msgstore.ram.MapPlugin(),
 44                    new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(),
 45                    new org.xmlBlaster.engine.msgstore.cache.PersistenceCachePlugin()
 46                  };
 47 */
 48    static String[] PLUGIN_TYPES = { new String("RAM"),
 49                                     new String("JDBC"),
 50                                     new String("CACHE") };
 51 
 52    public I_MapTest(String name, int currImpl) {
 53       super(name);
 54       this.currImpl = currImpl;
 55 
 56       String[] args = { //configure the cache
 57          "-persistence.persistentQueue", "JDBC,1.0",
 58          "-persistence.transientQueue", "RAM,1.0" };
 59 
 60       this.glob = new ServerScope(args);
 61 
 62       //this.ME = "I_MapTest[" + this.currMap.getClass().getName() + "]";
 63    }
 64 
 65    protected void setUp() {
 66       try {
 67          glob.getProperty().set("topic.queue.persistent.tableNamePrefix", "TEST");
 68 
 69          String type = PLUGIN_TYPES[this.currImpl];
 70          StoragePluginManager pluginManager = this.glob.getStoragePluginManager();
 71          // Overwrite JDBC settings from xmlBlaster.properties
 72          PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
 73          java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
 74          prop.put("tableNamePrefix", "TEST");
 75          prop.put("entriesTableName", "_entries");
 76          this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
 77 
 78          if (!"JDBC".equals(type))
 79             pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
 80 
 81          MsgUnitStoreProperty storeProp = new MsgUnitStoreProperty(glob, "/node/test");
 82          StorageId queueId = new StorageId("msgUnitStore", "SomeMapId");
 83 
 84          this.currMap = pluginManager.getPlugin(pluginInfo, queueId, storeProp);
 85          this.currMap.shutdown(); // to allow to initialize again
 86       }
 87       catch (Exception ex) {
 88          log.severe("setUp: error when setting the property 'topic.queue.persistent.tableNamePrefix' to 'TEST': " + ex.getMessage());
 89       }
 90 
 91       // cleaning up the database from previous runs ...
 92 /*
 93       QueuePropertyBase prop = null;
 94       try {
 95          prop = new MsgUnitStoreProperty(glob, "/node/test");
 96 
 97          StorageId queueId = new StorageId("msgUnitStore", "SetupMap");
 98          JdbcMapPlugin jdbcMap = new JdbcMapPlugin();
 99          jdbcMap.initialize(queueId, prop);
100          jdbcMap.destroy();
101       }
102       catch (Exception ex) {
103          log.severe("could not propertly set up the database: " + ex.getMessage());
104       }
105 */
106    }
107 
108    private MsgUnit createMsgUnit(boolean persistent) {
109       return createMsgUnit(persistent, -1);
110    }
111 
112    private MsgUnit createMsgUnit(boolean persistent, long contentLen_) {
113       try {
114          int contentLen = (int)contentLen_;
115          PublishQosServer publishQosServer = new PublishQosServer(glob, "<qos/>");
116          publishQosServer.getData().setPersistent(persistent);
117          String contentStr = "content";
118          if (contentLen >= 0) {
119             StringBuffer content = new StringBuffer(contentLen);
120             for (int i=0; i<contentLen; i++) {
121                content.append("X");
122             }
123             contentStr = content.toString();
124          }
125          return new MsgUnit(glob, "<key oid='Hi'/>", contentStr.getBytes(), publishQosServer.toXml());
126       }
127       catch (XmlBlasterException ex) {
128          fail("msgUnit not constructed: " + ex.getMessage());
129       }
130       return null;
131    }
132 
133 
134    /**
135     * Tests QueuePropertyBase() and getStorageId()
136     * @param queueTypeList A space separated list of names for the
137     *        implementations to be tested. Valid names are:
138     *        RamMapPlugin JdbcMapPlugin
139     */
140    public void testConfig() {
141       config(this.currMap);
142    }
143 
144    /**
145     * Tests initialize(), getProperties(), setProperties() and capacity()
146     * @param queue !!!Is not initialized in this case!!!!
147     */
148    private void config(I_Map i_map) {
149       ME = "I_MapTest.config(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
150       System.out.println("***" + ME);
151 
152       QueuePropertyBase prop1 = null;
153       QueuePropertyBase prop = null;
154       try {
155          // test initialize()
156          prop1 = new MsgUnitStoreProperty(glob, "/node/test");
157          int max = 12;
158          prop1.setMaxEntries(max);
159          prop1.setMaxEntriesCache(max);
160          assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries());
161          assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache());
162          StorageId queueId = new StorageId("msgUnitStore", "SomeMapId");
163 
164          i_map.initialize(queueId, prop1);
165          assertEquals(ME+": Wrong queue ID", queueId, i_map.getStorageId());
166 
167          try {
168             prop = new MsgUnitStoreProperty(glob, "/node/test");
169             prop.setMaxEntries(99);
170             prop.setMaxEntriesCache(99);
171             i_map.setProperties(prop);
172          }
173          catch(XmlBlasterException e) {
174             fail("Changing properties failed: " + e.getMessage());
175          }
176 
177       }
178       catch(XmlBlasterException e) {
179          fail(ME + ": Exception thrown: " + e.getMessage());
180       }
181 
182       long len = prop.getMaxEntries();
183       assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), i_map.getMaxNumOfEntries());
184       assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)i_map.getProperties()).getMaxEntries());
185       assertEquals(ME+": Wrong size", 0, i_map.getNumOfEntries());
186 
187       try {
188          for (int ii=0; ii<len; ii++) {
189             i_map.put(new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()));
190          }
191          assertEquals(ME+": Wrong total size", len, i_map.getNumOfEntries());
192 
193          try {
194             MsgUnitWrapper queueEntry = new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId());
195             i_map.put(queueEntry);
196             i_map.put(queueEntry);
197             fail("Did expect an exception on overflow getMaxNumOfEntries=" + i_map.getMaxNumOfEntries() + " size=" + i_map.getNumOfEntries());
198          }
199          catch(XmlBlasterException e) {
200             log.info("SUCCESS the exception is OK: " + e.getMessage());
201          }
202 
203          log.info("toXml() test:" + i_map.toXml(""));
204          log.info("usage() test:" + i_map.usage());
205 
206          assertEquals(ME+": should not be shutdown", false, i_map.isShutdown());
207          i_map.shutdown();
208          assertEquals(ME+": should be shutdown", true, i_map.isShutdown());
209 
210          log.info("#2 Success, filled " + i_map.getNumOfEntries() + " messages into queue");
211          System.out.println("***" + ME + " [SUCCESS]");
212          i_map.shutdown();
213       }
214       catch(XmlBlasterException e) {
215          fail(ME + ": Exception thrown: " + e.getMessage());
216       }
217    }
218 
219 //------------------------------------
220    public void testPutMsg() {
221       String queueType = "unknown";
222       try {
223          QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
224          queueType = this.currMap.toString();
225          StorageId queueId = new StorageId("msgUnitStore", "MapPlugin/putMsg");
226          this.currMap.initialize(queueId, prop);
227          this.currMap.clear();
228          assertEquals(ME + "wrong size before starting ", 0L, this.currMap.getNumOfEntries());
229          putMsg(this.currMap);
230       }
231       catch (XmlBlasterException ex) {
232          fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType + ": " + ex.getMessage());
233       }
234    }
235 
236    /**
237     * Tests put(MsgMapEntry[]) and put(MsgMapEntry) and clear()
238     */
239    private void putMsg(I_Map i_map) {
240       ME = "I_MapTest.putMsg(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
241       System.out.println("***" + ME);
242       try {
243          //========== Test 1: put(I_MapEntry[])
244          int numLoop = 10;
245          ArrayList list = new ArrayList();
246          for (int ii=0; ii<numLoop; ii++) {
247             MsgUnitWrapper[] queueEntries = {
248                          new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()),
249                          new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()),
250                          new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId())};
251 
252             for(int i=0; i<queueEntries.length; i++)
253                i_map.put(queueEntries[i]);
254 
255             for (int i=0; i < 3; i++) list.add(queueEntries[i]);
256 
257             this.checkSizeAndEntries(" put(I_MapEntry[]) ", list, i_map);
258             assertEquals(ME+": Wrong size", (ii+1)*queueEntries.length, i_map.getNumOfEntries());
259          }
260          int total = numLoop*3;
261          assertEquals(ME+": Wrong total size", total, i_map.getNumOfEntries());
262          log.info("#1 Success, filled " + i_map.getNumOfEntries() + " messages into queue");
263 
264 
265          //========== Test 2: put(I_MapEntry)
266          for (int ii=0; ii<numLoop; ii++) {
267             MsgUnitWrapper queueEntry = new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId());
268             list.add(queueEntry);
269             i_map.put(queueEntry);
270          }
271          assertEquals(ME+": Wrong total size", numLoop+total, i_map.getNumOfEntries());
272          this.checkSizeAndEntries(" put(I_MapEntry) ", list, i_map);
273          log.info("#2 Success, filled " + i_map.getNumOfEntries() + " messages into queue");
274 
275          i_map.clear();
276          checkSizeAndEntries("Test 2 put()", new I_MapEntry[0], i_map);
277          assertEquals(ME+": Wrong empty size", 0L, i_map.getNumOfEntries());
278 
279          System.out.println("***" + ME + " [SUCCESS]");
280          i_map.shutdown();
281       }
282       catch(XmlBlasterException e) {
283          fail(ME + ": Exception thrown: " + e.getMessage());
284       }
285    }
286 
287 
288    /**
289     * Tests overflow of maxNumOfBytes() of a CACHE. 
290     */
291    public void testByteOverflow() {
292       I_Map i_map = this.currMap;
293       ME = "I_MapTest.testByteOverflow(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
294       System.out.println("***" + ME);
295       try {
296          StorageId storageId = new StorageId("msgUnitStore", "ByteOverflowMapId");
297          QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
298 
299          MsgUnitWrapper mu = new MsgUnitWrapper(glob, createMsgUnit(false, 0),  storageId);
300          long sizeEmpty = mu.getSizeInBytes();
301 
302          MsgUnitWrapper[] queueEntries = {
303             new MsgUnitWrapper(glob, createMsgUnit(false, 0),  storageId),
304             new MsgUnitWrapper(glob, createMsgUnit(false, 0),  storageId),
305             new MsgUnitWrapper(glob, createMsgUnit(false, 0),  storageId),
306             // Each above entry has 3,311 bytes = 9,922, the next one has 9,932 bytes
307             // so when it is entered two of the above need to be swapped away
308             // as maxBytes=13,244
309             new MsgUnitWrapper(glob, createMsgUnit(false, 2*sizeEmpty-1), storageId),
310             new MsgUnitWrapper(glob, createMsgUnit(false, 0),  storageId)};
311 
312          final long maxBytesCache = 4*sizeEmpty;
313          prop.setMaxBytes(1000000);
314          prop.setMaxBytesCache(maxBytesCache);
315          assertEquals(ME+": Wrong capacity", 1000000, prop.getMaxBytes());
316          assertEquals(ME+": Wrong cache capacity", maxBytesCache, prop.getMaxBytesCache());
317          i_map.initialize(storageId, prop);
318          assertEquals(ME+": Wrong queue ID", storageId, i_map.getStorageId());
319 
320          long numOfBytes = 0;
321          for(int i=0; i<queueEntries.length; i++) {
322             i_map.put(queueEntries[i]);
323             numOfBytes += queueEntries[i].getSizeInBytes();
324          }
325 
326          assertEquals(ME+": Wrong size", queueEntries.length, i_map.getNumOfEntries());
327          assertEquals(ME+": Wrong bytes", numOfBytes, i_map.getNumOfBytes());
328 
329          System.out.println("***" + ME + " [SUCCESS]");
330          i_map.clear();
331          i_map.shutdown();
332       }
333       catch(XmlBlasterException e) {
334          log.severe("Exception thrown: " + e.getMessage());
335          fail(ME + ": Exception thrown: " + e.getMessage());
336       }
337    }
338 
339 
340 //------------------------------------
341    public void testGetMsg() {
342 
343       String queueType = "unknown";
344       try {
345          QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
346          queueType = this.currMap.toString();
347          StorageId queueId = new StorageId("msgUnitStore", "MapPlugin/getMsg");
348          this.currMap.initialize(queueId, prop);
349          this.currMap.clear();
350          assertEquals(ME + "wrong size before starting ", 0, this.currMap.getNumOfEntries());
351          getMsg(this.currMap);
352       }
353       catch (XmlBlasterException ex) {
354          log.severe("Exception when testing getMsg probably due to failed initialization of the queue " + queueType + ": " + ex.getMessage());
355       }
356 
357    }
358 
359    /**
360     * Tests get() and get(int num) and remove()
361     * For a discussion of the sorting order see Javadoc of this class
362     */
363    private void getMsg(I_Map i_map) {
364       ME = "I_MapTest.getMsg(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
365       System.out.println("***" + ME);
366       try {
367          //========== Test 1: get()
368          {
369             MsgUnitWrapper[] queueEntries = {
370                          new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()),
371                          new MsgUnitWrapper(glob, createMsgUnit(true), i_map.getStorageId()),
372                          new MsgUnitWrapper(glob, createMsgUnit(true), i_map.getStorageId())
373                                         };
374             for(int i=0; i<queueEntries.length; i++) {
375                i_map.put(queueEntries[i]);
376                log.info("#" + i + " id=" + queueEntries[i].getUniqueId() + " numSizeBytes()=" + queueEntries[i].getSizeInBytes());
377             }
378             log.info("storage bytes sum=" + i_map.getNumOfBytes() + " with persistent bytes=" + i_map.getNumOfPersistentBytes());
379 
380             assertEquals("", 3, i_map.getNumOfEntries());
381             assertEquals("", 2, i_map.getNumOfPersistentEntries());
382 
383             for (int ii=0; ii<10; ii++) {
384                I_MapEntry result = i_map.get(queueEntries[0].getUniqueId());
385                assertTrue("Missing entry", result != null);
386                assertEquals(ME+": Wrong result", queueEntries[0].getUniqueId(), result.getUniqueId());
387 
388                result = i_map.get(queueEntries[1].getUniqueId());
389                assertTrue("Missing entry", result != null);
390                assertEquals(ME+": Wrong result", queueEntries[1].getUniqueId(), result.getUniqueId());
391 
392                result = i_map.get(queueEntries[2].getUniqueId());
393                assertTrue("Missing entry", result != null);
394                assertEquals(ME+": Wrong result", queueEntries[2].getUniqueId(), result.getUniqueId());
395             }
396             assertEquals("", 3, i_map.getNumOfEntries());
397             assertEquals("", 2, i_map.getNumOfPersistentEntries());
398 
399             log.info("storage before remove [0], bytes sum=" + i_map.getNumOfBytes() + " with persistent bytes=" + i_map.getNumOfPersistentBytes());
400             i_map.remove(queueEntries[0]); // Remove one
401             log.info("storage after remove [0], bytes sum=" + i_map.getNumOfBytes() + " with persistent bytes=" + i_map.getNumOfPersistentBytes());
402             ArrayList list = new ArrayList();
403             list.add(queueEntries[1]);
404             list.add(queueEntries[2]);
405             this.checkSizeAndEntries(" getMsg() ", list, i_map);
406 
407             for (int ii=0; ii<10; ii++) {
408                I_MapEntry result = i_map.get(queueEntries[1].getUniqueId());
409                assertTrue("Missing entry", result != null);
410                assertEquals(ME+": Wrong result", queueEntries[1].getUniqueId(), result.getUniqueId());
411             }
412             i_map.remove(queueEntries[1].getUniqueId()); // Remove one
413             assertEquals("", 1, i_map.getNumOfEntries());
414             assertEquals("", 1, i_map.getNumOfPersistentEntries());
415 
416             for (int ii=0; ii<10; ii++) {
417                I_MapEntry result = i_map.get(queueEntries[2].getUniqueId());
418                assertTrue("Missing entry", result != null);
419                assertEquals(ME+": Wrong result", queueEntries[2].getUniqueId(), result.getUniqueId());
420             }
421             i_map.remove(queueEntries[2]); // Remove one
422             for (int ii=0; ii<10; ii++) {
423                I_MapEntry result = i_map.get(queueEntries[0].getUniqueId());
424                assertTrue("Unexpected entry", result == null);
425             }
426             assertEquals("", 0, i_map.getNumOfEntries());
427             assertEquals("", 0, i_map.getNumOfPersistentEntries());
428             log.info("#1 Success, get()");
429          }
430 
431          System.out.println("***" + ME + " [SUCCESS]");
432          i_map.clear();
433          i_map.shutdown();
434       }
435       catch(XmlBlasterException e) {
436          e.printStackTrace();
437          fail(ME + ": Exception thrown: " + e.getMessage());
438       }
439    }
440 
441 
442 
443 //------------------------------------
444    public void testGetAllMsgs() {
445 
446       String queueType = "unknown";
447       try {
448          QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
449          queueType = this.currMap.toString();
450          StorageId queueId = new StorageId("msgUnitStore", "MapPlugin/getAllMsgs");
451          this.currMap.initialize(queueId, prop);
452          this.currMap.clear();
453          assertEquals(ME + "wrong size before starting ", 0, this.currMap.getNumOfEntries());
454          getAllMsgs(this.currMap);
455       }
456       catch (XmlBlasterException ex) {
457          log.severe("Exception when testing getAllMsgs probably due to failed initialization of the queue " + queueType + ": " + ex.getMessage());
458       }
459 
460    }
461 
462    /**
463     * Tests get() and get(int num) and remove()
464     * NOTE: Currently the MapPlugin returns getAll() sorted (it uses a TreeMap)
465     *       But we haven't yet forced this in the I_Map#getAll() Javadoc!
466     *       This test assumes sorting order and needs to be changed if we once
467     *       decide to specify the exact behaviour in I_Map#getAll() javadoc
468     */
469    private void getAllMsgs(I_Map i_map) {
470       ME = "I_MapTest.getAllMsgs(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
471       System.out.println("***" + ME);
472       try {