1 /*--UNFINISHED SEE TODOS--------------------------------------------------------------------------
   2 Name:      SQLite3Queue.c
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 Comment:   A persistent queue implementation based on the SQLite relational database
   6            Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
   7            and can easily be used outside of xmlBlaster.
   8            Further you need sqlite.h and the sqlite library (dll,so,sl)
   9 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info> 's brother
  10 Date:      04/2004
  11 Compile:   Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
  12            Needs pthread.h but not the pthread library (for exact times)
  13 
  14             export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
  15             gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLite3Queue SQLiteQueue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite3
  16             (use optionally  -ansi -pedantic -Wno-long-long
  17             (Intel C: icc -wd981 ...)
  18 
  19            Compile inside xmlBlaster:
  20             build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
  21            expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
  22 
  23            Testcompile on Windows
  24 
  25                                 create sqlite3.lib from sqlite3.def via:
  26                                  lib /DEF:sqlite3.def
  27 
  28            ( /I\c\sqlite3 says where sqlite3.h resides ):
  29                           cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /DSQLITE3=1 /D_WINDOWS /I\c\sqlite3 /I..\.. Sqlite3Queue.c ..\helper.c /link \pialibs\sqlite3.lib
  30 
  31 Table layout XB_ENTRIES:
  32            dataId bigint
  33            queueName text
  34            prio integer
  35            flag text
  36            durable char(1)
  37            byteSize bigint
  38            blob bytea
  39            PRIMARY KEY (dataId, queueName)
  40 
  41 Todo:      Tuning:
  42             - Add prio to PRIMARY KEY
  43             - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
  44 
  45 @see:      http://www.sqlite.org/
  46 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
  47 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
  48 @see:      http://www.sqlite.org/threadsafe.html sqlite3 default is thread-safe (serialized)
  49 Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
  50 -----------------------------------------------------------------------------*/
  51 #include <stdio.h>
  52 #include <string.h>
  53 #include <malloc.h>
  54 #if !defined(_WINDOWS)
  55 # include <unistd.h>   /* unlink() */
  56 # include <errno.h>    /* unlink() */
  57 #endif
  58 #include "util/queue/QueueInterface.h"
  59 
  60 /*#ifdef QUEUE_MAIN
  61 # ifdef Dll_Export
  62 #  undef Dll_Export
  63 # endif
  64 # define Dll_Export
  65 #endif*/
  66 
  67 # include "sqlite3.h"
  68 static void xb_sqlite_free(char * pdata) { sqlite3_free(pdata); }
  69 
  70 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
  71 static const QueueProperties *getProperties(I_Queue *queueP);
  72 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
  73 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
  74 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
  75 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
  76 static int32_t getNumOfEntries(I_Queue *queueP);
  77 static int32_t getMaxNumOfEntries(I_Queue *queueP);
  78 static int64_t getNumOfBytes(I_Queue *queueP);
  79 static int64_t getMaxNumOfBytes(I_Queue *queueP);
  80 static bool persistentQueueEmpty(I_Queue *queueP);
  81 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
  82 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);
  83 static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
  84 static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
  85 static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
  86 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite3_stmt **ppVm, const char *queryString, ExceptionStruct *exception);
  87 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
  88 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
  89 static void freeQueueEntryData(QueueEntry *queueEntry);
  90 
  91 /* For manual error checking */
  92 static const char *errLink = "http://www.sqlite.org/c3ref/c_abort.html";
  93 
  94 /* The tmp_hlp_struct; is needed because a forward declaration of an anonymous struct is not possible. */
  95 struct TmpHelper;
  96 typedef struct TmpHelper TmpHelper;
  97 typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, TmpHelper *helper, sqlite3_stmt *pVm, ExceptionStruct *exception);
  98 /**
  99  * Used temporary to shorten arglists.
 100  */
 101 struct TmpHelper{
 102    QueueEntryArr **queueEntryArrPP;
 103    int32_t currEntries;
 104    int64_t currBytes;
 105    int32_t maxNumOfEntries; /** The max wanted number of entries for this peek() */
 106    int64_t maxNumOfBytes;   /** The max wanted bytes during peek() */
 107    ParseDataFp parseDataFp;
 108 };
 109 
 110 static int32_t getResultRows(I_Queue *queueP, const char *methodName, sqlite3_stmt *pVm, TmpHelper *helper, bool finalize, ExceptionStruct *exception);
 111 /* Shortcut for:
 112     if (queueP && queueP->log) queueP->log(queueP, XMLBLASTER_LOG_TRACE, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
 113    is
 114     LOG __FILE__, "Persistent queue is created");
 115 */
 116 #define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE,
 117 
 118 #define LEN512 512  /* ISO C90 forbids variable-size array: const int LEN512=512; */
 119 #define LEN256 256  /* ISO C90 forbids variable-size array: const int LEN256=256; */
 120 
 121 #define DBNAME_MAX 128
 122 #define ID_MAX 256
 123 
 124 
 125 /**
 126  * Holds Prepared statements for better performance.
 127  * @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
 128  */
 129 typedef struct DbInfoStruct {
 130    QueueProperties prop;         /** Meta information */
 131    size_t numOfEntries;          /** Cache for current number of entries */
 132    int64_t numOfBytes;           /** Cache for current number of bytes */
 133    sqlite3 *db;                   /** Database handle for SQLite */
 134    sqlite3_stmt *pVm_put;           /** SQLite virtual machine to hold a prepared query */
 135    sqlite3_stmt *pVm_peekWithSamePriority;
 136    sqlite3_stmt *pVm_fillCache;
 137 } DbInfo;
 138 
 139 static char int64Str_[INT64_STRLEN_MAX];
 140 static char * const int64Str = int64Str_;   /* to make the pointer address const */
 141 
 142 /** Column index into XB_ENTRIES table */
 143 enum {
 144    XB_ENTRIES_DATA_ID = 0,
 145    XB_ENTRIES_QUEUE_NAME,
 146    XB_ENTRIES_PRIO,
 147    XB_ENTRIES_TYPE_NAME,
 148    XB_ENTRIES_PERSISTENT,
 149    XB_ENTRIES_SIZE_IN_BYTES,
 150    XB_ENTRIES_BLOB
 151 };
 152 
 153 
 154 /**
 155  * Create a new persistent queue instance.
 156  * <br />
 157  * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
 158  *         usually by calling shutdown().
 159  * @throws exception
 160  */
 161 Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
 162 {
 163    bool stateOk = true;
 164    I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
 165    if (queueP == 0) return queueP;
 166    queueP->isInitialized = false;
 167    queueP->initialize = persistentQueueInitialize;
 168    queueP->getProperties = getProperties;
 169    queueP->put = persistentQueuePut;
 170    queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
 171    queueP->randomRemove = persistentQueueRandomRemove;
 172    queueP->clear = persistentQueueClear;
 173    queueP->getNumOfEntries = getNumOfEntries;
 174    queueP->getMaxNumOfEntries = getMaxNumOfEntries;
 175    queueP->getNumOfBytes = getNumOfBytes;
 176    queueP->getMaxNumOfBytes = getMaxNumOfBytes;
 177    queueP->empty = persistentQueueEmpty;
 178    queueP->shutdown = persistentQueueShutdown;
 179    queueP->destroy = persistentQueueDestroy;
 180    queueP->privateObject = calloc(1, sizeof(DbInfo));
 181    {
 182       DbInfo *dbInfo = (DbInfo *)queueP->privateObject;
 183       dbInfo->numOfEntries = -1;
 184       dbInfo->numOfBytes = -1;
 185    }
 186    stateOk = queueP->initialize(queueP, queueProperties, exception);
 187    if (stateOk) {
 188       LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
 189    }
 190    else {
 191       ExceptionStruct ex;
 192       queueP->shutdown(&queueP, &ex);
 193       if (*ex.errorCode != 0) {
 194          embedException(exception, ex.errorCode, ex.message, exception);
 195       }
 196       queueP = 0;
 197    }
 198    return queueP;
 199 }
 200 
 201 /** Access the DB handle, queueP pointer is not checked */
 202 static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
 203    return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);
 204 }
 205 
 206 /**
 207  * Access the queue configuration.
 208  * @param queueP The this pointer
 209  * @return Read only access, 0 on error
 210  */
 211 static const QueueProperties *getProperties(I_Queue *queueP)
 212 {
 213    ExceptionStruct exception;
 214    if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0;
 215    return &getDbInfo(queueP)->prop;
 216 }
 217 
 218 /**
 219  */
 220 static void freeQueue(I_Queue **queuePP)
 221 {
 222    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
 223    if (queueP == 0) {
 224       fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
 225       return;
 226    }
 227 
 228    LOG __FILE__, "freeQueue() called");
 229 
 230    if (queueP->privateObject) {
 231       free(queueP->privateObject);
 232       queueP->privateObject = 0;
 233    }
 234 
 235    free(queueP);
 236    *queuePP = 0;
 237 }
 238 
 239 /**
 240  * Called internally by createQueue().
 241  * @param queueP The this pointer
 242  * @param queueProperties The configuration
 243  * @param exception Can contain error information (out parameter)
 244  * @return true on success
 245  */
 246 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
 247 {
 248    char *errMsg = 0;
 249    bool retOk;
 250    sqlite3 *db = 0;
 251    DbInfo *dbInfo = 0;
 252 
 253    if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
 254    if (queueProperties == 0) {
 255       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 256       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 257                "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
 258       /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
 259       fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);
 260       return false;
 261    }
 262 
 263    queueP->log = queueProperties->logFp;
 264    queueP->logLevel = queueProperties->logLevel;
 265    queueP->userObject = queueProperties->userObject;
 266 
 267    if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
 268        queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
 269       char dbName[QUEUE_DBNAME_MAX];
 270       char queueName[QUEUE_ID_MAX];
 271       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 272       if (queueProperties->dbName == 0)
 273          strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
 274       else
 275          strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
 276       if (queueProperties->queueName == 0)
 277          strncpy0(queueName, "NULL", QUEUE_ID_MAX);
 278       else
 279          strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
 280       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 281                "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
 282                " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
 283                dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
 284       LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
 285       return false;
 286    }
 287 
 288    dbInfo = getDbInfo(queueP);
 289    memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));
 290 
 291    /* Never trust a queue property you haven't overflowed yourself :-) */
 292    dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
 293    dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
 294    dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;
 295 
 296    LOG __FILE__, "dbName          = %s", dbInfo->prop.dbName);
 297    LOG __FILE__, "queueName       = %s", dbInfo->prop.queueName);
 298    LOG __FILE__, "tablePrefix     = %s", dbInfo->prop.tablePrefix);
 299    LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries);
 300    LOG __FILE__, "maxNumOfBytes   = %ld",(long)dbInfo->prop.maxNumOfBytes);
 301    /*LOG __FILE__, "logFp           = %d", (int)dbInfo->prop.logFp);*/
 302    LOG __FILE__, "logLevel        = %d", (int)dbInfo->prop.logLevel);
 303    /*LOG __FILE__, "userObject      = %d", (void*)dbInfo->prop.userObject);*/
 304 
 305    if (sqlite3_open_v2(dbInfo->prop.dbName, &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX, 0) != SQLITE_OK || db == 0) {
 306       queueP->isInitialized = false;
 307       if(queueP->log) {
 308          if (errMsg) {
 309             LOG __FILE__, "%s", errMsg);
 310          }
 311          else {
 312             LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName);
 313          }
 314       }
 315       else {
 316         if (errMsg)
 317            fprintf(stderr,"[%s] %s\n", __FILE__, errMsg);
 318         else
 319            fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName);
 320       }
 321       strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 322       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 323                "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg);
 324       if (errMsg != 0) xb_sqlite_free(errMsg);
 325       return false;
 326    }
 327 
 328    dbInfo->db = db;
 329    queueP->isInitialized = true;
 330 
 331    retOk = createTables(queueP, exception);
 332 
 333    fillCache(queueP, exception);
 334 
 335    LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
 336    return true;
 337 }
 338 
 339 /**
 340  * Create the necessary DB table if not already existing.
 341  * @param queueP
 342  * @param exception Can contain error information (out parameter)
 343  * @return true on success
 344  */
 345 static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
 346 {
 347    char queryString[LEN512];
 348    bool retOk;
 349    const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;
 350 
 351    SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
 352            tablePrefix);
 353    retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);
 354 
 355    SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
 356            tablePrefix, tablePrefix);
 357    retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);
 358    return retOk;
 359 }
 360 
 361 /**
 362  * Invoke SQL query.
 363  * @param queueP Is not checked, must not be 0
 364  * @param queryString The SQL to execute
 365  * @param comment For logging or exception text
 366  * @param exception Can contain error information (out parameter)
 367  * @return true on success
 368  */
 369 static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
 370 {
 371    int rc = 0;
 372    char *errMsg = 0;
 373    bool retOk;
 374    DbInfo *dbInfo = getDbInfo(queueP);
 375 
 376    rc = sqlite3_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);
 377    switch (rc) {
 378       case SQLITE_OK:
 379          LOG __FILE__, "SQL '%s' success", comment);
 380          retOk = true;
 381          break;
 382       default:
 383          if (errMsg && strstr(errMsg, "already exists")) {
 384             LOG __FILE__, "OK, '%s' [%d]: %s", comment, rc, (errMsg==0)?"":errMsg);
 385             retOk = true;
 386          }
 387          else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {
 388             LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, (errMsg==0)?"":errMsg);
 389             retOk = true;
 390          }
 391          else {
 392             LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, (errMsg==0)?"":errMsg);
 393             strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 394             SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 395                      "[%.100s:%d] SQL error '%s' [%d]: %s", __FILE__, __LINE__, comment, rc, (errMsg==0)?"":errMsg);
 396             retOk = false;
 397          }
 398          break;
 399    }
 400    if (errMsg != 0) xb_sqlite_free(errMsg);
 401    return retOk;
 402 }
 403 
 404 /**
 405  * @param queueP The queue instance
 406  * @param queueEntry The entry
 407  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 408  */
 409 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
 410 {
 411    int rc = 0;
 412    bool stateOk = true;
 413    DbInfo *dbInfo;
 414    char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */
 415 
 416    if (checkArgs(queueP, "put", true, exception) == false ) return;
 417    if (queueEntry == 0) {
 418       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 419       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 420                "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
 421       return;
 422    }
 423    if (queueEntry->uniqueId == 0) {
 424       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 425       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 426                "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
 427       return;
 428    }
 429    if (*queueEntry->embeddedType == 0) {
 430       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 431       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 432                "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
 433       return;
 434    }
 435    strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
 436 
 437    if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
 438       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 439       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 440                "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
 441       return;
 442    }
 443 
 444    dbInfo = getDbInfo(queueP);
 445 
 446    if (dbInfo->numOfEntries >= (size_t)dbInfo->prop.maxNumOfEntries) {
 447       strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 448       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 449                "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
 450       return;
 451    }
 452    if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
 453       strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 454       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 455                "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
 456       return;
 457    }
 458 
 459 
 460    if (dbInfo->pVm_put == 0) {  /* Compile prepared query only once */
 461       char queryString[LEN256];    /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/
 462       SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?);", dbInfo->prop.tablePrefix);
 463       stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
 464    }
 465 
 466    if (stateOk) { /* set prepared statement tokens */
 467           int index = 0;
 468           rc = SQLITE_OK;
 469           if(rc == SQLITE_OK) rc = sqlite3_bind_int64(dbInfo->pVm_put , ++index, queueEntry->uniqueId);
 470           if(rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, strlen(dbInfo->prop.queueName), SQLITE_STATIC);
 471           if(rc == SQLITE_OK) rc = sqlite3_bind_int64(dbInfo->pVm_put, ++index, queueEntry->priority);
 472           if(rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, embeddedType, strlen(embeddedType), SQLITE_STATIC);
 473           if(rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", 1, SQLITE_STATIC);
 474           if(rc == SQLITE_OK) rc = sqlite3_bind_int64(dbInfo->pVm_put, ++index, queueEntry->embeddedBlob.dataLen);
 475           if(rc == SQLITE_OK) rc = sqlite3_bind_blob(dbInfo->pVm_put, ++index, queueEntry->embeddedBlob.data, (int)queueEntry->embeddedBlob.dataLen, SQLITE_STATIC);
 476 
 477       if (rc != SQLITE_OK) {
 478              switch(rc) {
 479                  case SQLITE_RANGE:
 480                         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 481                         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d index out of range", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
 482                         LOG __FILE__, "put(%s) SQL error: %d index out of range", int64ToStr(int64Str, queueEntry->uniqueId), rc); break;
 483                  case SQLITE_NOMEM:
 484                         LOG __FILE__, "put(%s) SQL error: %d out of memory", int64ToStr(int64Str, queueEntry->uniqueId), rc);
 485                         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d out of memory", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
 486                         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); break;
 487                  case SQLITE_MISUSE:
 488                         LOG __FILE__, "put(%s) SQL error: %d misuse: virtual machine not valid", int64ToStr(int64Str, queueEntry->uniqueId), rc);
 489                         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d misuse: virtual machine not valid", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
 490                         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); break;
 491                  default:
 492                         LOG __FILE__, "put(%s) SQL error: %d undefined error", int64ToStr(int64Str, queueEntry->uniqueId), rc);
 493                         SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] put(%s) SQL error: %d undefined error", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc );
 494                         strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN); break;
 495 
 496                  }
 497          stateOk = false;
 498       }
 499    }
 500 
 501    if (stateOk) { /* start the query, process results */
 502       int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, false, exception);
 503       stateOk = countRows >= 0;
 504    }
 505 
 506    if (stateOk) {
 507       dbInfo->numOfEntries += 1;
 508       dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
 509    }
 510 
 511    LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
 512 }
 513 
 514 
 515 /**
 516  * Compile a prepared query.
 517  * No parameters are checked, they must be valid
 518  * @param queueP The queue instance to use
 519  * @param methodName A nice string for logging
 520  * @param ppVm The virtual machine will be initialized if still 0
 521  * @param queryString
 522  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 523  * @return false on error and exception->errorCode is not null
 524  */
 525 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
 526                     sqlite3_stmt **ppVm, const char *queryString, ExceptionStruct *exception)
 527 {
 528    int iRetry, numRetry=100;
 529    int rc = 0;
 530    const char *pzTail = 0;   /* OUT: uncompiled tail of zSql */
 531    bool stateOk = true;
 532    DbInfo *dbInfo = getDbInfo(queueP);
 533 
 534    if (*ppVm == 0) {  /* Compile prepared  query */
 535       for (iRetry = 0; iRetry < numRetry; iRetry++) {
 536          rc = sqlite3_prepare_v2(dbInfo->db, queryString, strlen(queryString), ppVm, &pzTail);
 537          switch (rc) {
 538             case SQLITE_BUSY:
 539                if (iRetry == (numRetry-1)) {
 540                   strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 541                   SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 542                            "[%.100s:%d] SQL error #%d resource busy in %s()", __FILE__, __LINE__, rc, methodName);
 543                }
 544                LOG __FILE__, "%s() Sleeping as other thread holds DB", methodName );
 545                sleepMillis(10);
 546                break;
 547             case SQLITE_OK:
 548                iRetry = numRetry; /* We're done */
 549                LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
 550                break;
 551             default:
 552                LOG __FILE__, "SQL error #%d in %s(). See %s for details.", rc, methodName, errLink);
 553                strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 554                SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 555                         "[%.100s:%d] SQL error #%d in %s(). See %s for details.", __FILE__, __LINE__, rc, methodName, errLink);
 556                iRetry = numRetry; /* We're done */
 557                stateOk = false;
 558                break;
 559          }
 560       }
 561    }
 562    if (*ppVm == 0) stateOk = false;
 563    return stateOk;
 564 }
 565 
 566 /**
 567  * For each SQL result row parse it into a QueueEntry.
 568  * No parameters are checked, they must be valid
 569  * Implements a ParseDataFp (function pointer)
 570  * @param queueP The 'this' pointer
 571  * @param currIndex
 572  * @param TmpHelper
 573  * @param sqlite3 statement
 574  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 575  * @return false on error and exception->errorCode is not null
 576  */
 577 static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, TmpHelper *helper,
 578                                sqlite3_stmt *pVm, ExceptionStruct *exception)
 579 {
 580    bool doContinue = true;
 581    int numAssigned;
 582    bool stateOk = true;
 583    QueueEntry *queueEntry = 0;
 584    QueueEntryArr *queueEntryArr;
 585    QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;
 586 
 587    if (currIndex == 0) {
 588       helper->currEntries = 0;
 589       helper->currBytes = 0;
 590    }
 591 
 592    if (*queueEntryArrPP == 0) {
 593       *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
 594       if (helper->maxNumOfEntries == 0) {
 595          doContinue = false;
 596          return doContinue;
 597       }
 598    }
 599    queueEntryArr = *queueEntryArrPP;
 600 
 601    if (queueEntryArr->len == 0) {
 602       queueEntryArr->len = 10;
 603       queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
 604    }
 605    else if (currIndex >= queueEntryArr->len) {
 606       queueEntryArr->len += 10;
 607       queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
 608    }
 609    queueEntry = &queueEntryArr->queueEntryArr[currIndex];
 610    memset(queueEntry, 0, sizeof(QueueEntry));
 611 
 612    queueEntry->uniqueId = sqlite3_column_int64(pVm, XB_ENTRIES_DATA_ID);
 613    stateOk = queueEntry->uniqueId == 0 ? false : true;
 614    if (!stateOk) {
 615       LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse sqlite3_column_int64(pVm, 0) '%.20s' to uniqueId, ignoring entry.", sqlite3_column_text(pVm, XB_ENTRIES_DATA_ID));
 616       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 617       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 618                "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse qlite3_column_int64(pVm, 0) '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, (char*)sqlite3_column_text(pVm, XB_ENTRIES_DATA_ID), sqlite3_column_name(pVm, XB_ENTRIES_DATA_ID));
 619       doContinue = false;
 620       return doContinue;
 621    }
 622 
 623    LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
 624    numAssigned = sscanf((const char*)sqlite3_column_text(pVm, XB_ENTRIES_PRIO), "%hd", &queueEntry->priority);
 625    if (numAssigned != 1) {
 626       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse sqlite3_column_int64(pVm, XB_ENTRIES_PRIO) '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), sqlite3_column_text(pVm, XB_ENTRIES_PRIO));
 627       queueEntry->priority = 4;
 628    }
 629    strncpy0(queueEntry->embeddedType, (const char*)sqlite3_column_text(pVm, XB_ENTRIES_TYPE_NAME), QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
 630 
 631    queueEntry->isPersistent = *sqlite3_column_text(pVm, XB_ENTRIES_PERSISTENT) == 'T' ? true : false;
 632 
 633    queueEntry->embeddedBlob.dataLen = (size_t)sqlite3_column_int64(pVm, XB_ENTRIES_SIZE_IN_BYTES);
 634 
 635         /* sqlite3_column_bytes() can be used to get the length */
 636     queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen);
 637         memcpy(queueEntry->embeddedBlob.data, (char *)sqlite3_column_blob(pVm, XB_ENTRIES_BLOB), queueEntry->embeddedBlob.dataLen);
 638 
 639    helper->currEntries += 1;
 640    helper->currBytes += queueEntry->embeddedBlob.dataLen;
 641 
 642    /* Limit the number of entries */
 643    if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
 644        (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
 645       /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
 646       doContinue = false;
 647    }
 648 
 649    return doContinue;
 650 }
 651 
 652 /**
 653  * Execute the query and get the query result.
 654  * No parameters are checked, they must be valid
 655  * @param queueP  The this pointer
 656  * @param methodName The method called
 657  * @param pVm sqlite virtual machine
 658  * @param helper for smaller arglist
 659  * @param finalize true to call sqlite_finalize which deletes the virtual machine,
 660  *                 false to call  sqlite_reset to reuse the prepared query
 661  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 662  * @return < 0 on error and exception->errorCode is not null
 663  *         otherwise the number of successfully parsed rows is returned
 664  * @todo For INSERT and DELETE return the number of touched entries !!!
 665  */
 666 static int32_t getResultRows(I_Queue *queueP, const char *methodName, sqlite3_stmt *pVm, TmpHelper *helper, bool finalize, ExceptionStruct *exception)
 667 {
 668    int32_t currIndex = 0;
 669    bool done = false;
 670    bool stateOk = true;
 671    int rc;
 672 
 673    while (!done) {
 674 
 675       rc = sqlite3_step(pVm);
 676       switch(rc){
 677         case SQLITE_DONE:
 678                 done = true;
 679                 break;
 680                 case SQLITE_BUSY:
 681                         LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
 682                         sleepMillis(10);
 683                 break;
 684                 case SQLITE_ROW:
 685                 {
 686                         bool doContinue = true;
 687                         if(helper != 0) {
 688                                 doContinue = helper->parseDataFp(queueP, currIndex, helper, pVm, exception);
 689 
 690                                 stateOk = *exception->errorCode == 0;
 691                         }
 692                         currIndex++;
 693                         if(!stateOk || !doContinue) done = true;
 694                 }
 695                 break;
 696                 case SQLITE_ERROR:
 697                         LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
 698                         done = true;
 699                         stateOk = false;
 700                 break;
 701                 case SQLITE_SCHEMA:
 702                         LOG __FILE__, "%s() Sql execution problem [sqlCode=%d], inconsistent schema", methodName, rc);
 703                         /* no break */
 704                 case SQLITE_MISUSE:
 705                 default:
 706             LOG __FILE__, "%s() SQL execution problem [sqlCode=%d]. See %s for details", methodName, rc, errLink);
 707             done = true;
 708             stateOk = false;
 709          break;
 710       }
 711 
 712    }
 713    LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);
 714 
 715    if (finalize) {
 716       sqlite3_finalize(pVm);
 717       if (rc != SQLITE_OK && rc != SQLITE_DONE) {
 718 /*        LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled.", rc, sqlite_errmsg( )); */
 719           LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled. See %s for details", rc, errLink );
 720       }
 721    }
 722    else { /* Reset prepared statement */
 723       rc = sqlite3_reset(pVm);
 724       if (rc == SQLITE_SCHEMA) {
 725 /*         LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled", rc, sqlite_error_string(rc) ); */
 726          LOG __FILE__, "WARN: getResultRows() sqlCode=%d is not handled. See %s for details", rc, errLink );
 727       }
 728    }
 729 
 730    return stateOk ? currIndex : (-1)*rc;
 731 }
 732 
 733 /**
 734  * Access queue entries without removing them.
 735  * @param queueP the this pointer
 736  * @param maxNumOfEntries
 737  * @param maxNumOfBytes
 738  * @param Exception struct
 739  * @return queueEntryArr
 740  */
 741 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception)
 742 {
 743    int rc = 0;
 744    bool stateOk = true;
 745    DbInfo *dbInfo;
 746    QueueEntryArr *queueEntryArr = 0;
 747 
 748    if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;
 749 
 750    LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));
 751 
 752    dbInfo = getDbInfo(queueP);
 753 
 754    if (dbInfo->pVm_peekWithSamePriority == 0) {  /* Compile prepared  query */
 755       char queryString[LEN512];
 756       /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/
 757       SNPRINTF(queryString, LEN512,
 758            "SELECT * FROM %.20sENTRIES where queueName=?"
 759            " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"
 760            " ORDER BY dataId ASC",
 761            dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);
 762       stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",
 763                     &dbInfo->pVm_peekWithSamePriority , queryString, exception);
 764    }
 765 
 766    if (stateOk) { /* set prepared statement tokens */
 767       int index = 0;
 768 
 769       rc = SQLITE_OK;
 770 
 771       if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, strlen(dbInfo->prop.queueName), SQLITE_STATIC);
 772       if (rc == SQLITE_OK) rc = sqlite3_bind_text(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, strlen(dbInfo->prop.queueName), SQLITE_STATIC);
 773 
 774       switch (rc) {
 775          case SQLITE_OK:
 776             LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);
 777             break;
 778 
 779                  case SQLITE_RANGE:
 780                     strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 781                     SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d index out of range", __FILE__, __LINE__, rc );
 782                     LOG __FILE__, "peekWithSamePriority() SQL error: %d index out of range", rc);
 783                     stateOk = false;
 784                     break;
 785                  case SQLITE_NOMEM:
 786                     LOG __FILE__, "peekWithSamePriority() SQL error: %d out of memory", rc);
 787                     SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d out of memory", __FILE__, __LINE__, rc );
 788                     strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 789                     stateOk = false;
 790                     break;
 791                  case SQLITE_MISUSE:
 792                     LOG __FILE__, "peekWithSamePriority() SQL error: %d misuse: virtual machine not valid", rc);
 793                     SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d misuse: virtual machine not valid", __FILE__, __LINE__, rc );
 794                     strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 795                     stateOk = false;
 796                     break;
 797                  default:
 798                     LOG __FILE__, "peekWithSamePriority() SQL error: %d undefined error", rc);
 799                     SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN, "[%.100s:%d] peekWithSamePriority() SQL error: %d undefined error", __FILE__, __LINE__, rc );
 800                     strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 801                         stateOk = false;
 802                         break;
 803       }
 804    }
 805 
 806    if (stateOk) { /* start the query */
 807       TmpHelper helper;
 808       int32_t currIndex = 0;
 809       helper.queueEntryArrPP = &queueEntryArr;
 810       helper.maxNumOfEntries = maxNumOfEntries;
 811       helper.maxNumOfBytes = maxNumOfBytes;
 812       helper.parseDataFp = parseQueueEntryArr;
 813       currIndex = getResultRows(queueP, "peekWithSamePriority", dbInfo->pVm_peekWithSamePriority, &helper, false, exception);
 814       stateOk = currIndex >= 0;
 815       if (!stateOk) {
 816          if (queueEntryArr) {
 817             free(queueEntryArr->queueEntryArr);
 818             queueEntryArr->len = 0;
 819          }
 820       }
 821       else {
 822          if (!queueEntryArr)
 823             queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
 824          else if ((size_t)currIndex < queueEntryArr->len) {
 825             queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry));
 826             queueEntryArr->len = currIndex;
 827          }
 828       }
 829    }
 830 
 831    LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");
 832    return queueEntryArr;
 833 }
 834 
 835 /**
 836  * Removes the given entries from persistence.
 837  * @return The number of removed entries
 838  */
 839 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception)
 840 {
 841    bool stateOk = true;
 842    int64_t numOfBytes = 0;
 843    int32_t countDeleted = 0;
 844    sqlite3_stmt *pVm = 0;
 845    DbInfo *dbInfo;
 846    if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||
 847                  queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)
 848       return 0;
 849 
 850    LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);
 851 
 852    dbInfo = getDbInfo(queueP);
 853 
 854    {
 855       size_t i;
 856       const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);
 857       char *queryString = (char *)calloc(qLen, sizeof(char));
 858       /*  DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */
 859       SNPRINTF(queryString, qLen,
 860            "DELETE FROM %.20sENTRIES WHERE queueName='%s'"
 861            " AND dataId in ( ",
 862            dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
 863 
 864       for (i=0; i<queueEntryArr->len; i++) {
 865          strncat0(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId), INT64_STRLEN_MAX+1);
 866          if (i<(queueEntryArr->len-1)) strncat0(queryString, ",", 2);
 867          numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen);
 868       }
 869       strncat0(queryString, " )", 3);
 870       stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);
 871       free(queryString);
 872    }
 873 
 874 
 875    if (stateOk) { /* start the query */
 876       int32_t currIndex = getResultRows(queueP, "randomRemove", pVm, 0, true, exception);
 877       stateOk = currIndex >= 0;
 878    }
 879 
 880    if (stateOk) {
 881       countDeleted = (int32_t)sqlite3_changes(dbInfo->db); /* This function returns the number of database rows that were changed (or inserted or deleted) by the most recently completed
 882                                                               INSERT, UPDATE, or DELETE statement.
 883                                                               Only changes that are directly specified by the INSERT, UPDATE, or DELETE statement are counted.
 884                                                               Auxiliary changes caused by triggers are not counted.
 885                                                               Use the sqlite3_total_changes() function to find the total number of changes including changes caused by triggers.*/
 886       if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {
 887          fillCache(queueP, exception); /* calculate numOfBytes again */
 888       }
 889       else {
 890          dbInfo->numOfEntries -= queueEntryArr->len;
 891          dbInfo->numOfBytes -= numOfBytes;
 892       }
 893    }
 894 
 895    return countDeleted;
 896 }
 897 
 898 /**
 899  * Destroy all entries in queue and releases all resources in memory and on HD.
 900  */
 901 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception)
 902 {
 903    bool stateOk = true;
 904    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
 905    if (checkArgs(queueP, "destroy", false, exception) == false ) return false;
 906    shutdownInternal(queuePP, exception);
 907 
 908    {
 909       DbInfo *dbInfo = getDbInfo(queueP);
 910       const char *dbName = dbInfo->prop.dbName;
 911       stateOk = unlink(dbName) == 0; /* Delete old db file */
 912       if (!stateOk) {
 913          strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 914          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 915                   "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);
 916       }
 917    }
 918 
 919    freeQueue(queuePP);
 920 
 921    return stateOk;
 922 }
 923 
 924 /**
 925  * Destroy all entries in queue.
 926  */
 927 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception)
 928 {
 929    int stateOk = true;
 930    char queryString[LEN256];
 931    sqlite3_stmt *pVm = 0;
 932    DbInfo *dbInfo;
 933    if (checkArgs(queueP, "clear", true, exception) == false) return false;
 934    dbInfo = getDbInfo(queueP);
 935 
 936    SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix);
 937    stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);
 938 
 939    if (stateOk) {
 940       int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, true, exception);
 941       stateOk = currIndex >= 0;
 942    }
 943 
 944    if (stateOk) {
 945       dbInfo->numOfEntries = 0;
 946       dbInfo->numOfBytes = 0;
 947    }
 948 
 949    LOG __FILE__, "clear() done");
 950    return stateOk;
 951 }
 952 
 953 /**
 954  * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
 955  */
 956 static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, TmpHelper* helper, sqlite3_stmt *pVm, ExceptionStruct *exception)
 957 {
 958    int64_t ival = 0;
 959    DbInfo *dbInfo = getDbInfo(queueP);
 960    ival = sqlite3_column_int64(pVm, 0);
 961    dbInfo->numOfEntries = (int32_t)ival;
 962    dbInfo->numOfBytes = sqlite3_column_int64(pVm, 1);
 963    return true;
 964 }
 965 
 966 /**
 967  * Reload cached information from database.
 968  * @param queueP The this pointer
 969  * @param exception Returns error
 970  * @return false on error
 971  */
 972 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception)
 973 {
 974    bool stateOk = true;
 975    DbInfo *dbInfo = 0;
 976 
 977    char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */
 978 
 979    if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;
 980    dbInfo = getDbInfo(queueP);
 981 
 982    SNPRINTF(queryString, LEN512,
 983             "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
 984             dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
 985    stateOk = compilePreparedQuery(queueP, "fillCache",
 986                   &dbInfo->pVm_fillCache, queryString, exception);
 987 
 988    if (stateOk) { /* start the query, calls parseCacheInfo() */
 989       TmpHelper helper;
 990       int32_t currIndex;
 991       helper.parseDataFp = parseCacheInfo;
 992       currIndex = getResultRows (queueP, "fillCache", dbInfo->pVm_fillCache, &helper, false, exception);
 993       stateOk = currIndex > 0;
 994    }
 995 
 996    LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));
 997    return stateOk;
 998 }
 999 
1000 static bool persistentQueueEmpty(I_Queue *queueP)
1001 {
1002    return getNumOfEntries(queueP) <= 0;
1003 }
1004 
1005 static int32_t getNumOfEntries(I_Queue *queueP)
1006 {
1007    DbInfo *dbInfo;
1008    bool stateOk = true;
1009    ExceptionStruct exception;
1010    if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;
1011    dbInfo = getDbInfo(queueP);
1012    if (dbInfo->numOfEntries == -1) {
1013       stateOk = fillCache(queueP, &exception);
1014    }
1015    return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;
1016 }
1017 
1018 static int32_t getMaxNumOfEntries(I_Queue *queueP)
1019 {
1020    DbInfo *dbInfo;
1021    ExceptionStruct exception;
1022    if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;
1023    dbInfo = getDbInfo(queueP);
1024    return dbInfo->prop.maxNumOfEntries;
1025 }
1026 
1027 static int64_t getNumOfBytes(I_Queue *queueP)
1028 {
1029    DbInfo *dbInfo;
1030    ExceptionStruct exception;
1031    bool stateOk = true;
1032    if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;
1033    dbInfo = getDbInfo(queueP);
1034    if (dbInfo->numOfBytes == -1) {
1035       stateOk = fillCache(queueP, &exception);
1036    }
1037    return (stateOk) ? dbInfo->numOfBytes : -1;
1038 }
1039 
1040 static int64_t getMaxNumOfBytes(I_Queue *queueP)
1041 {
1042    DbInfo *dbInfo;
1043    ExceptionStruct exception;
1044    if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;
1045    dbInfo = getDbInfo(queueP);
1046    return dbInfo->prop.maxNumOfBytes;
1047 }
1048 
1049 /**
1050  * Shutdown without destroying any entry.
1051  * Clears all open DB resources.
1052  */
1053 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception)
1054 {
1055    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1056    if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1057    shutdownInternal(queuePP, exception);
1058    freeQueue(queuePP);
1059 }
1060 
1061 /**
1062  * Shutdown used internally without calling freeQueue().
1063  */
1064 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception)
1065 {
1066    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1067    if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1068    {
1069       DbInfo *dbInfo = getDbInfo(queueP);
1070       queueP->isInitialized = false;
1071       if(dbInfo) {
1072          if (dbInfo->pVm_put) {
1073                       sqlite3_finalize(dbInfo->pVm_put);
1074             dbInfo->pVm_put = 0;
1075          }
1076          if (dbInfo->pVm_peekWithSamePriority) {
1077                       sqlite3_finalize(dbInfo->pVm_peekWithSamePriority);
1078             dbInfo->pVm_peekWithSamePriority = 0;
1079          }
1080          if (dbInfo->pVm_fillCache) {
1081                       sqlite3_finalize(dbInfo->pVm_fillCache);
1082            dbInfo->pVm_fillCache = 0;
1083          }
1084          if (dbInfo->db) {
1085             sqlite3_close(dbInfo->db);
1086             dbInfo->db = 0;
1087          }
1088          LOG __FILE__, "shutdown() done");
1089       }
1090    }
1091 }
1092 
1093 /**
1094  * Frees everything inside QueueEntryArr and the struct QueueEntryArr itself
1095  * @param queueEntryArr The struct to free, passing NULL is OK
1096  */
1097 Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr)
1098 {
1099    if (queueEntryArr == (QueueEntryArr *)0) return;
1100    freeQueueEntryArrInternal(queueEntryArr);
1101    free(queueEntryArr);
1102 }
1103 
1104 /**
1105  * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself
1106  * @param queueEntryArr The struct internals to free, passing NULL is OK
1107  */
1108 Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr)
1109 {
1110    size_t i;
1111    if (queueEntryArr == (QueueEntryArr *)0) return;
1112    for (i=0; i<queueEntryArr->len; i++) {
1113       freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);
1114    }
1115    free(queueEntryArr->queueEntryArr);
1116    queueEntryArr->len = 0;
1117 }
1118 
1119 /**
1120  * Does not free the queueEntry itself
1121  */
1122 static void freeQueueEntryData(QueueEntry *queueEntry)
1123 {
1124    if (queueEntry == (QueueEntry *)0) return;
1125    if (queueEntry->embeddedBlob.data != 0) {
1126       free((char *)queueEntry->embeddedBlob.data);
1127       queueEntry->embeddedBlob.data = 0;
1128    }
1129    queueEntry->embeddedBlob.dataLen = 0;
1130 }
1131 
1132 /**
1133  * Frees the internal blob and the queueEntry itself.
1134  * @param queueEntry Its memory is freed, it is not usable anymore after this call
1135  */
1136 Dll_Export void freeQueueEntry(QueueEntry *queueEntry)
1137 {
1138    if (queueEntry == (QueueEntry *)0) return;
1139    freeQueueEntryData(queueEntry);
1140    free(queueEntry);
1141 }
1142 
1143 /**
1144  * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())!
1145  *
1146  * @param queueEntry The data to put to the queue
1147  * @param maxContentDumpLen for -1 get the complete content, else limit the
1148  *        content to the given number of bytes
1149  * @return A ASCII XML formatted entry or NULL if out of memory
1150  */
1151 Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen)
1152 {
1153    if (queueEntry == (QueueEntry *)0) return 0;
1154    {
1155    char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);
1156    const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen;
1157    const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;
1158    char *xml = (char *)calloc(len, sizeof(char));
1159    if (xml == 0) {
1160       free(contentStr);
1161       return 0;
1162    }
1163    if (maxContentDumpLen == 0)
1164       *contentStr = 0;
1165    else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&
1166             (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5))
1167       strcpy(contentStr+maxContentDumpLen, " ...");
1168 
1169    SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"
1170                       "\n  <content size='%lu'><![CDATA[%s]]></content>"
1171                       "\n <QueueEntry>",
1172                         int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,
1173                         queueEntry->isPersistent?"true":"false",
1174                         queueEntry->embeddedType,
1175                         (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);
1176    free(contentStr);
1177    return xml;
1178    }
1179 }
1180 
1181 Dll_Export void freeEntryDump(char *entryDump)
1182 {
1183    if (entryDump) free(entryDump);
1184 }
1185 
1186 /**
1187  * Checks the given arguments to be valid.
1188  * @param queueP The queue instance
1189  * @param methodName For logging
1190  * @param checkIsConnected If true does check the connection state as well
1191  * @param exception Transporting errors
1192  * @return false if the parameters are not usable,
1193  *         in this case 'exception' is filled with detail informations
1194  */
1195 static bool checkArgs(I_Queue *queueP, const char *methodName,
1196                       bool checkIsConnected, ExceptionStruct *exception)
1197 {
1198    if (queueP == 0) {
1199       if (exception == 0) {
1200          printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",
1201                   __FILE__, __LINE__, methodName);
1202       }
1203       else {
1204          strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1205          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1206                   "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",
1207                    __FILE__, __LINE__, methodName);
1208          LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1209       }
1210       return false;
1211    }
1212 
1213    if (exception == 0) {
1214       LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);
1215       return false;
1216    }
1217 
1218    if (checkIsConnected) {
1219       if (queueP->privateObject==0 ||
1220           ((DbInfo *)(queueP->privateObject))->db==0 ||
1221           !queueP->isInitialized) {
1222          strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1223          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1224                   "[%.100s:%d] Not connected to database, %s() failed",
1225                    __FILE__, __LINE__, methodName);
1226          LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1227          return false;
1228       }
1229    }
1230 
1231    initializeExceptionStruct(exception);
1232 
1233    LOG __FILE__, "%s() entering ...", methodName);
1234 
1235    return true;
1236 }
1237 
1238 /*=================== TESTCODE =======================*/
1239 # ifdef QUEUE_MAIN
1240 #include <stdio.h>
1241 static void testRun(int argc, char **argv) {
1242    ExceptionStruct exception;
1243    QueueEntryArr *entries = 0;
1244    QueueProperties queueProperties;
1245    I_Queue *queueP = 0;
1246 
1247    memset(&queueProperties, 0, sizeof(QueueProperties));
1248    strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
1249    strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
1250    strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
1251    queueProperties.maxNumOfEntries = 10000000L;
1252    queueProperties.maxNumOfBytes = 1000000000LL;
1253    queueProperties.logFp = xmlBlasterDefaultLogging;
1254    queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
1255    queueProperties.userObject = 0;
1256 
1257    queueP = createQueue(&queueProperties, &exception);
1258    /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */
1259    if (argc || argv) {} /* to avoid compiler warning */
1260 
1261    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1262 
1263    {
1264       int64_t idArr[] =   { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };
1265       int16_t prioArr[] = { 5                    , 1                    , 5 };
1266       char *data[] =      { "Hello"              , " World"             , "!!!" };
1267       size_t i;
1268       for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {
1269          QueueEntry queueEntry;
1270          memset(&queueEntry, 0, sizeof(QueueEntry));
1271          queueEntry.priority = prioArr[i];
1272          queueEntry.isPersistent = true;
1273          queueEntry.uniqueId = idArr[i];
1274          strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
1275          queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
1276          queueEntry.embeddedBlob.data = data[i];
1277          queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);
1278 
1279          queueP->put(queueP, &queueEntry, &exception);
1280          if (*exception.errorCode != 0) {
1281             LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1282          }
1283       }
1284    }
1285 
1286    entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);
1287    if (*exception.errorCode != 0) {
1288       LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1289    }
1290    if (entries != 0) {
1291       size_t i;
1292       printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);
1293       for (i=0; i<entries->len; i++) {
1294          QueueEntry *queueEntry = &entries->queueEntryArr[i];
1295          char *dump = queueEntryToXml(queueEntry, 200);
1296          printf("%s\n", dump);
1297          free(dump);
1298       }
1299    }
1300 
1301    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1302    queueP->randomRemove(queueP, entries, &exception);
1303    if (*exception.errorCode != 0) {
1304       LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1305    }
1306 
1307    freeQueueEntryArr(entries);
1308    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1309 
1310    queueP->clear(queueP, &exception);
1311    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1312 
1313    queueP->shutdown(&queueP, &exception);
1314 }
1315 
1316 int main(int argc, char **argv) {
1317    int i;
1318    for (i=0; i<1; i++) {
1319       testRun(argc, argv);
1320    }
1321    return 0;
1322 }
1323 #endif /*QUEUE_MAIN*/
1324 /*=================== TESTCODE =======================*/


syntax highlighted by Code2HTML, v. 0.9.1