XmlBlaster Logo

REQUIREMENT

client.c.queue

XmlBlaster Logo


Type NEW
Priority MEDIUM
Status CLOSED
Topic XmlBlaster provides a persistent queue implementation for ANSI C
Des
cription

We use the relational database SQLite3 for our queue implementation.

Persistent Queue Features

  • Support on many OS like Windows, Windows-CE / Pocket PC, UNIX, Linux and the code is easy portable to embedded devices
  • Compiles with C or with C++ compiler
  • The access library and the callback library is thread safe (no global variables).
  • The code is tested on memory leaks with valgrind and partially statically checked with flawfinder.
  • Can be used standalone outside of xmlBlaster

C-API

The programming interface from C is struct I_QueueStruct declared in the file QueueInterface.h.

Here is a usage code sample:

ExceptionStruct exception;
QueueEntryArr *entries = 0;
QueueProperties queueProperties;
I_Queue *queueP = 0;
const char *dbName = "xmlBlasterClient-C.db"
const *dummy = "something";

...
  /* Configure the queue */
  memset(&queueProperties, 0, sizeof QueueProperties);
  strncpy(queueProperties.dbName, dbName, QUEUE_DBNAME_MAX);
  strncpy(queueProperties.queueName = "connection_clientJoe", QUEUE_ID_MAX);
  strncpy(queueProperties.tablePrefix="XB_", QUEUE_PREFIX_MAX);
  queueProperties.maxNumOfEntries = 10000000L;
  queueProperties.maxNumOfBytes = 1000000000LL;
  queueProperties.logFp = loggingFp;
  queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
  queueProperties.userObject = (void *)dummy;

  /* Create the queue */
  queueP = createQueue(&queueProperties, &exception);

   
  /* Create an entry */
  QueueEntry queueEntry;
  memset(&queueEntry, 0, sizeof (QueueEntry));

  queueEntry.priority = 5;
  queueEntry.isPersistent = true;
  queueEntry.uniqueId = 1081492136826000000ll;
  strncpy(queueEntry.embeddedType = "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);

  queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
  queueEntry.embeddedBlob.data = (char *)"some blob";
  queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);

  /* Fill the queue */
  queueP->put(queueP, &queueEntry, &exception);

  /* Access the entry */
  entries = queueP->peekWithSamePriority(queueP, -1, -1, &exception);

  /* close the queue */
  queueP->shutdown(&queueP, &exception);
     

For a more complete usage example please see the test cases in TestQueue.c

Setup

To use the persistent queue you need to download SQLite3 and compile it for your platform, our C compilation needs sqlite3.h for compilation and libsqlite3.so for linking (on Windows it is a dll). It is recommended to use at least version 3.6.14 of sqlite to prevent flaws, but older versions should also work. There is also the possibility to use the deprecated SQLite 2 interface.

Compilation with ant

First add those lines to your xmlBlaster/build.properties (or add it to HOME/build.properties which has precedence or add it on command line with a leading -D option which is strongest). If you prefere SQLite2 over SQLite3, use XMLBLASTER_PERSISTENT_QUEUE=1 instead.

# UNIX example:
XMLBLASTER_PERSISTENT_QUEUE_SQLITE3=1
sqlite.include.dir=/opt/sqlite3-bin/include
sqlite.lib.dir=/opt/sqlite3-bin/lib
     

This expects /opt/sqlite3-bin/include/sqlite3.h and /opt/sqlite-bin/lib/libsqlite3.so* for the above UNIX example. (in SQLite2, the files are called sqlite.h and libsqlite.so* respectively.

NOTE: For Windows you can download the ready compiled sqlite library (see http://sqlite.org/download.html). Creation of an import library from the zip file for MS Visual C++ is achieved by the following command:

LIB /DEF:sqlite3.def
     

This creates the files sqlite3.lib and sqlite3.exp files. The sqlite3.lib can then be used to link your programs against the SQLite3 DLL.

# Windows example:
XMLBLASTER_PERSISTENT_QUEUE_SQLITE3=1
sqlite.include.dir=C:/sqlite3
sqlite.lib.dir=C:/sqlite3
     

and compile it:

build c-delete c
     
or similiar (with verbose switched on):
build -verbose -DXMLBLASTER_PERSISTENT_QUEUE_SQLITE3=1 -Dsqlite.include.dir= ... c
     

Compilation manually

Here are some complete examples how to compile a simple C client 'manually':

cd xmlBlaster/src/c/util/queue

Linux C:
 export LD_LIBRARY_PATH=/opt/sqlite3-bin/lib
 gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLite3Queue SQLite3Queue.c
     ../helper.c -I/opt/sqlite3-bin/include -L/opt/sqlite3-bin/lib -lsqlite3

Linux C++:

Mac OSX:
 export DYLD_LIBRARY_PATH=/opt/sqlite3-bin/lib

Windows:

Solaris:
     
Example
C

C usage examples

See the test file xmlBlaster/testsuite/src/c/TestQueue.c.

Example
any

sqlite3 command line example

From time to time you may want to look into the C client queue, here are some command line examples:

sqlite xmlBlasterClientCpp.db


sqlite> .schema XB_ENTRIES
CREATE TABLE XB_ENTRIES (dataId bigint , queueName text , prio integer, flag text,
             durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));
CREATE INDEX XB_ENTRIES_IDX ON XB_ENTRIES (prio);


sqlite> .mode lines


sqlite> select * from xb_entries;
   dataId = 1097512282290000000
queueName = connection_clientPublisher1
     prio = 5
     flag = MSG_RAW|publish
  durable = T
 byteSize = 302
     blob = ? ?;pnr= ??;rs`sd?hcNJ.= ...


sqlite> select dataId, queueName, prio, flag, durable, byteSize from xb_entries;
   dataId = 1097512282290000000
queueName = connection_clientPublisher1
     prio = 5
     flag = MSG_RAW|publish
  durable = T
 byteSize = 302

   dataId = 1097512331701000000
queueName = connection_clientPublisher1
     prio = 5
     flag = MSG_RAW|publish
  durable = T
 byteSize = 257


sqlite> .quit
      
Configure

These configurations are tested:

No OS Compiler xml Blaster Vers. Thread library Date Author Comment
1 Linux 2.4.21 g++ and gcc 3.3.1 0.9+ pthread 2004-04-09 Marcel build -DXMLBLASTER_PERSISTENT_QUEUE=1 c-delete c
2 Linux 2.4.21 Intel(R) C++ Compiler for 32-bit, Version 8.0 (icc) 0.9+ pthread 2004-04-22 Marcel build -Duse-icc=true -DXMLBLASTER_PERSISTENT_QUEUE=1 c-delete c
3 Linux 2.6.29 gcc 4.3.3 1.6.4+ pthread 2009-05-19 Adrian build -DXMLBLASTER_PERSISTENT_QUEUE_SQLITE3=1 c-delete c
4 WindowsXP VC++ 7 (Jan.2003) 0.901+ pthreads-win32 2004-05-06 Marcel build -Duse-msvc=true -DXMLBLASTER_PERSISTENT_QUEUE=1 c-delete c
5 Windows Vista VC++ 9 1.6.4+ pthreads-win32 2009-05-19 Adrian build -Duse-msvc=true -DXMLBLASTER_PERSISTENT_QUEUE_SQLITE3=1 c-delete c

These parameters allow to configure the C-client on command line or over the environment (with lower priority):

Property Default / Example Description Impl
-queue/connection/url /var/xmlBlasterClient.db The location and file name of the database (for SQLite) yes
-queue/connection/tablePrefix XB_ The prefix used for database table names yes
-queue/connection/maxEntries 2147483647 The maximum allowed number of messages yes
-queue/connection/maxBytes 2147483647 The maximum allowed bytes of all messages in the queue yes

Testing

The testsuite resides in xmlBlaster/testsuite/src/c to compile it use ant:

cd xmlBlaster
build  -DXMLBLASTER_PERSISTENT_QUEUE_SQLITE3=1 c-test
         

To run the tests invoke

TestQueue
        

NOTE: Configuration parameters are specified on command line (-someValue 17) or in the xmlBlaster.properties file (someValue=17). See requirement "util.property" for details.
Columns named Impl tells you if the feature is implemented.
Columns named Hot tells you if the configuration is changeable in hot operation.

Todo

The persistent queue implementation is finished and tested, but we need to:

  • Embed the queue into C client library (see first try in XmlBlasterConnectionUnparsed.c)
  • Deliver sqlite library (.dll,.so,.sl) in directory xmlBlaster/lib at least for Linux/Win for ease of use?
  • Adding mutex to every function call?
    Currently you need to add your own mutex to the function calls to support multi threaded access.
  • Improve BLOB performance and max BLOB size
    The message itself is serialized (see SOCKET protocol) and stored as a BLOB in the database. SQLite has limited BLOB support, the default max size is 1 MB which can be extended to 16 MB. Further, BLOB access is very slow, here is a statement from greg@ag-software.com: "Suppose you have a 1MB row in SQLite and you want to read the whole thing. SQLite must first ask for the 1st 1K page and wait for it to be retrieved. Then it asks for the 2nd 1K page and waits for it. And so forth for all 1000+ pages. If each page retrieval requires 1 rotation of the disk, that's about 8.5 seconds on a 7200RPM disk drive."

See CODE util/queue/QueueInterface.h
See TestQueue.c
See http://www.sqlite.org/
See SQLite performance
See SQLite compile
See REQ Queue dump formating identical to SOCKET serialization
See REQ client.cpp
See REQ client.cpp.queue
See TEST TestQueue

This page is generated from the requirement XML file xmlBlaster/doc/requirements/client.c.queue.xml

Back to overview