1 /*----------------------------------------------------------------------------
   2 Name:      SQLiteQueue.c
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 Comment:   A persistent queue implementation based on the SQLite relational database
   6            Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
   7            and can easily be used outside of xmlBlaster.
   8            Further you need sqlite.h and the sqlite library (dll,so,sl)
   9 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
  10 Date:      04/2004
  11 Compile:   Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
  12            Needs pthread.h but not the pthread library (for exact times)
  13 
  14             export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
  15        gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLiteQueue SQLiteQueue.c ../helper.c ../Timeout.c ../Timestampc.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite -lpthread
  16             (optionally add -ansi -pedantic -Wno-long-long)
  17             (Intel C: icc -wd981 ...)
  18 
  19            Compile inside xmlBlaster:
  20             build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
  21            expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
  22 
  23 
  24            Compile the test main() on Windows:
  25             Adjust /I\c\sqlite (the directory where sqlite.h resides) to your environment,
  26             and likewise \pialibs\sqlite.lib (sqlite.lib is created from sqlite.def via lib /DEF:sqlite.def):
  27            cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /D_WINDOWS /I\c\sqlite /I..\.. SqliteQueue.c ..\helper.c /link \pialibs\sqlite.lib
  28 
  29 
  30 Table layout XB_ENTRIES:
  31            dataId bigint
  32            queueName text
  33            prio integer
  34            flag text
  35            durable char(1)
  36            byteSize bigint
  37            blob bytea
  38            PRIMARY KEY (dataId, queueName)
  39 
  40 Todo:      Tuning:
  41             - Add prio to PRIMARY KEY
  42             - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
  43 
  44 @see:      http://www.sqlite.org/
  45 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
  46 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
  47 Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
  48 -----------------------------------------------------------------------------*/
  49 #if defined(__IPhoneOS__)
  50 #include <stdio.h> /* to avoid empty file compiler warning */
  51 #endif
  52 #if !defined(__IPhoneOS__)
  53 #include <stdio.h>
  54 #include <string.h>
  55 
  56 #include <malloc.h>
  57 #if !defined(_WINDOWS)
  58 # include <unistd.h>   /* unlink() */
  59 # include <errno.h>    /* unlink() */
  60 #endif
  61 #include "util/queue/QueueInterface.h"
  62 #include "sqlite.h"
  63 
  64 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
  65 static const QueueProperties *getProperties(I_Queue *queueP);
  66 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
  67 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
  68 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
  69 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
  70 static int32_t getNumOfEntries(I_Queue *queueP);
  71 static int32_t getMaxNumOfEntries(I_Queue *queueP);
  72 static int64_t getNumOfBytes(I_Queue *queueP);
  73 static int64_t getMaxNumOfBytes(I_Queue *queueP);
  74 static bool persistentQueueEmpty(I_Queue *queueP);
  75 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
  76 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);
  77 static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
  78 static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
  79 static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
  80 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception);
  81 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
  82 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
  83 static void freeQueueEntryData(QueueEntry *queueEntry);
  84 
  85 /**
  86  * Called for each SQL result row and does the specific result parsing depending on the query. 
  87  * @param userP Pointer on a data struct which contains the parsed data
  88  * @return true->to continue, false->to break execution or on error exception->errorCode is not null
  89  */
  90 typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP,
  91                                const char **pazValue, const char **pazColName, ExceptionStruct *exception);
  92 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
  93                              sqlite_vm *pVm, 
  94                              ParseDataFp parseDataFp, void *userP, bool finalize,
  95                              ExceptionStruct *exception);
  96 
  97 /* Shortcut for:
  98     if (queueP->log) queueP->log(queueP, XMLBLASTER_LOG_TRACE, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
  99    is
 100     LOG __FILE__, "Persistent queue is created");
 101 */
 102 #define LOG if (queueP && queueP->log) queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, 
 103 
 104 #define LEN512 512  /* ISO C90 forbids variable-size array: const int LEN512=512; */
 105 #define LEN256 256  /* ISO C90 forbids variable-size array: const int LEN256=256; */
 106 
 107 #define DBNAME_MAX 128
 108 #define ID_MAX 256
 109 /**
 110  * Holds Prepared statements for better performance. 
 111  * @see http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
 112  */
 113 typedef struct DbInfoStruct {
 114    QueueProperties prop;         /** Meta information */
 115    size_t numOfEntries;          /** Cache for current number of entries */
 116    int64_t numOfBytes;           /** Cache for current number of bytes */
 117    sqlite *db;                   /** Database handle for SQLite */
 118    sqlite_vm *pVm_put;           /** SQLite virtual machine to hold a prepared query */
 119    sqlite_vm *pVm_peekWithSamePriority;
 120    sqlite_vm *pVm_fillCache;
 121 } DbInfo;
 122 
 123 /**
 124  * Used temporary for peekWithSamePriority(). 
 125  */
 126 typedef struct {
 127    QueueEntryArr **queueEntryArrPP;
 128    int32_t currEntries;
 129    int64_t currBytes;
 130    int32_t maxNumOfEntries; /** The max wanted number of entries for this peek() */
 131    int64_t maxNumOfBytes;   /** The max wanted bytes during peek() */
 132 } TmpHelper;
 133 
 134 static char int64Str_[INT64_STRLEN_MAX];
 135 static char * const int64Str = int64Str_;   /* to make the pointer address const */
 136 
 137 /** Column index into XB_ENTRIES table */
 138 enum {
 139    XB_ENTRIES_DATA_ID = 0,
 140    XB_ENTRIES_QUEUE_NAME,
 141    XB_ENTRIES_PRIO,
 142    XB_ENTRIES_TYPE_NAME,
 143    XB_ENTRIES_PERSISTENT,
 144    XB_ENTRIES_SIZE_IN_BYTES,
 145    XB_ENTRIES_BLOB
 146 };
 147 
 148 
 149 /**
 150  * Create a new persistent queue instance. 
 151  * <br />
 152  * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
 153  *         usually by calling shutdown().
 154  * @throws exception
 155  */
 156 Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
 157 {
 158    bool stateOk = true;
 159    I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
 160    if (queueP == 0) return queueP;
 161    queueP->isInitialized = false;
 162    queueP->initialize = persistentQueueInitialize;
 163    queueP->getProperties = getProperties;
 164    queueP->put = persistentQueuePut;
 165    queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
 166    queueP->randomRemove = persistentQueueRandomRemove;
 167    queueP->clear = persistentQueueClear;
 168    queueP->getNumOfEntries = getNumOfEntries;
 169    queueP->getMaxNumOfEntries = getMaxNumOfEntries;
 170    queueP->getNumOfBytes = getNumOfBytes;
 171    queueP->getMaxNumOfBytes = getMaxNumOfBytes;
 172    queueP->empty = persistentQueueEmpty;
 173    queueP->shutdown = persistentQueueShutdown;
 174    queueP->destroy = persistentQueueDestroy;
 175    queueP->privateObject = calloc(1, sizeof(DbInfo));
 176    {
 177       DbInfo *dbInfo = (DbInfo *)queueP->privateObject;
 178       dbInfo->numOfEntries = -1;
 179       dbInfo->numOfBytes = -1;
 180    }
 181    stateOk = queueP->initialize(queueP, queueProperties, exception);
 182    if (stateOk) {
 183       LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
 184    }
 185    else {
 186       ExceptionStruct ex;
 187       queueP->shutdown(&queueP, &ex);
 188       if (*ex.errorCode != 0) {
 189          embedException(exception, ex.errorCode, ex.message, exception);
 190       }
 191       queueP = 0;
 192    }
 193    return queueP;
 194 }
 195 
 196 /** Access the DB handle, queueP pointer is not checked */
 197 static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
 198    return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);
 199 }
 200 
 201 /**
 202  * Access the queue configuration. 
 203  * @param queueP The this pointer
 204  * @return Read only access, 0 on error
 205  */
 206 static const QueueProperties *getProperties(I_Queue *queueP)
 207 {
 208    ExceptionStruct exception;
 209    if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0;
 210    return &getDbInfo(queueP)->prop;
 211 }
 212 
 213 /**
 214  */
 215 static void freeQueue(I_Queue **queuePP)
 216 {
 217    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
 218    if (queueP == 0) {
 219       fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
 220       return;
 221    }
 222 
 223    LOG __FILE__, "freeQueue() called");
 224 
 225    if (queueP->privateObject) {
 226       free(queueP->privateObject);
 227       queueP->privateObject = 0;
 228    }
 229 
 230    free(queueP);
 231    *queuePP = 0;
 232 }
 233 
 234 /**
 235  * Called internally by createQueue(). 
 236  * @param queueP The this pointer
 237  * @param queueProperties The configuration
 238  * @param exception Can contain error information (out parameter)
 239  * @return true on success
 240  */
 241 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
 242 {
 243    char *errMsg = 0;
 244    bool retOk;
 245    const int OPEN_RW = 0;
 246    sqlite *db;
 247    DbInfo *dbInfo;
 248 
 249    if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
 250    if (queueProperties == 0) {
 251       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 252       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 253                "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
 254       /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
 255       fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);
 256       return false;
 257    }
 258 
 259    queueP->log = queueProperties->logFp;
 260    queueP->logLevel = queueProperties->logLevel;
 261    queueP->userObject = queueProperties->userObject;
 262 
 263    if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
 264        queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
 265       char dbName[QUEUE_DBNAME_MAX];
 266       char queueName[QUEUE_ID_MAX];
 267       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 268       if (queueProperties->dbName == 0)
 269          strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
 270       else
 271          strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
 272       if (queueProperties->queueName == 0)
 273          strncpy0(queueName, "NULL", QUEUE_ID_MAX);
 274       else
 275          strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
 276       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 277                "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
 278                " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
 279                dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
 280       LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
 281       return false;
 282    }
 283 
 284    dbInfo = getDbInfo(queueP);
 285    memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));
 286 
 287    /* Never trust a queue property you haven't overflowed yourself :-) */
 288    dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
 289    dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
 290    dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;
 291 
 292    LOG __FILE__, "dbName          = %s", dbInfo->prop.dbName);
 293    LOG __FILE__, "queueName       = %s", dbInfo->prop.queueName);
 294    LOG __FILE__, "tablePrefix     = %s", dbInfo->prop.tablePrefix);
 295    LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries);
 296    LOG __FILE__, "maxNumOfBytes   = %ld",(long)dbInfo->prop.maxNumOfBytes);
 297    /*LOG __FILE__, "logFp           = %d", (int)dbInfo->prop.logFp);*/
 298    LOG __FILE__, "logLevel        = %d", (int)dbInfo->prop.logLevel);
 299    /*LOG __FILE__, "userObject      = %d", (void*)dbInfo->prop.userObject);*/
 300 
 301    db = sqlite_open(dbInfo->prop.dbName, OPEN_RW, &errMsg);
 302    dbInfo->db = db;
 303 
 304    if (db==0) {
 305       queueP->isInitialized = false;
 306       if(queueP->log) {
 307          if (errMsg) {
 308             LOG __FILE__, "%s", errMsg);
 309          }
 310          else {
 311             LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName);
 312          }
 313       }
 314       else {
 315         if (errMsg)
 316            fprintf(stderr,"[%s] %s\n", __FILE__, errMsg);
 317         else
 318            fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName);
 319       }
 320       strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 321       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 322                "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg);
 323       if (errMsg != 0) sqlite_freemem(errMsg);
 324       return false;
 325    }
 326 
 327    queueP->isInitialized = true;
 328 
 329    retOk = createTables(queueP, exception);
 330 
 331    fillCache(queueP, exception);
 332 
 333    LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
 334    return true;
 335 }
 336 
 337 /**
 338  * Create the necessary DB table if not already existing. 
 339  * @param queueP 
 340  * @param exception Can contain error information (out parameter)
 341  * @return true on success
 342  */
 343 static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
 344 {
 345    char queryString[LEN512];
 346    bool retOk;
 347    const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;
 348 
 349    SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
 350            tablePrefix);
 351    retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);
 352 
 353    SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
 354            tablePrefix, tablePrefix);
 355    retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);
 356    return retOk;
 357 }
 358 
 359 /**
 360  * Invoke SQL query. 
 361  * @param queueP Is not checked, must not be 0
 362  * @param queryString The SQL to execute
 363  * @param comment For logging or exception text
 364  * @param exception Can contain error information (out parameter)
 365  * @return true on success
 366  */
 367 static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
 368 {
 369    int rc = 0;
 370    char *errMsg = 0;
 371    bool retOk;
 372    DbInfo *dbInfo = getDbInfo(queueP);
 373 
 374    rc = sqlite_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);
 375    switch (rc) {
 376       case SQLITE_OK:
 377          LOG __FILE__, "SQL '%s' success", comment);
 378          retOk = true;
 379          break;
 380       default:
 381          if (errMsg && strstr(errMsg, "already exists")) {
 382             LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
 383             retOk = true;
 384          }
 385          else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {
 386             LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
 387             retOk = true;
 388          }
 389          else {
 390             LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
 391             strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 392             SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 393                      "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
 394             retOk = false;
 395          }
 396          break;
 397    }
 398    if (errMsg != 0) sqlite_freemem(errMsg);
 399    return retOk;
 400 }
 401 
 402 /*
 403  * This is the callback routine that the SQLite library
 404  * invokes for each row of a query result.
 405 static int callback(void *pArg, int nArg, char **azArg, char **azCol){
 406    int i;
 407    struct callback_data *p = (struct callback_data*)pArg;
 408    int w = 5;
 409    if (p==0) {} // Suppress compiler warning
 410    if( azArg==0 ) return 0;
 411    for(i=0; i<nArg; i++){
 412       int len = strlen(azCol[i]);
 413       if( len>w ) w = len;
 414    }
 415    printf("\n");
 416    for(i=0; i<nArg; i++){
 417       printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL");
 418    }
 419   return 0;
 420 }
 421 */
 422 
 423 /**
 424  * @param queueP The queue instance
 425  * @param queueEntry The entry
 426  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 427  */
 428 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
 429 {
 430    int rc = 0;
 431    bool stateOk = true;
 432    DbInfo *dbInfo;
 433    char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */
 434 
 435    if (checkArgs(queueP, "put", true, exception) == false ) return;
 436    if (queueEntry == 0) {
 437       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 438       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 439                "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
 440       return;
 441    }
 442    if (queueEntry->uniqueId == 0) {
 443       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 444       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 445                "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
 446       return;
 447    }
 448    if (*queueEntry->embeddedType == 0) {
 449       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 450       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 451                "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
 452       return;
 453    }
 454    strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
 455 
 456    if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
 457       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 458       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 459                "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
 460       return;
 461    }
 462 
 463    dbInfo = getDbInfo(queueP);
 464 
 465    if ((int64_t)dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) {
 466       strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 467       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 468                "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
 469       return;
 470    }
 471    if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
 472       strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 473       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 474                "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
 475       return;
 476    }
 477 
 478 
 479    if (dbInfo->pVm_put == 0) {  /* Compile prepared query only once */
 480       char queryString[LEN256];    /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/
 481       SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix);
 482       stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
 483    }
 484 
 485    if (stateOk) { /* set prepared statement tokens */
 486       char intStr[INT64_STRLEN_MAX];
 487       int index = 0;
 488       const int len = -1; /* Calculated by sqlite_bind */
 489       rc = SQLITE_OK;
 490 
 491       int64ToStr(intStr, queueEntry->uniqueId);
 492       /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/
 493       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
 494       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false);
 495       SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority);
 496       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
 497       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false);
 498       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false);
 499       SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen);
 500       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
 501       if (rc == SQLITE_OK) {
 502          /* As SQLite does only store strings we encode our blob to a string */
 503          size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254;
 504          unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char));
 505          int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data,
 506                               (int)queueEntry->embeddedBlob.dataLen, out);
 507          rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true);
 508          free(out);
 509       }
 510 
 511       if (rc != SQLITE_OK) {
 512          LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
 513          strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 514          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 515                   "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
 516          stateOk = false;
 517       }
 518    }
 519 
 520    if (stateOk) { /* start the query, process results */
 521       int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception);
 522       stateOk = countRows >= 0;
 523    }
 524 
 525    if (stateOk) {
 526       dbInfo->numOfEntries += 1;
 527       dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
 528    }
 529 
 530    LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
 531 }
 532 
 533 
 534 /**
 535  * Compile a prepared query. 
 536  * No parameters are checked, they must be valid
 537  * @param queueP The queue instance to use
 538  * @param methodName A nice string for logging
 539  * @param ppVm The virtual machine will be initialized if still 0
 540  * @param queryString
 541  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 542  * @return false on error and exception->errorCode is not null
 543  */
 544 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
 545                     sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception)
 546 {
 547    int iRetry, numRetry=100;
 548    char *errMsg = 0;
 549    int rc = 0;
 550    const char *pzTail = 0;   /* OUT: uncompiled tail of zSql */
 551    bool stateOk = true;
 552    DbInfo *dbInfo = getDbInfo(queueP);
 553 
 554    if (*ppVm == 0) {  /* Compile prepared  query */
 555       for (iRetry = 0; iRetry < numRetry; iRetry++) {
 556          rc = sqlite_compile(dbInfo->db, queryString, &pzTail, ppVm, &errMsg);
 557          switch (rc) {
 558             case SQLITE_BUSY:
 559                if (iRetry == (numRetry-1)) {
 560                   strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 561                   SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 562                            "[%.100s:%d] SQL error #%d in %s(): %s %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
 563                }
 564                LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, (errMsg==0)?"":errMsg);
 565                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
 566                sleepMillis(10);
 567                break;
 568             case SQLITE_OK:
 569                iRetry = numRetry; /* We're done */
 570                LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
 571                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
 572                break;
 573             default:
 574                LOG __FILE__, "SQL error #%d %s in %s(): %s: %s", rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
 575                strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 576                SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 577                         "[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
 578                iRetry = numRetry; /* We're done */
 579                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
 580                stateOk = false;
 581                break;
 582          }
 583       }
 584    }
 585    if (*ppVm == 0) stateOk = false;
 586    return stateOk;
 587 }
 588 
 589 /**
 590  * For each SQL result row parse it into a QueueEntry. 
 591  * No parameters are checked, they must be valid
 592  * Implements a ParseDataFp (function pointer)
 593  * @param queueP The 'this' pointer
 594  * @param currIndex
 595  * @param userP
 596  * @param pazValue
 597  * @param pazColName
 598  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 599  * @return false on error and exception->errorCode is not null
 600  */
 601 static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP,
 602                                const char **pazValue, const char **pazColName, ExceptionStruct *exception)
 603 {
 604    bool doContinue = true;
 605    int numAssigned;
 606    bool stateOk = true;
 607    int decodeSize = 0;
 608    QueueEntry *queueEntry = 0;
 609    QueueEntryArr *queueEntryArr;
 610    TmpHelper *helper = (TmpHelper*)userP;
 611    QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;
 612 
 613    if (currIndex == 0) {
 614       helper->currEntries = 0;
 615       helper->currBytes = 0;
 616    }
 617 
 618    if (*queueEntryArrPP == 0) {
 619       *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));;
 620       if (helper->maxNumOfEntries == 0) {
 621          doContinue = false;
 622          return doContinue;
 623       }
 624    }
 625    queueEntryArr = *queueEntryArrPP;
 626 
 627    if (queueEntryArr->len == 0) {
 628       queueEntryArr->len = 10;
 629       queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
 630    }
 631    else if (currIndex >= queueEntryArr->len) {
 632       queueEntryArr->len += 10;
 633       queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
 634    }
 635    queueEntry = &queueEntryArr->queueEntryArr[currIndex];
 636    memset(queueEntry, 0, sizeof(QueueEntry));
 637 
 638    stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]);
 639    if (!stateOk) {
 640       LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]);
 641       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 642       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 643                "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]);
 644       doContinue = false;
 645       return doContinue;
 646    }
 647 
 648    LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
 649    /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */
 650    numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority);
 651    if (numAssigned != 1) {
 652       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]);
 653       queueEntry->priority = 4;
 654    }
 655    strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
 656    queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false;
 657    {
 658       int64_t ival = 0;
 659       stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]);
 660       queueEntry->embeddedBlob.dataLen = (size_t)ival;
 661    }
 662 
 663    /* TODO!!! in Java the length is the size in RAM and not strlen(data) */
 664    /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */
 665    queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */
 666    decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data);
 667    if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) {
 668       *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0; 
 669       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d"
 670                         " but expected decoded len=%d: '%s'",
 671                     int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize,
 672                     queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data);
 673    }
 674 
 675    helper->currEntries += 1;
 676    helper->currBytes += queueEntry->embeddedBlob.dataLen;
 677 
 678    /* Limit the number of entries */
 679    if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
 680        (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
 681       /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
 682       doContinue = false;
 683    }
 684 
 685    return doContinue;
 686 }
 687 
 688 /**
 689  * Execute the query and get the query result. 
 690  * No parameters are checked, they must be valid
 691  * @param queueP  The this pointer
 692  * @param methodName The method called
 693  * @param pVm sqlite virtual machine
 694  * @param parseDataFp The function which is called for each SQL result row
 695  *                    or 0 if no function shall be called
 696  * @param userP The pointer which is passed to parseDataFp
 697  * @param finalize true to call sqlite_finalize which deletes the virtual machine,
 698  *                 false to call  sqlite_reset to reuse the prepared query
 699  * @param exception The exception is set to *exception->errorCode==0 on success, else to != 0
 700  * @return < 0 on error and exception->errorCode is not null
 701  *         otherwise the number of successfully parsed rows is returned
 702  * @todo For INSERT and DELETE return the number of touched entries !!!
 703  */
 704 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
 705                              sqlite_vm *pVm, 
 706                              ParseDataFp parseDataFp, void *userP,
 707                              bool finalize,
 708                              ExceptionStruct *exception)
 709 {
 710    char *errMsg = 0;
 711    int32_t currIndex = 0;
 712    int numCol = 0;
 713    const char **pazValue = 0;
 714    const char **pazColName = 0;
 715    bool done = false;
 716    bool stateOk = true;
 717    int rc;
 718    while (!done) {
 719       rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName);
 720       switch( rc ){
 721          case SQLITE_DONE:
 722             done = true;
 723          break;
 724          case SQLITE_BUSY:
 725             LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
 726             sleepMillis(10);
 727          break;
 728          case SQLITE_ROW:
 729          {
 730             bool doContinue = true;
 731             if (parseDataFp) {
 732                /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */
 733                doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception);
 734                stateOk = *exception->errorCode == 0;
 735             }
 736             else {
 737                /*
 738                printf("RESULT[%d]\n", iRow);
 739                for (iCol = 0; iCol < numCol; iCol++) {
 740                   printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]);
 741                }
 742                */
 743             }
 744             currIndex++;
 745             if (!stateOk || !doContinue) done = true;
 746          }
 747          break;
 748          case SQLITE_ERROR:   /* If exists already */
 749             LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
 750             done = true;
 751             stateOk = false;
 752          break;
 753          case SQLITE_MISUSE:
 754          default:
 755             LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc));
 756             done = true;
 757             stateOk = false;
 758          break;
 759       }
 760    }
 761    LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);
 762 
 763    if (finalize) {
 764       sqlite_finalize(pVm, &errMsg);
 765       if (rc != SQLITE_OK && rc != SQLITE_DONE) {
 766          LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled. %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
 767       }
 768       if (errMsg != 0) sqlite_freemem(errMsg);
 769    }
 770    else { /* Reset prepared statement */
 771       rc = sqlite_reset(pVm, &errMsg);
 772       if (rc == SQLITE_SCHEMA) {
 773          LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
 774       }
 775       if (errMsg != 0) sqlite_freemem(errMsg);
 776    }
 777 
 778    return stateOk ? currIndex : (-1)*rc;
 779 }
 780 
 781 /**
 782  * Access queue entries without removing them. 
 783  */
 784 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception)
 785 {
 786    int rc = 0;
 787    bool stateOk = true;
 788    DbInfo *dbInfo;
 789    QueueEntryArr *queueEntryArr = 0;
 790 
 791    if (checkArgs(queueP, "peekWithSamePriority", true, exception) == false ) return 0;
 792 
 793    LOG __FILE__, "peekWithSamePriority(maxNumOfEntries=%d, maxNumOfBytes=%s) ...", (int)maxNumOfEntries, int64ToStr(int64Str, maxNumOfBytes));
 794 
 795    dbInfo = getDbInfo(queueP);
 796 
 797    if (dbInfo->pVm_peekWithSamePriority == 0) {  /* Compile prepared  query */
 798       char queryString[LEN512];
 799       /*"SELECT * FROM XB_ENTRIES where queueName='connection_clientJoe' and prio=(select max(prio) from XB_ENTRIES where queueName='connection_clientJoe') ORDER BY dataId ASC";*/
 800       SNPRINTF(queryString, LEN512,
 801            "SELECT * FROM %.20sENTRIES where queueName=?"
 802            " and prio=(select max(prio) from %.20sENTRIES where queueName=?)"
 803            " ORDER BY dataId ASC",
 804            dbInfo->prop.tablePrefix, dbInfo->prop.tablePrefix);
 805       stateOk = compilePreparedQuery(queueP, "peekWithSamePriority",
 806                     &dbInfo->pVm_peekWithSamePriority , queryString, exception);
 807    }
 808 
 809    if (stateOk) { /* set prepared statement tokens */
 810       int index = 0;
 811       int len = -1; /* Calculated by sqlite_bind */
 812       rc = SQLITE_OK;
 813 
 814       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
 815       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_peekWithSamePriority, ++index, dbInfo->prop.queueName, len, false);
 816 
 817       switch (rc) {
 818          case SQLITE_OK:
 819             LOG __FILE__, "peekWithSamePriority() Bound to prepared statement [sqlCode=%d]", rc);
 820             break;
 821          default:
 822             LOG __FILE__, "peekWithSamePriority() SQL error: %d %s", rc, sqlite_error_string(rc));
 823             strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 824             SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 825                      "[%.100s:%d] peekWithSamePriority() SQL error: %d %s", __FILE__, __LINE__, rc, sqlite_error_string(rc));
 826             stateOk = false;
 827             break;
 828       }
 829    }
 830 
 831    if (stateOk) { /* start the query */
 832       TmpHelper helper;
 833       int32_t currIndex = 0;
 834       helper.queueEntryArrPP = &queueEntryArr;
 835       helper.maxNumOfEntries = maxNumOfEntries;
 836       helper.maxNumOfBytes = maxNumOfBytes;
 837       currIndex = getResultRows(queueP, "peekWithSamePriority",
 838                         dbInfo->pVm_peekWithSamePriority, parseQueueEntryArr,
 839                         &helper, false, exception);
 840       stateOk = currIndex >= 0;
 841       if (!stateOk) {
 842          if (queueEntryArr) {
 843             free(queueEntryArr->queueEntryArr);
 844             queueEntryArr->len = 0;
 845          }
 846       }
 847       else {
 848          if (!queueEntryArr)
 849             queueEntryArr = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));
 850          else if ((size_t)currIndex < queueEntryArr->len) {
 851             queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, currIndex * sizeof(QueueEntry));
 852             queueEntryArr->len = currIndex; 
 853          }
 854       }
 855    }
 856 
 857    LOG __FILE__, "peekWithSamePriority() %s", stateOk ? "done" : "failed");
 858    return queueEntryArr;
 859 }
 860 
 861 /**
 862  * Removes the given entries from persistence. 
 863  * @return The number of removed entries
 864  */
 865 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception)
 866 {
 867    bool stateOk = true;
 868    int64_t numOfBytes = 0;
 869    int32_t countDeleted = 0;
 870    sqlite_vm *pVm = 0;
 871    DbInfo *dbInfo;
 872    if (checkArgs(queueP, "randomRemove", true, exception) == false || queueEntryArr == 0 ||
 873                  queueEntryArr->len == 0 || queueEntryArr->queueEntryArr == 0)
 874       return 0;
 875 
 876    LOG __FILE__, "randomRemove(%d) ...", (int)queueEntryArr->len);
 877 
 878    dbInfo = getDbInfo(queueP);
 879 
 880    {
 881       size_t i;
 882       const size_t qLen = 128 + 2*ID_MAX + queueEntryArr->len*(INT64_STRLEN_MAX+6);
 883       char *queryString = (char *)calloc(qLen, sizeof(char));
 884       /*  DELETE FROM xb_entries WHERE queueName = 'connection_clientJoe' AND dataId in ( 1081492136876000000, 1081492136856000000 ); */
 885       SNPRINTF(queryString, qLen, 
 886            "DELETE FROM %.20sENTRIES WHERE queueName='%s'"
 887            " AND dataId in ( ",
 888            dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
 889 
 890       for (i=0; i<queueEntryArr->len; i++) {
 891          strcat(queryString, int64ToStr(int64Str, queueEntryArr->queueEntryArr[i].uniqueId));
 892          if (i<(queueEntryArr->len-1)) strcat(queryString, ",");
 893          numOfBytes += ((queueEntryArr->queueEntryArr[i].sizeInBytes > 0) ? queueEntryArr->queueEntryArr[i].sizeInBytes : queueEntryArr->queueEntryArr[i].embeddedBlob.dataLen);
 894       }
 895       strcat(queryString, " )");
 896       stateOk = compilePreparedQuery(queueP, "randomRemove", &pVm, queryString, exception);
 897       free(queryString);
 898    }
 899 
 900 
 901    if (stateOk) { /* start the query */
 902       int32_t currIndex = getResultRows(queueP, "randomRemove",
 903                               pVm, 0, 0, true, exception);
 904       stateOk = currIndex >= 0;
 905    }
 906 
 907    if (stateOk) {
 908       countDeleted = (int32_t)sqlite_last_statement_changes(dbInfo->db);
 909       if (countDeleted < 0 || (size_t)countDeleted != queueEntryArr->len) {
 910          fillCache(queueP, exception); /* calculate numOfBytes again */
 911       }
 912       else {
 913          dbInfo->numOfEntries -= queueEntryArr->len;
 914          dbInfo->numOfBytes -= numOfBytes;
 915       }
 916    }
 917 
 918    return countDeleted;
 919 }
 920 
 921 /**
 922  * Destroy all entries in queue and releases all resources in memory and on HD. 
 923  */
 924 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception)
 925 {
 926    bool stateOk = true;
 927    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
 928    if (checkArgs(queueP, "destroy", false, exception) == false ) return false;
 929 
 930    shutdownInternal(queuePP, exception);
 931 
 932    {
 933       DbInfo *dbInfo = getDbInfo(queueP);
 934       const char *dbName = dbInfo->prop.dbName;
 935       stateOk = unlink(dbName) == 0; /* Delete old db file */
 936       if (!stateOk) {
 937          strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 938          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 939                   "[%.100s:%d] destroy() ERROR: Can't destroy database '%s', errno=%d.", __FILE__, __LINE__, dbName, errno);
 940       }
 941    }
 942 
 943    freeQueue(queuePP);
 944    
 945    return stateOk;
 946 }
 947 
 948 /**
 949  * Destroy all entries in queue. 
 950  */
 951 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception)
 952 {
 953    bool stateOk = true;
 954    char queryString[LEN256];
 955    sqlite_vm *pVm = 0;
 956    DbInfo *dbInfo;
 957    if (checkArgs(queueP, "clear", true, exception) == false) return false;
 958    dbInfo = getDbInfo(queueP);
 959 
 960    SNPRINTF(queryString, LEN256, "DELETE FROM %.20sENTRIES", dbInfo->prop.tablePrefix);
 961    stateOk = compilePreparedQuery(queueP, "clear", &pVm, queryString, exception);
 962 
 963    if (stateOk) {
 964       int32_t currIndex = getResultRows(queueP, "clear", pVm, 0, 0, true, exception);
 965       stateOk = currIndex >= 0;
 966    }
 967 
 968    if (stateOk) {
 969       dbInfo->numOfEntries = 0;
 970       dbInfo->numOfBytes = 0;
 971    }
 972 
 973    LOG __FILE__, "clear() done");
 974    return stateOk;
 975 }
 976 
 977 /**
 978  * Parse response of "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
 979  */
 980 static bool parseCacheInfo(I_Queue *queueP, size_t currIndex, void *userP,
 981                            const char **pazValue, const char **pazColName, ExceptionStruct *exception)
 982 {
 983    int64_t ival = 0;
 984    bool stateOk;
 985    DbInfo *dbInfo = getDbInfo(queueP);
 986 
 987    stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_DATA_ID]);
 988    if (!stateOk) {
 989       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 990       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
 991                "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfEntries, ignoring entry.", __FILE__, __LINE__, pazColName[XB_ENTRIES_DATA_ID], pazValue[XB_ENTRIES_DATA_ID]);
 992       return false;
 993    }
 994    dbInfo->numOfEntries = (int32_t)ival;
 995 
 996    stateOk = strToInt64(&dbInfo->numOfBytes, pazValue[1]);
 997    if (!stateOk) {
 998       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
 999       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1000                "[%.100s:%d] parseCacheInfo() ERROR: Can't parse %s='%.20s' to numOfBytes, ignoring entry.", __FILE__, __LINE__, pazColName[1], pazValue[1]);
1001       if (currIndex) {} /* Just to avoid compiler warning about unused variable */
1002       if (userP) {};
1003       return false;
1004    }
1005 
1006    return true;
1007 }
1008 
1009 /**
1010  * Reload cached information from database. 
1011  * @param queueP The this pointer
1012  * @param exception Returns error
1013  * @return false on error
1014  */
1015 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception)
1016 {
1017    bool stateOk = true;
1018    DbInfo *dbInfo = 0;
1019 
1020    char queryString[LEN512]; /* "SELECT count(dataId) FROM XB_ENTRIES where queueName='connection_clientJoe'" */
1021 
1022    if (checkArgs(queueP, "fillCache", true, exception) == false ) return true;
1023    dbInfo = getDbInfo(queueP);
1024 
1025    SNPRINTF(queryString, LEN512, 
1026             "SELECT count(dataId), sum(byteSize) FROM %.20sENTRIES where queueName='%s'",
1027             dbInfo->prop.tablePrefix, dbInfo->prop.queueName);
1028    stateOk = compilePreparedQuery(queueP, "fillCache",
1029                   &dbInfo->pVm_fillCache, queryString, exception);
1030 
1031    if (stateOk) { /* start the query, calls parseCacheInfo() */
1032       int32_t currIndex = getResultRows(queueP, "fillCache",
1033                               dbInfo->pVm_fillCache, parseCacheInfo,
1034                               0, false, exception);
1035       stateOk = currIndex > 0;
1036    }
1037 
1038    LOG __FILE__, "fillCache() numOfEntries=%d numOfBytes=%s", dbInfo->numOfEntries, int64ToStr(int64Str, dbInfo->numOfBytes));
1039    return stateOk;
1040 }
1041 
1042 static bool persistentQueueEmpty(I_Queue *queueP)
1043 {
1044    return getNumOfEntries(queueP) <= 0;
1045 }
1046 
1047 static int32_t getNumOfEntries(I_Queue *queueP)
1048 {
1049    DbInfo *dbInfo;
1050    bool stateOk = true;
1051    ExceptionStruct exception;
1052    if (checkArgs(queueP, "getNumOfEntries", false, &exception) == false ) return -1;
1053    dbInfo = getDbInfo(queueP);
1054    if (dbInfo->numOfEntries == -1) {
1055       stateOk = fillCache(queueP, &exception);
1056    }
1057    return (stateOk) ? (int32_t)dbInfo->numOfEntries : -1;
1058 }
1059 
1060 static int32_t getMaxNumOfEntries(I_Queue *queueP)
1061 {
1062    DbInfo *dbInfo;
1063    ExceptionStruct exception;
1064    if (checkArgs(queueP, "getMaxNumOfEntries", false, &exception) == false ) return -1;
1065    dbInfo = getDbInfo(queueP);
1066    return dbInfo->prop.maxNumOfEntries;
1067 }
1068 
1069 static int64_t getNumOfBytes(I_Queue *queueP)
1070 {
1071    DbInfo *dbInfo;
1072    ExceptionStruct exception;
1073    bool stateOk = true;
1074    if (checkArgs(queueP, "getNumOfBytes", false, &exception) == false ) return -1;
1075    dbInfo = getDbInfo(queueP);
1076    if (dbInfo->numOfBytes == -1) {
1077       stateOk = fillCache(queueP, &exception);
1078    }
1079    return (stateOk) ? dbInfo->numOfBytes : -1;
1080 }
1081 
1082 static int64_t getMaxNumOfBytes(I_Queue *queueP)
1083 {
1084    DbInfo *dbInfo;
1085    ExceptionStruct exception;
1086    if (checkArgs(queueP, "getMaxNumOfBytes", false, &exception) == false ) return -1;
1087    dbInfo = getDbInfo(queueP);
1088    return dbInfo->prop.maxNumOfBytes;
1089 }
1090 
1091 /**
1092  * Shutdown without destroying any entry. 
1093  * Clears all open DB resources.
1094  */
1095 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception)
1096 {
1097    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1098    if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1099    shutdownInternal(queuePP, exception);
1100    freeQueue(queuePP);
1101 }
1102 
1103 /**
1104  * Shutdown used internally without calling freeQueue(). 
1105  */
1106 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception)
1107 {
1108    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
1109    if (checkArgs(queueP, "shutdown", false, exception) == false ) return;
1110    {
1111       DbInfo *dbInfo = getDbInfo(queueP);
1112       queueP->isInitialized = false;
1113       if(dbInfo) {
1114          if (dbInfo->pVm_put) {
1115             char *errMsg = 0;
1116             /*int rc =*/ sqlite_finalize(dbInfo->pVm_put, &errMsg);
1117             if (errMsg != 0) sqlite_freemem(errMsg);
1118             dbInfo->pVm_put = 0;
1119          }
1120          if (dbInfo->pVm_peekWithSamePriority) {
1121             char *errMsg = 0;
1122             sqlite_finalize(dbInfo->pVm_peekWithSamePriority, &errMsg);
1123             if (errMsg != 0) sqlite_freemem(errMsg);
1124             dbInfo->pVm_peekWithSamePriority = 0;
1125          }
1126          if (dbInfo->pVm_fillCache) {
1127             char *errMsg = 0;
1128             sqlite_finalize(dbInfo->pVm_fillCache, &errMsg);
1129             if (errMsg != 0) sqlite_freemem(errMsg);
1130             dbInfo->pVm_fillCache = 0;
1131          }
1132          if (dbInfo->db) {
1133             sqlite_close(dbInfo->db);
1134             dbInfo->db = 0;
1135          }
1136          LOG __FILE__, "shutdown() done");
1137       }
1138    }
1139 }
1140 
1141 /**
1142  * Frees everything inside QueueEntryArr and the struct QueueEntryArr itself
1143  * @param queueEntryArr The struct to free, passing NULL is OK
1144  */
1145 Dll_Export void freeQueueEntryArr(QueueEntryArr *queueEntryArr)
1146 {
1147    if (queueEntryArr == (QueueEntryArr *)0) return;
1148    freeQueueEntryArrInternal(queueEntryArr);
1149    free(queueEntryArr);
1150 }
1151 
1152 /**
1153  * Frees everything inside QueueEntryArr but NOT the struct QueueEntryArr itself
1154  * @param queueEntryArr The struct internals to free, passing NULL is OK
1155  */
1156 Dll_Export void freeQueueEntryArrInternal(QueueEntryArr *queueEntryArr)
1157 {
1158    size_t i;
1159    if (queueEntryArr == (QueueEntryArr *)0) return;
1160    for (i=0; i<queueEntryArr->len; i++) {
1161       freeQueueEntryData(&queueEntryArr->queueEntryArr[i]);
1162    }
1163    free(queueEntryArr->queueEntryArr);
1164    queueEntryArr->len = 0;
1165 }
1166 
1167 /**
1168  * Does not free the queueEntry itself
1169  */
1170 static void freeQueueEntryData(QueueEntry *queueEntry)
1171 {
1172    if (queueEntry == (QueueEntry *)0) return;
1173    if (queueEntry->embeddedBlob.data != 0) {
1174       free((char *)queueEntry->embeddedBlob.data);
1175       queueEntry->embeddedBlob.data = 0;
1176    }
1177    queueEntry->embeddedBlob.dataLen = 0;
1178 }
1179 
1180 /**
1181  * Frees the internal blob and the queueEntry itself. 
1182  * @param queueEntry Its memory is freed, it is not usable anymore after this call
1183  */
1184 Dll_Export void freeQueueEntry(QueueEntry *queueEntry)
1185 {
1186    if (queueEntry == (QueueEntry *)0) return;
1187    freeQueueEntryData(queueEntry);
1188    free(queueEntry);
1189 }
1190 
1191 /**
1192  * NOTE: You need to free the returned pointer with xmlBlasterFree() (which calls free())!
1193  *
1194  * @param queueEntry The data to put to the queue
1195  * @param maxContentDumpLen for -1 get the complete content, else limit the
1196  *        content to the given number of bytes
1197  * @return A ASCII XML formatted entry or NULL if out of memory
1198  */
1199 Dll_Export char *queueEntryToXml(QueueEntry *queueEntry, int maxContentDumpLen)
1200 {
1201    if (queueEntry == (QueueEntry *)0) return 0;
1202    {
1203    char *contentStr = strFromBlobAlloc(queueEntry->embeddedBlob.data, queueEntry->embeddedBlob.dataLen);
1204    const size_t blobLen = (maxContentDumpLen >= 0) ? maxContentDumpLen : queueEntry->embeddedBlob.dataLen;
1205    const size_t len = 200 + QUEUE_ENTRY_EMBEDDEDTYPE_LEN + blobLen;
1206    char *xml = (char *)calloc(len, sizeof(char));
1207    if (xml == 0) {
1208       free(contentStr);
1209       return 0;
1210    }
1211    if (maxContentDumpLen == 0)
1212       *contentStr = 0;
1213    else if (maxContentDumpLen > 0 && queueEntry->embeddedBlob.dataLen > 5 &&
1214             (size_t)maxContentDumpLen < (queueEntry->embeddedBlob.dataLen-5))
1215       strcpy(contentStr+maxContentDumpLen, " ...");
1216 
1217    SNPRINTF(xml, len, "\n <QueueEntry id='%s' priority='%hd' persistent='%s' type='%s'>"
1218                       "\n  <content size='%lu'><![CDATA[%s]]></content>"
1219                       "\n <QueueEntry>",
1220                         int64ToStr(int64Str, queueEntry->uniqueId), queueEntry->priority,
1221                         queueEntry->isPersistent?"true":"false",
1222                         queueEntry->embeddedType,
1223                         (unsigned long)queueEntry->embeddedBlob.dataLen, contentStr);
1224    free(contentStr);
1225    return xml;
1226    }
1227 }
1228 
1229 Dll_Export void freeEntryDump(char *entryDump)
1230 {
1231    if (entryDump) free(entryDump);
1232 }
1233 
1234 /**
1235  * Checks the given arguments to be valid.
1236  * @param queueP The queue instance
1237  * @param methodName For logging
1238  * @param checkIsConnected If true does check the connection state as well 
1239  * @param exception Transporting errors 
1240  * @return false if the parameters are not usable,
1241  *         in this case 'exception' is filled with detail informations
1242  */
1243 static bool checkArgs(I_Queue *queueP, const char *methodName,
1244                       bool checkIsConnected, ExceptionStruct *exception)
1245 {
1246    if (queueP == 0) {
1247       if (exception == 0) {
1248          printf("[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to %s()\n",
1249                   __FILE__, __LINE__, methodName);
1250       }
1251       else {
1252          strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1253          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1254                   "[%.100s:%d] Please provide a valid I_Queue pointer to %.16s()",
1255                    __FILE__, __LINE__, methodName);
1256          LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1257       }
1258       return false;
1259    }
1260 
1261    if (exception == 0) {
1262       LOG __FILE__, "[%s:%d] Please provide valid exception pointer to %s()", __FILE__, __LINE__, methodName);
1263       return false;
1264    }
1265 
1266    if (checkIsConnected) {
1267       if (queueP->privateObject==0 ||
1268           ((DbInfo *)(queueP->privateObject))->db==0 ||
1269           !queueP->isInitialized) {
1270          strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
1271          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
1272                   "[%.100s:%d] Not connected to database, %s() failed",
1273                    __FILE__, __LINE__, methodName);
1274          LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
1275          return false;
1276       }
1277    }
1278 
1279    initializeExceptionStruct(exception);
1280 
1281    LOG __FILE__, "%s() entering ...", methodName);
1282 
1283    return true;
1284 }
1285 
1286 /*=================== TESTCODE =======================*/
1287 # ifdef QUEUE_MAIN
1288 /*
1289  NOTE:
1290     This code may be totally outdated, for current examples please use:
1291     xmlBlaster/testsuite/src/c/TestQueue.c
1292 */
1293 #include <stdio.h>
1294 static void testRun(int argc, char **argv) {
1295    ExceptionStruct exception;
1296    QueueEntryArr *entries = 0;
1297    QueueProperties queueProperties;
1298    I_Queue *queueP = 0;
1299 
1300    memset(&queueProperties, 0, sizeof(QueueProperties));
1301    strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
1302    strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
1303    strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
1304    queueProperties.maxNumOfEntries = 10000000L;
1305    queueProperties.maxNumOfBytes = 1000000000LL;
1306    queueProperties.logFp = xmlBlasterDefaultLogging;
1307    queueProperties.logLevel = XMLBLASTER_LOG_TRACE;
1308    queueProperties.userObject = 0;
1309 
1310    queueP = createQueue(&queueProperties, &exception);
1311    /* DbInfo *dbInfo = (DbInfo *)queueP->privateObject; */
1312    if (argc || argv) {} /* to avoid compiler warning */
1313 
1314    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1315 
1316    {
1317       int64_t idArr[] =   { 1081492136826000000ll, 1081492136856000000ll, 1081492136876000000ll };
1318       int16_t prioArr[] = { 5                    , 1                    , 5 };
1319       char *data[] =      { "Hello"              , " World"             , "!!!" };
1320       size_t i;
1321       for (i=0; i<sizeof(idArr)/sizeof(int64_t); i++) {
1322          QueueEntry queueEntry;
1323          memset(&queueEntry, 0, sizeof(QueueEntry));
1324          queueEntry.priority = prioArr[i];
1325          queueEntry.isPersistent = true;
1326          queueEntry.uniqueId = idArr[i];
1327          strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
1328          queueEntry.embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN-1] = 0;
1329          queueEntry.embeddedBlob.data = data[i];
1330          queueEntry.embeddedBlob.dataLen = strlen(queueEntry.embeddedBlob.data);
1331 
1332          queueP->put(queueP, &queueEntry, &exception);
1333          if (*exception.errorCode != 0) {
1334             LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1335          }
1336       }
1337    }
1338    
1339    entries = queueP->peekWithSamePriority(queueP, -1, 6, &exception);
1340    if (*exception.errorCode != 0) {
1341       LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1342    }
1343    if (entries != 0) {
1344       size_t i;
1345       printf("testRun after peekWithSamePriority() dump %lu entries:\n", (unsigned long)entries->len);
1346       for (i=0; i<entries->len; i++) {
1347          QueueEntry *queueEntry = &entries->queueEntryArr[i];
1348          char *dump = queueEntryToXml(queueEntry, 200);
1349          printf("%s\n", dump);
1350          free(dump);
1351       }
1352    }
1353 
1354    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1355    queueP->randomRemove(queueP, entries, &exception);
1356    if (*exception.errorCode != 0) {
1357       LOG __FILE__, "TEST FAILED: [%s] %s\n", exception.errorCode, exception.message);
1358    }
1359 
1360    freeQueueEntryArr(entries);
1361    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1362    
1363    queueP->clear(queueP, &exception);
1364    printf("Queue numOfEntries=%d, numOfBytes=%s, empty=%s\n", queueP->getNumOfEntries(queueP), int64ToStr(int64Str, queueP->getNumOfBytes(queueP)), queueP->empty(queueP) ? "true" : "false");
1365 
1366    queueP->shutdown(&queueP, &exception);
1367 }
1368 
/**
 * Entry point of the test program (only compiled with QUEUE_MAIN), runs testRun() once.
 */
int main(int argc, char **argv) {
   const int numRuns = 1;
   int run = 0;
   while (run < numRuns) {
      testRun(argc, argv);
      run++;
   }
   return 0;
}
1376 #endif /*QUEUE_MAIN*/
1377 #endif /* (__IPhoneOS__) */
1378 /*=================== TESTCODE =======================*/


syntax highlighted by Code2HTML, v. 0.9.1