1 package org.xmlBlaster.test.classtest.msgstore;
2
3 import java.util.logging.Logger;
4 import java.util.logging.Level;
5 import org.xmlBlaster.engine.ServerScope;
6 import org.xmlBlaster.util.XmlBlasterException;
7 import org.xmlBlaster.util.MsgUnit;
8 import org.xmlBlaster.util.queue.StorageId;
9 import org.xmlBlaster.engine.msgstore.I_MapEntry;
10 import org.xmlBlaster.engine.msgstore.I_Map;
11 import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;
12 import org.xmlBlaster.util.qos.storage.QueuePropertyBase;
13 import org.xmlBlaster.engine.qos.PublishQosServer;
14 import org.xmlBlaster.engine.MsgUnitWrapper;
15
16 import java.util.ArrayList;
17
18 import junit.framework.*;
19 import org.xmlBlaster.engine.msgstore.StoragePluginManager;
20 import org.xmlBlaster.util.plugin.PluginInfo;
21
22 /**
 * Tests I_Map implementations, e.g. MapPlugin, which allow storing messages with random access.
24 * <p>
25 * Invoke: java -Djava.compiler= junit.textui.TestRunner org.xmlBlaster.test.classtest.msgstore.I_MapTest
26 * </p>
27 * @see org.xmlBlaster.engine.msgstore.I_Map
28 * @see org.xmlBlaster.engine.msgstore.ram.MapPlugin
29 * @see org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin
30 */
31 public class I_MapTest extends TestCase {
32 private String ME = "I_MapTest";
33 protected ServerScope glob;
34 private static Logger log = Logger.getLogger(I_MapTest.class.getName());
35
36 private final boolean IS_DURABLE = true;
37 private final boolean IS_TRANSIENT = false;
38
39 private I_Map currMap;
40 private int currImpl;
41 /*
42 static I_Map[] IMPL = {
43 new org.xmlBlaster.engine.msgstore.ram.MapPlugin(),
44 new org.xmlBlaster.util.queue.jdbc.JdbcQueuePlugin(),
45 new org.xmlBlaster.engine.msgstore.cache.PersistenceCachePlugin()
46 };
47 */
48 static String[] PLUGIN_TYPES = { new String("RAM"),
49 new String("JDBC"),
50 new String("CACHE") };
51
/**
 * Creates a test case which is bound to one storage implementation.
 *
 * @param name The name of the test method to execute, passed on to TestCase
 * @param currImpl Index into PLUGIN_TYPES choosing the plugin type to test
 */
public I_MapTest(String name, int currImpl) {
   super(name);

   // Configure the cache: JDBC as persistent queue, RAM as transient queue
   this.glob = new ServerScope(new String[] {
         "-persistence.persistentQueue", "JDBC,1.0",
         "-persistence.transientQueue", "RAM,1.0" });

   this.currImpl = currImpl;
}
64
65 protected void setUp() {
66 try {
67 glob.getProperty().set("topic.queue.persistent.tableNamePrefix", "TEST");
68
69 String type = PLUGIN_TYPES[this.currImpl];
70 StoragePluginManager pluginManager = this.glob.getStoragePluginManager();
71 // Overwrite JDBC settings from xmlBlaster.properties
72 PluginInfo pluginInfo = new PluginInfo(glob, pluginManager, "JDBC", "1.0");
73 java.util.Properties prop = (java.util.Properties)pluginInfo.getParameters();
74 prop.put("tableNamePrefix", "TEST");
75 prop.put("entriesTableName", "_entries");
76 this.glob.getProperty().set("QueuePlugin[JDBC][1.0]", pluginInfo.dumpPluginParameters());
77
78 if (!"JDBC".equals(type))
79 pluginInfo = new PluginInfo(glob, pluginManager, type, "1.0");
80
81 MsgUnitStoreProperty storeProp = new MsgUnitStoreProperty(glob, "/node/test");
82 StorageId queueId = new StorageId("msgUnitStore", "SomeMapId");
83
84 this.currMap = pluginManager.getPlugin(pluginInfo, queueId, storeProp);
85 this.currMap.shutdown(); // to allow to initialize again
86 }
87 catch (Exception ex) {
88 log.severe("setUp: error when setting the property 'topic.queue.persistent.tableNamePrefix' to 'TEST': " + ex.getMessage());
89 }
90
91 // cleaning up the database from previous runs ...
92 /*
93 QueuePropertyBase prop = null;
94 try {
95 prop = new MsgUnitStoreProperty(glob, "/node/test");
96
97 StorageId queueId = new StorageId("msgUnitStore", "SetupMap");
98 JdbcMapPlugin jdbcMap = new JdbcMapPlugin();
99 jdbcMap.initialize(queueId, prop);
100 jdbcMap.destroy();
101 }
102 catch (Exception ex) {
103 log.severe("could not propertly set up the database: " + ex.getMessage());
104 }
105 */
106 }
107
108 private MsgUnit createMsgUnit(boolean persistent) {
109 return createMsgUnit(persistent, -1);
110 }
111
112 private MsgUnit createMsgUnit(boolean persistent, long contentLen_) {
113 try {
114 int contentLen = (int)contentLen_;
115 PublishQosServer publishQosServer = new PublishQosServer(glob, "<qos/>");
116 publishQosServer.getData().setPersistent(persistent);
117 String contentStr = "content";
118 if (contentLen >= 0) {
119 StringBuffer content = new StringBuffer(contentLen);
120 for (int i=0; i<contentLen; i++) {
121 content.append("X");
122 }
123 contentStr = content.toString();
124 }
125 return new MsgUnit(glob, "<key oid='Hi'/>", contentStr.getBytes(), publishQosServer.toXml());
126 }
127 catch (XmlBlasterException ex) {
128 fail("msgUnit not constructed: " + ex.getMessage());
129 }
130 return null;
131 }
132
133
134 /**
135 * Tests QueuePropertyBase() and getStorageId()
136 * @param queueTypeList A space separated list of names for the
137 * implementations to be tested. Valid names are:
138 * RamMapPlugin JdbcMapPlugin
139 */
140 public void testConfig() {
141 config(this.currMap);
142 }
143
144 /**
145 * Tests initialize(), getProperties(), setProperties() and capacity()
146 * @param queue !!!Is not initialized in this case!!!!
147 */
148 private void config(I_Map i_map) {
149 ME = "I_MapTest.config(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
150 System.out.println("***" + ME);
151
152 QueuePropertyBase prop1 = null;
153 QueuePropertyBase prop = null;
154 try {
155 // test initialize()
156 prop1 = new MsgUnitStoreProperty(glob, "/node/test");
157 int max = 12;
158 prop1.setMaxEntries(max);
159 prop1.setMaxEntriesCache(max);
160 assertEquals(ME+": Wrong capacity", max, prop1.getMaxEntries());
161 assertEquals(ME+": Wrong cache capacity", max, prop1.getMaxEntriesCache());
162 StorageId queueId = new StorageId("msgUnitStore", "SomeMapId");
163
164 i_map.initialize(queueId, prop1);
165 assertEquals(ME+": Wrong queue ID", queueId, i_map.getStorageId());
166
167 try {
168 prop = new MsgUnitStoreProperty(glob, "/node/test");
169 prop.setMaxEntries(99);
170 prop.setMaxEntriesCache(99);
171 i_map.setProperties(prop);
172 }
173 catch(XmlBlasterException e) {
174 fail("Changing properties failed: " + e.getMessage());
175 }
176
177 }
178 catch(XmlBlasterException e) {
179 fail(ME + ": Exception thrown: " + e.getMessage());
180 }
181
182 long len = prop.getMaxEntries();
183 assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), i_map.getMaxNumOfEntries());
184 assertEquals(ME+": Wrong capacity", prop.getMaxEntries(), ((QueuePropertyBase)i_map.getProperties()).getMaxEntries());
185 assertEquals(ME+": Wrong size", 0, i_map.getNumOfEntries());
186
187 try {
188 for (int ii=0; ii<len; ii++) {
189 i_map.put(new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()));
190 }
191 assertEquals(ME+": Wrong total size", len, i_map.getNumOfEntries());
192
193 try {
194 MsgUnitWrapper queueEntry = new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId());
195 i_map.put(queueEntry);
196 i_map.put(queueEntry);
197 fail("Did expect an exception on overflow getMaxNumOfEntries=" + i_map.getMaxNumOfEntries() + " size=" + i_map.getNumOfEntries());
198 }
199 catch(XmlBlasterException e) {
200 log.info("SUCCESS the exception is OK: " + e.getMessage());
201 }
202
203 log.info("toXml() test:" + i_map.toXml(""));
204 log.info("usage() test:" + i_map.usage());
205
206 assertEquals(ME+": should not be shutdown", false, i_map.isShutdown());
207 i_map.shutdown();
208 assertEquals(ME+": should be shutdown", true, i_map.isShutdown());
209
210 log.info("#2 Success, filled " + i_map.getNumOfEntries() + " messages into queue");
211 System.out.println("***" + ME + " [SUCCESS]");
212 i_map.shutdown();
213 }
214 catch(XmlBlasterException e) {
215 fail(ME + ": Exception thrown: " + e.getMessage());
216 }
217 }
218
219 //------------------------------------
220 public void testPutMsg() {
221 String queueType = "unknown";
222 try {
223 QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
224 queueType = this.currMap.toString();
225 StorageId queueId = new StorageId("msgUnitStore", "MapPlugin/putMsg");
226 this.currMap.initialize(queueId, prop);
227 this.currMap.clear();
228 assertEquals(ME + "wrong size before starting ", 0L, this.currMap.getNumOfEntries());
229 putMsg(this.currMap);
230 }
231 catch (XmlBlasterException ex) {
232 fail("Exception when testing PutMsg probably due to failed initialization of the queue of type " + queueType + ": " + ex.getMessage());
233 }
234 }
235
236 /**
237 * Tests put(MsgMapEntry[]) and put(MsgMapEntry) and clear()
238 */
239 private void putMsg(I_Map i_map) {
240 ME = "I_MapTest.putMsg(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
241 System.out.println("***" + ME);
242 try {
243 //========== Test 1: put(I_MapEntry[])
244 int numLoop = 10;
245 ArrayList list = new ArrayList();
246 for (int ii=0; ii<numLoop; ii++) {
247 MsgUnitWrapper[] queueEntries = {
248 new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()),
249 new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()),
250 new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId())};
251
252 for(int i=0; i<queueEntries.length; i++)
253 i_map.put(queueEntries[i]);
254
255 for (int i=0; i < 3; i++) list.add(queueEntries[i]);
256
257 this.checkSizeAndEntries(" put(I_MapEntry[]) ", list, i_map);
258 assertEquals(ME+": Wrong size", (ii+1)*queueEntries.length, i_map.getNumOfEntries());
259 }
260 int total = numLoop*3;
261 assertEquals(ME+": Wrong total size", total, i_map.getNumOfEntries());
262 log.info("#1 Success, filled " + i_map.getNumOfEntries() + " messages into queue");
263
264
265 //========== Test 2: put(I_MapEntry)
266 for (int ii=0; ii<numLoop; ii++) {
267 MsgUnitWrapper queueEntry = new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId());
268 list.add(queueEntry);
269 i_map.put(queueEntry);
270 }
271 assertEquals(ME+": Wrong total size", numLoop+total, i_map.getNumOfEntries());
272 this.checkSizeAndEntries(" put(I_MapEntry) ", list, i_map);
273 log.info("#2 Success, filled " + i_map.getNumOfEntries() + " messages into queue");
274
275 i_map.clear();
276 checkSizeAndEntries("Test 2 put()", new I_MapEntry[0], i_map);
277 assertEquals(ME+": Wrong empty size", 0L, i_map.getNumOfEntries());
278
279 System.out.println("***" + ME + " [SUCCESS]");
280 i_map.shutdown();
281 }
282 catch(XmlBlasterException e) {
283 fail(ME + ": Exception thrown: " + e.getMessage());
284 }
285 }
286
287
288 /**
289 * Tests overflow of maxNumOfBytes() of a CACHE.
290 */
291 public void testByteOverflow() {
292 I_Map i_map = this.currMap;
293 ME = "I_MapTest.testByteOverflow(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
294 System.out.println("***" + ME);
295 try {
296 StorageId storageId = new StorageId("msgUnitStore", "ByteOverflowMapId");
297 QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
298
299 MsgUnitWrapper mu = new MsgUnitWrapper(glob, createMsgUnit(false, 0), storageId);
300 long sizeEmpty = mu.getSizeInBytes();
301
302 MsgUnitWrapper[] queueEntries = {
303 new MsgUnitWrapper(glob, createMsgUnit(false, 0), storageId),
304 new MsgUnitWrapper(glob, createMsgUnit(false, 0), storageId),
305 new MsgUnitWrapper(glob, createMsgUnit(false, 0), storageId),
306 // Each above entry has 3,311 bytes = 9,922, the next one has 9,932 bytes
307 // so when it is entered two of the above need to be swapped away
308 // as maxBytes=13,244
309 new MsgUnitWrapper(glob, createMsgUnit(false, 2*sizeEmpty-1), storageId),
310 new MsgUnitWrapper(glob, createMsgUnit(false, 0), storageId)};
311
312 final long maxBytesCache = 4*sizeEmpty;
313 prop.setMaxBytes(1000000);
314 prop.setMaxBytesCache(maxBytesCache);
315 assertEquals(ME+": Wrong capacity", 1000000, prop.getMaxBytes());
316 assertEquals(ME+": Wrong cache capacity", maxBytesCache, prop.getMaxBytesCache());
317 i_map.initialize(storageId, prop);
318 assertEquals(ME+": Wrong queue ID", storageId, i_map.getStorageId());
319
320 long numOfBytes = 0;
321 for(int i=0; i<queueEntries.length; i++) {
322 i_map.put(queueEntries[i]);
323 numOfBytes += queueEntries[i].getSizeInBytes();
324 }
325
326 assertEquals(ME+": Wrong size", queueEntries.length, i_map.getNumOfEntries());
327 assertEquals(ME+": Wrong bytes", numOfBytes, i_map.getNumOfBytes());
328
329 System.out.println("***" + ME + " [SUCCESS]");
330 i_map.clear();
331 i_map.shutdown();
332 }
333 catch(XmlBlasterException e) {
334 log.severe("Exception thrown: " + e.getMessage());
335 fail(ME + ": Exception thrown: " + e.getMessage());
336 }
337 }
338
339
340 //------------------------------------
341 public void testGetMsg() {
342
343 String queueType = "unknown";
344 try {
345 QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
346 queueType = this.currMap.toString();
347 StorageId queueId = new StorageId("msgUnitStore", "MapPlugin/getMsg");
348 this.currMap.initialize(queueId, prop);
349 this.currMap.clear();
350 assertEquals(ME + "wrong size before starting ", 0, this.currMap.getNumOfEntries());
351 getMsg(this.currMap);
352 }
353 catch (XmlBlasterException ex) {
354 log.severe("Exception when testing getMsg probably due to failed initialization of the queue " + queueType + ": " + ex.getMessage());
355 }
356
357 }
358
359 /**
360 * Tests get() and get(int num) and remove()
361 * For a discussion of the sorting order see Javadoc of this class
362 */
363 private void getMsg(I_Map i_map) {
364 ME = "I_MapTest.getMsg(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
365 System.out.println("***" + ME);
366 try {
367 //========== Test 1: get()
368 {
369 MsgUnitWrapper[] queueEntries = {
370 new MsgUnitWrapper(glob, createMsgUnit(false), i_map.getStorageId()),
371 new MsgUnitWrapper(glob, createMsgUnit(true), i_map.getStorageId()),
372 new MsgUnitWrapper(glob, createMsgUnit(true), i_map.getStorageId())
373 };
374 for(int i=0; i<queueEntries.length; i++) {
375 i_map.put(queueEntries[i]);
376 log.info("#" + i + " id=" + queueEntries[i].getUniqueId() + " numSizeBytes()=" + queueEntries[i].getSizeInBytes());
377 }
378 log.info("storage bytes sum=" + i_map.getNumOfBytes() + " with persistent bytes=" + i_map.getNumOfPersistentBytes());
379
380 assertEquals("", 3, i_map.getNumOfEntries());
381 assertEquals("", 2, i_map.getNumOfPersistentEntries());
382
383 for (int ii=0; ii<10; ii++) {
384 I_MapEntry result = i_map.get(queueEntries[0].getUniqueId());
385 assertTrue("Missing entry", result != null);
386 assertEquals(ME+": Wrong result", queueEntries[0].getUniqueId(), result.getUniqueId());
387
388 result = i_map.get(queueEntries[1].getUniqueId());
389 assertTrue("Missing entry", result != null);
390 assertEquals(ME+": Wrong result", queueEntries[1].getUniqueId(), result.getUniqueId());
391
392 result = i_map.get(queueEntries[2].getUniqueId());
393 assertTrue("Missing entry", result != null);
394 assertEquals(ME+": Wrong result", queueEntries[2].getUniqueId(), result.getUniqueId());
395 }
396 assertEquals("", 3, i_map.getNumOfEntries());
397 assertEquals("", 2, i_map.getNumOfPersistentEntries());
398
399 log.info("storage before remove [0], bytes sum=" + i_map.getNumOfBytes() + " with persistent bytes=" + i_map.getNumOfPersistentBytes());
400 i_map.remove(queueEntries[0]); // Remove one
401 log.info("storage after remove [0], bytes sum=" + i_map.getNumOfBytes() + " with persistent bytes=" + i_map.getNumOfPersistentBytes());
402 ArrayList list = new ArrayList();
403 list.add(queueEntries[1]);
404 list.add(queueEntries[2]);
405 this.checkSizeAndEntries(" getMsg() ", list, i_map);
406
407 for (int ii=0; ii<10; ii++) {
408 I_MapEntry result = i_map.get(queueEntries[1].getUniqueId());
409 assertTrue("Missing entry", result != null);
410 assertEquals(ME+": Wrong result", queueEntries[1].getUniqueId(), result.getUniqueId());
411 }
412 i_map.remove(queueEntries[1].getUniqueId()); // Remove one
413 assertEquals("", 1, i_map.getNumOfEntries());
414 assertEquals("", 1, i_map.getNumOfPersistentEntries());
415
416 for (int ii=0; ii<10; ii++) {
417 I_MapEntry result = i_map.get(queueEntries[2].getUniqueId());
418 assertTrue("Missing entry", result != null);
419 assertEquals(ME+": Wrong result", queueEntries[2].getUniqueId(), result.getUniqueId());
420 }
421 i_map.remove(queueEntries[2]); // Remove one
422 for (int ii=0; ii<10; ii++) {
423 I_MapEntry result = i_map.get(queueEntries[0].getUniqueId());
424 assertTrue("Unexpected entry", result == null);
425 }
426 assertEquals("", 0, i_map.getNumOfEntries());
427 assertEquals("", 0, i_map.getNumOfPersistentEntries());
428 log.info("#1 Success, get()");
429 }
430
431 System.out.println("***" + ME + " [SUCCESS]");
432 i_map.clear();
433 i_map.shutdown();
434 }
435 catch(XmlBlasterException e) {
436 e.printStackTrace();
437 fail(ME + ": Exception thrown: " + e.getMessage());
438 }
439 }
440
441
442
443 //------------------------------------
444 public void testGetAllMsgs() {
445
446 String queueType = "unknown";
447 try {
448 QueuePropertyBase prop = new MsgUnitStoreProperty(glob, "/node/test");
449 queueType = this.currMap.toString();
450 StorageId queueId = new StorageId("msgUnitStore", "MapPlugin/getAllMsgs");
451 this.currMap.initialize(queueId, prop);
452 this.currMap.clear();
453 assertEquals(ME + "wrong size before starting ", 0, this.currMap.getNumOfEntries());
454 getAllMsgs(this.currMap);
455 }
456 catch (XmlBlasterException ex) {
457 log.severe("Exception when testing getAllMsgs probably due to failed initialization of the queue " + queueType + ": " + ex.getMessage());
458 }
459
460 }
461
462 /**
463 * Tests get() and get(int num) and remove()
464 * NOTE: Currently the MapPlugin returns getAll() sorted (it uses a TreeMap)
465 * But we haven't yet forced this in the I_Map#getAll() Javadoc!
466 * This test assumes sorting order and needs to be changed if we once
467 * decide to specify the exact behaviour in I_Map#getAll() javadoc
468 */
469 private void getAllMsgs(I_Map i_map) {
470 ME = "I_MapTest.getAllMsgs(" + i_map.getStorageId() + ")[" + i_map.getClass().getName() + "]";
471 System.out.println("***" + ME);
472 try {