util/queue/SQLiteQueue.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      SQLiteQueue.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   A persistent queue implementation based on the SQLite relational database
00006            Depends only on I_Queue.h and ../helper.c and ../helper.h (which includes basicDefs.h)
00007            and can easily be used outside of xmlBlaster.
00008            Further you need sqlite.h and the sqlite library (dll,so,sl)
00009 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00010 Date:      04/2004
00011 Compile:   Compiles at least on Windows, Linux, Solaris. Further porting should be simple.
00012            Needs pthread.h but not the pthread library (for exact times)
00013 
00014             export LD_LIBRARY_PATH=/opt/sqlite-bin/lib
00015             gcc -g -Wall -DQUEUE_MAIN=1 -I../../ -o SQLiteQueue SQLiteQueue.c ../helper.c -I/opt/sqlite-bin/include -L/opt/sqlite-bin/lib -lsqlite
00016             (use optionally  -ansi -pedantic -Wno-long-long)
00017             (Intel C: icc -wd981 ...)
00018 
00019            Compile inside xmlBlaster:
00020             build -DXMLBLASTER_PERSISTENT_QUEUE=true c-delete c
00021            expects xmlBlaster/src/c/util/queue/sqlite.h and xmlBlaster/lib/libsqlite.so
00022 
00023 
00024            Compile Test main()
00025             Replace /I\c\sqlite for your needs ( says where sqlite.h resides ):
00026                                 and \pialibs\sqlite.lib as well ( sqlite.lib is created from sqlite.def via lib /DEF:sqlite.def)
00027            cl /MD /DQUEUE_MAIN /DDLL_IGNORE /DXB_NO_PTHREADS /D_WINDOWS /I\c\sqlite /I..\.. SqliteQueue.c ..\helper.c /link \pialibs\sqlite.lib
00028 
00029 
00030 Table layout XB_ENTRIES:
00031            dataId bigint
00032            queueName text
00033            prio integer
00034            flag text
00035            durable char(1)
00036            byteSize bigint
00037            blob bytea
00038            PRIMARY KEY (dataId, queueName)
00039 
00040 Todo:      Tuning:
00041             - Add prio to PRIMARY KEY
00042             - In persistentQueuePeekWithSamePriority() add queueName to statement as it never changes
00043 
00044 @see:      http://www.sqlite.org/
00045 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.c.queue.html
00046 @see:      http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.html
00047 Testsuite: xmlBlaster/testsuite/src/c/TestQueue.c
00048 -----------------------------------------------------------------------------*/
00049 #include <stdio.h>
00050 #include <string.h>
00051 #include <malloc.h>
00052 #if !defined(_WINDOWS)
00053 # include <unistd.h>   /* unlink() */
00054 # include <errno.h>    /* unlink() */
00055 #endif
00056 #include "util/queue/QueueInterface.h"
00057 #include "sqlite.h"
00058 
00059 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception);
00060 static const QueueProperties *getProperties(I_Queue *queueP);
00061 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception);
00062 static QueueEntryArr *persistentQueuePeekWithSamePriority(I_Queue *queueP, int32_t maxNumOfEntries, int64_t maxNumOfBytes, ExceptionStruct *exception);
00063 static int32_t persistentQueueRandomRemove(I_Queue *queueP, const QueueEntryArr *queueEntryArr, ExceptionStruct *exception);
00064 static bool persistentQueueClear(I_Queue *queueP, ExceptionStruct *exception);
00065 static int32_t getNumOfEntries(I_Queue *queueP);
00066 static int32_t getMaxNumOfEntries(I_Queue *queueP);
00067 static int64_t getNumOfBytes(I_Queue *queueP);
00068 static int64_t getMaxNumOfBytes(I_Queue *queueP);
00069 static bool persistentQueueEmpty(I_Queue *queueP);
00070 static void persistentQueueShutdown(I_Queue **queuePP, ExceptionStruct *exception);
00071 static bool persistentQueueDestroy(I_Queue **queuePP, ExceptionStruct *exception);
00072 static bool checkArgs(I_Queue *queueP, const char *methodName, bool checkIsConnected, ExceptionStruct *exception);
00073 static bool createTables(I_Queue *queueP, ExceptionStruct *exception);
00074 static bool execSilent(I_Queue *queueP, const char *sqlStatement, const char *comment, ExceptionStruct *exception);
00075 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName, sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception);
00076 static bool fillCache(I_Queue *queueP, ExceptionStruct *exception);
00077 static void shutdownInternal(I_Queue **queuePP, ExceptionStruct *exception);
00078 static void freeQueueEntryData(QueueEntry *queueEntry);
00079 
00085 typedef bool ( * ParseDataFp)(I_Queue *queueP, size_t currIndex, void *userP,
00086                                const char **pazValue, const char **pazColName, ExceptionStruct *exception);
00087 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
00088                              sqlite_vm *pVm, 
00089                              ParseDataFp parseDataFp, void *userP, bool finalize,
00090                              ExceptionStruct *exception);
00091 
/*
 * Trace-logging shortcut. It expands to the guarded start of a call to the
 * queue's log callback, so
 *     LOG __FILE__, "Persistent queue is created");
 * is the same as
 *     if (queueP && queueP->log)
 *        queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Persistent queue is created");
 *
 * A variable named 'queueP' must be in scope at the point of use. As the
 * expansion is an unbraced 'if', use LOG only inside a braced block when an
 * 'else' follows, otherwise that 'else' binds to the macro's 'if'.
 */
#define LOG \
   if (queueP && queueP->log) \
      queueP->log(queueP, queueP->logLevel, XMLBLASTER_LOG_TRACE,

/* Buffer sizes as macros: ISO C90 forbids variable-size arrays, so 'const int' won't do */
#define LEN512 512
#define LEN256 256

#define DBNAME_MAX 128
#define ID_MAX 256
00104 
00108 typedef struct DbInfoStruct {
00109    QueueProperties prop;         
00110    size_t numOfEntries;          
00111    int64_t numOfBytes;           
00112    sqlite *db;                   
00113    sqlite_vm *pVm_put;           
00114    sqlite_vm *pVm_peekWithSamePriority;
00115    sqlite_vm *pVm_fillCache;
00116 } DbInfo;
00117 
00121 typedef struct {
00122    QueueEntryArr **queueEntryArrPP;
00123    int32_t currEntries;
00124    int64_t currBytes;
00125    int32_t maxNumOfEntries; 
00126    int64_t maxNumOfBytes;   
00127 } TmpHelper;
00128 
00129 static char int64Str_[INT64_STRLEN_MAX];
00130 static char * const int64Str = int64Str_;   /* to make the pointer address const */
00131 
/*
 * Column positions of a result row of the ENTRIES table, in the order the
 * columns are declared by CREATE TABLE in createTables():
 * dataId, queueName, prio, flag, durable, byteSize, blob
 */
enum {
   XB_ENTRIES_DATA_ID       = 0,  /* dataId */
   XB_ENTRIES_QUEUE_NAME    = 1,  /* queueName */
   XB_ENTRIES_PRIO          = 2,  /* prio */
   XB_ENTRIES_TYPE_NAME     = 3,  /* flag */
   XB_ENTRIES_PERSISTENT    = 4,  /* durable */
   XB_ENTRIES_SIZE_IN_BYTES = 5,  /* byteSize */
   XB_ENTRIES_BLOB          = 6   /* blob */
};
00142 
00143 
00151 Dll_Export I_Queue *createQueue(const QueueProperties* queueProperties, ExceptionStruct *exception)
00152 {
00153    bool stateOk = true;
00154    I_Queue *queueP = (I_Queue *)calloc(1, sizeof(I_Queue));
00155    if (queueP == 0) return queueP;
00156    queueP->isInitialized = false;
00157    queueP->initialize = persistentQueueInitialize;
00158    queueP->getProperties = getProperties;
00159    queueP->put = persistentQueuePut;
00160    queueP->peekWithSamePriority = persistentQueuePeekWithSamePriority;
00161    queueP->randomRemove = persistentQueueRandomRemove;
00162    queueP->clear = persistentQueueClear;
00163    queueP->getNumOfEntries = getNumOfEntries;
00164    queueP->getMaxNumOfEntries = getMaxNumOfEntries;
00165    queueP->getNumOfBytes = getNumOfBytes;
00166    queueP->getMaxNumOfBytes = getMaxNumOfBytes;
00167    queueP->empty = persistentQueueEmpty;
00168    queueP->shutdown = persistentQueueShutdown;
00169    queueP->destroy = persistentQueueDestroy;
00170    queueP->privateObject = calloc(1, sizeof(DbInfo));
00171    {
00172       DbInfo *dbInfo = (DbInfo *)queueP->privateObject;
00173       dbInfo->numOfEntries = -1;
00174       dbInfo->numOfBytes = -1;
00175    }
00176    stateOk = queueP->initialize(queueP, queueProperties, exception);
00177    if (stateOk) {
00178       LOG __FILE__, "Persistent queue SQLite version " SQLITE_VERSION " is created");
00179    }
00180    else {
00181       ExceptionStruct ex;
00182       queueP->shutdown(&queueP, &ex);
00183       if (*ex.errorCode != 0) {
00184          embedException(exception, ex.errorCode, ex.message, exception);
00185       }
00186       queueP = 0;
00187    }
00188    return queueP;
00189 }
00190 
00192 static _INLINE_FUNC DbInfo *getDbInfo(I_Queue *queueP) {
00193    return (queueP==0) ? 0 : (DbInfo *)(queueP->privateObject);
00194 }
00195 
00201 static const QueueProperties *getProperties(I_Queue *queueP)
00202 {
00203    ExceptionStruct exception;
00204    if (checkArgs(queueP, "getProperties", false, &exception) == false ) return 0;
00205    return &getDbInfo(queueP)->prop;
00206 }
00207 
00210 static void freeQueue(I_Queue **queuePP)
00211 {
00212    I_Queue *queueP = (queuePP == 0) ? 0 : *queuePP;
00213    if (queueP == 0) {
00214       fprintf(stderr, "[%s:%d] [user.illegalArgument] Please provide a valid I_Queue pointer to freeQueue()\n", __FILE__, __LINE__);
00215       return;
00216    }
00217 
00218    LOG __FILE__, "freeQueue() called");
00219 
00220    if (queueP->privateObject) {
00221       free(queueP->privateObject);
00222       queueP->privateObject = 0;
00223    }
00224 
00225    free(queueP);
00226    *queuePP = 0;
00227 }
00228 
00236 static bool persistentQueueInitialize(I_Queue *queueP, const QueueProperties *queueProperties, ExceptionStruct *exception)
00237 {
00238    char *errMsg = 0;
00239    bool retOk;
00240    const int OPEN_RW = 0;
00241    sqlite *db;
00242    DbInfo *dbInfo;
00243 
00244    if (checkArgs(queueP, "initialize", false, exception) == false ) return false;
00245    if (queueProperties == 0) {
00246       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00247       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00248                "[%.100s:%d] Please provide a valid QueueProperties pointer to initialize()", __FILE__, __LINE__);
00249       /* LOG __FILE__, "%s: %s", exception->errorCode, exception->message); */
00250       fprintf(stderr, "[%s:%d] %s: %s", __FILE__, __LINE__, exception->errorCode, exception->message);
00251       return false;
00252    }
00253 
00254    queueP->log = queueProperties->logFp;
00255    queueP->logLevel = queueProperties->logLevel;
00256    queueP->userObject = queueProperties->userObject;
00257 
00258    if (*queueProperties->dbName == 0 || *queueProperties->queueName == 0 ||
00259        queueProperties->maxNumOfEntries == 0 || queueProperties->maxNumOfBytes == 0) {
00260       char dbName[QUEUE_DBNAME_MAX];
00261       char queueName[QUEUE_ID_MAX];
00262       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00263       if (queueProperties->dbName == 0)
00264          strncpy0(dbName, "NULL", QUEUE_DBNAME_MAX);
00265       else
00266          strncpy0(dbName, queueProperties->dbName, QUEUE_DBNAME_MAX);
00267       if (queueProperties->queueName == 0)
00268          strncpy0(queueName, "NULL", QUEUE_ID_MAX);
00269       else
00270          strncpy0(queueName, queueProperties->queueName, QUEUE_ID_MAX);
00271       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00272                "[%.100s:%d] Please provide a proper initialized QueueProperties pointer to initialize(): dbName='%s', queueName='%s',"
00273                " maxNumOfEntries=%ld, maxNumOfBytes=%ld", __FILE__, __LINE__,
00274                dbName, queueName, (long)queueProperties->maxNumOfEntries, (long)queueProperties->maxNumOfBytes);
00275       LOG __FILE__, "%s: %s", exception->errorCode, exception->message);
00276       return false;
00277    }
00278 
00279    dbInfo = getDbInfo(queueP);
00280    memcpy(&dbInfo->prop, queueProperties, sizeof(QueueProperties));
00281 
00282    /* Never trust a queue property you haven't overflowed yourself :-) */
00283    dbInfo->prop.dbName[QUEUE_DBNAME_MAX-1] = 0;
00284    dbInfo->prop.queueName[QUEUE_ID_MAX-1] = 0;
00285    dbInfo->prop.tablePrefix[QUEUE_PREFIX_MAX-1] = 0;
00286 
00287    LOG __FILE__, "dbName          = %s", dbInfo->prop.dbName);
00288    LOG __FILE__, "queueName       = %s", dbInfo->prop.queueName);
00289    LOG __FILE__, "tablePrefix     = %s", dbInfo->prop.tablePrefix);
00290    LOG __FILE__, "maxNumOfEntries = %ld",dbInfo->prop.maxNumOfEntries);
00291    LOG __FILE__, "maxNumOfBytes   = %ld",(long)dbInfo->prop.maxNumOfBytes);
00292    /*LOG __FILE__, "logFp           = %d", (int)dbInfo->prop.logFp);*/
00293    LOG __FILE__, "logLevel        = %d", (int)dbInfo->prop.logLevel);
00294    /*LOG __FILE__, "userObject      = %d", (void*)dbInfo->prop.userObject);*/
00295 
00296    db = sqlite_open(dbInfo->prop.dbName, OPEN_RW, &errMsg);
00297    dbInfo->db = db;
00298 
00299    if (db==0) {
00300       queueP->isInitialized = false;
00301       if(queueP->log) {
00302          if (errMsg) {
00303             LOG __FILE__, "%s", errMsg);
00304          }
00305          else {
00306             LOG __FILE__, "Unable to open database '%s'", dbInfo->prop.dbName);
00307          }
00308       }
00309       else {
00310         if (errMsg)
00311            fprintf(stderr,"[%s] %s\n", __FILE__, errMsg);
00312         else
00313            fprintf(stderr,"[%s] Unable to open database %s\n", __FILE__, dbInfo->prop.dbName);
00314       }
00315       strncpy0(exception->errorCode, "resource.db.unavailable", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00316       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00317                "[%.100s:%d] Creating SQLiteQueue '%s' failed: %s", __FILE__, __LINE__, dbInfo->prop.dbName, (errMsg==0)?"":errMsg);
00318       if (errMsg != 0) sqlite_freemem(errMsg);
00319       return false;
00320    }
00321 
00322    queueP->isInitialized = true;
00323 
00324    retOk = createTables(queueP, exception);
00325 
00326    fillCache(queueP, exception);
00327 
00328    LOG __FILE__, "initialize(%s) %s", dbInfo->prop.dbName, retOk?"successful":"failed");
00329    return true;
00330 }
00331 
00338 static bool createTables(I_Queue *queueP, ExceptionStruct *exception)
00339 {
00340    char queryString[LEN512];
00341    bool retOk;
00342    const char *tablePrefix = ((DbInfo *)(queueP->privateObject))->prop.tablePrefix;
00343 
00344    SNPRINTF(queryString, LEN512, "CREATE TABLE %.20sENTRIES (dataId bigint , queueName text , prio integer, flag text, durable char(1), byteSize bigint, blob bytea, PRIMARY KEY (dataId, queueName));",
00345            tablePrefix);
00346    retOk = execSilent(queueP, queryString, "Creating ENTRIES table", exception);
00347 
00348    SNPRINTF(queryString, LEN512, "CREATE INDEX %.20sENTRIES_IDX ON %.20sENTRIES (prio);",
00349            tablePrefix, tablePrefix);
00350    retOk = execSilent(queueP, queryString, "Creating PRIO index", exception);
00351    return retOk;
00352 }
00353 
00362 static bool execSilent(I_Queue *queueP, const char *queryString, const char *comment, ExceptionStruct *exception)
00363 {
00364    int rc = 0;
00365    char *errMsg = 0;
00366    bool retOk;
00367    DbInfo *dbInfo = getDbInfo(queueP);
00368 
00369    rc = sqlite_exec(dbInfo->db, queryString, NULL, NULL, &errMsg);
00370    switch (rc) {
00371       case SQLITE_OK:
00372          LOG __FILE__, "SQL '%s' success", comment);
00373          retOk = true;
00374          break;
00375       default:
00376          if (errMsg && strstr(errMsg, "already exists")) {
00377             LOG __FILE__, "OK, '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00378             retOk = true;
00379          }
00380          else if (rc == SQLITE_CONSTRAINT && errMsg && strstr(errMsg, " not unique")) {
00381             LOG __FILE__, "OK, '%s' entry existed already [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00382             retOk = true;
00383          }
00384          else {
00385             LOG __FILE__, "SQL error '%s' [%d]: %s %s", comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00386             strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00387             SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00388                      "[%.100s:%d] SQL error '%s' [%d]: %s %s", __FILE__, __LINE__, comment, rc, sqlite_error_string(rc), (errMsg==0)?"":errMsg);
00389             retOk = false;
00390          }
00391          break;
00392    }
00393    if (errMsg != 0) sqlite_freemem(errMsg);
00394    return retOk;
00395 }
00396 
00397 /*
00398  * This is the callback routine that the SQLite library
00399  * invokes for each row of a query result.
00400 static int callback(void *pArg, int nArg, char **azArg, char **azCol){
00401    int i;
00402    struct callback_data *p = (struct callback_data*)pArg;
00403    int w = 5;
00404    if (p==0) {} // Suppress compiler warning
00405    if( azArg==0 ) return 0;
00406    for(i=0; i<nArg; i++){
00407       int len = strlen(azCol[i]);
00408       if( len>w ) w = len;
00409    }
00410    printf("\n");
00411    for(i=0; i<nArg; i++){
00412       printf("%*s = %s\n", w, azCol[i], azArg[i] ? azArg[i] : "NULL");
00413    }
00414   return 0;
00415 }
00416 */
00417 
00423 static void persistentQueuePut(I_Queue *queueP, const QueueEntry *queueEntry, ExceptionStruct *exception)
00424 {
00425    int rc = 0;
00426    bool stateOk = true;
00427    DbInfo *dbInfo;
00428    char embeddedType[QUEUE_ENTRY_EMBEDDEDTYPE_LEN]; /* To protect against buffer overflow */
00429 
00430    if (checkArgs(queueP, "put", true, exception) == false ) return;
00431    if (queueEntry == 0) {
00432       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00433       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00434                "[%.100s:%d] Please provide a valid queueEntry pointer to function put()", __FILE__, __LINE__);
00435       return;
00436    }
00437    if (queueEntry->uniqueId == 0) {
00438       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00439       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00440                "[%.100s:%d] Please provide a valid queueEntry->uniqueId to function put()", __FILE__, __LINE__);
00441       return;
00442    }
00443    if (*queueEntry->embeddedType == 0) {
00444       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00445       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00446                "[%.100s:%d] Please provide a valid queueEntry->embeddedType to function put()", __FILE__, __LINE__);
00447       return;
00448    }
00449    strncpy0(embeddedType, queueEntry->embeddedType, QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
00450 
00451    if (queueEntry->embeddedBlob.dataLen > 0 && queueEntry->embeddedBlob.data == 0) {
00452       strncpy0(exception->errorCode, "user.illegalArgument", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00453       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00454                "[%.100s:%d] Please provide a valid queueEntry->embeddedBlob to function put()", __FILE__, __LINE__);
00455       return;
00456    }
00457 
00458    dbInfo = getDbInfo(queueP);
00459 
00460    if ((int64_t)dbInfo->numOfEntries >= dbInfo->prop.maxNumOfEntries) {
00461       strncpy0(exception->errorCode, "resource.overflow.queue.entries", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00462       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00463                "[%.100s:%d] The maximum number of queue entries = %d is exhausted", __FILE__, __LINE__, dbInfo->prop.maxNumOfEntries);
00464       return;
00465    }
00466    if (dbInfo->numOfBytes >= dbInfo->prop.maxNumOfBytes) {
00467       strncpy0(exception->errorCode, "resource.overflow.queue.bytes", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00468       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00469                "[%.100s:%d] The maximum queue size of %s bytes is exhausted", __FILE__, __LINE__, int64ToStr(int64Str, dbInfo->prop.maxNumOfBytes));
00470       return;
00471    }
00472 
00473 
00474    if (dbInfo->pVm_put == 0) {  /* Compile prepared query only once */
00475       char queryString[LEN256];    /*INSERT INTO XB_ENTRIES VALUES ( 1081317015888000000, 'xmlBlaster_192_168_1_4_3412', 'topicStore_xmlBlaster_192_168_1_4_3412', 5, 'TOPIC_XML', 'T', 670, '\\254...')*/
00476       SNPRINTF(queryString, LEN256, "INSERT INTO %.20sENTRIES VALUES ( ?, ?, ?, ?, ?, ?, ?)", dbInfo->prop.tablePrefix);
00477       stateOk = compilePreparedQuery(queueP, "put", &dbInfo->pVm_put, queryString, exception);
00478    }
00479 
00480    if (stateOk) { /* set prepared statement tokens */
00481       char intStr[INT64_STRLEN_MAX];
00482       int index = 0;
00483       const int len = -1; /* Calculated by sqlite_bind */
00484       rc = SQLITE_OK;
00485 
00486       int64ToStr(intStr, queueEntry->uniqueId);
00487       /*LOG __FILE__, "put uniqueId as string '%s'", intStr);*/
00488       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
00489       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, dbInfo->prop.queueName, len, false);
00490       SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", queueEntry->priority);
00491       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
00492       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, embeddedType, len, false);
00493       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, queueEntry->isPersistent?"T":"F", len, false);
00494       SNPRINTF(intStr, INT64_STRLEN_MAX, "%d", (int32_t)queueEntry->embeddedBlob.dataLen);
00495       if (rc == SQLITE_OK) rc = sqlite_bind(dbInfo->pVm_put, ++index, intStr, len, true);
00496       if (rc == SQLITE_OK) {
00497          /* As SQLite does only store strings we encode our blob to a string */
00498          size_t estimatedSize = 2 +(257 * queueEntry->embeddedBlob.dataLen )/254;
00499          unsigned char *out = (unsigned char *)malloc(estimatedSize*sizeof(char));
00500          int encodedSize = sqlite_encode_binary((const unsigned char *)queueEntry->embeddedBlob.data,
00501                               (int)queueEntry->embeddedBlob.dataLen, out);
00502          rc = sqlite_bind(dbInfo->pVm_put, ++index, (const char *)out, encodedSize+1, true);
00503          free(out);
00504       }
00505 
00506       if (rc != SQLITE_OK) {
00507          LOG __FILE__, "put(%s) SQL error: %d %s", int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
00508          strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00509          SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00510                   "[%.100s:%d] put(%s) SQL error: %d %s", __FILE__, __LINE__, int64ToStr(int64Str, queueEntry->uniqueId), rc, sqlite_error_string(rc));
00511          stateOk = false;
00512       }
00513    }
00514 
00515    if (stateOk) { /* start the query, process results */
00516       int countRows = getResultRows(queueP, "put", dbInfo->pVm_put, 0, 0, false, exception);
00517       stateOk = countRows >= 0;
00518    }
00519 
00520    if (stateOk) {
00521       dbInfo->numOfEntries += 1;
00522       dbInfo->numOfBytes += ((queueEntry->sizeInBytes > 0) ? queueEntry->sizeInBytes : queueEntry->embeddedBlob.dataLen);
00523    }
00524 
00525    LOG __FILE__, "put(%s) %s", int64ToStr(int64Str, queueEntry->uniqueId), stateOk ? "done" : "failed");
00526 }
00527 
00528 
00539 static bool compilePreparedQuery(I_Queue *queueP, const char *methodName,
00540                     sqlite_vm **ppVm, const char *queryString, ExceptionStruct *exception)
00541 {
00542    int iRetry, numRetry=100;
00543    char *errMsg = 0;
00544    int rc = 0;
00545    const char *pzTail = 0;   /* OUT: uncompiled tail of zSql */
00546    bool stateOk = true;
00547    DbInfo *dbInfo = getDbInfo(queueP);
00548 
00549    if (*ppVm == 0) {  /* Compile prepared  query */
00550       for (iRetry = 0; iRetry < numRetry; iRetry++) {
00551          rc = sqlite_compile(dbInfo->db, queryString, &pzTail, ppVm, &errMsg);
00552          switch (rc) {
00553             case SQLITE_BUSY:
00554                if (iRetry == (numRetry-1)) {
00555                   strncpy0(exception->errorCode, "resource.db.block", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00556                   SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00557                            "[%.100s:%d] SQL error #%d in %s(): %s %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
00558                }
00559                LOG __FILE__, "%s() Sleeping as other thread holds DB %s", methodName, (errMsg==0)?"":errMsg);
00560                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
00561                sleepMillis(10);
00562                break;
00563             case SQLITE_OK:
00564                iRetry = numRetry; /* We're done */
00565                LOG __FILE__, "%s() Pre-compiled prepared query '%s'", methodName, queryString);
00566                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
00567                break;
00568             default:
00569                LOG __FILE__, "SQL error #%d %s in %s(): %s: %s", rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
00570                strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00571                SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00572                         "[%.100s:%d] SQL error #%d %s in %s(): %s", __FILE__, __LINE__, rc, sqlite_error_string(rc), methodName, (errMsg==0)?"":errMsg);
00573                iRetry = numRetry; /* We're done */
00574                if (errMsg != 0) { sqlite_freemem(errMsg); errMsg = 0; }
00575                stateOk = false;
00576                break;
00577          }
00578       }
00579    }
00580    if (*ppVm == 0) stateOk = false;
00581    return stateOk;
00582 }
00583 
00596 static bool parseQueueEntryArr(I_Queue *queueP, size_t currIndex, void *userP,
00597                                const char **pazValue, const char **pazColName, ExceptionStruct *exception)
00598 {
00599    bool doContinue = true;
00600    int numAssigned;
00601    bool stateOk = true;
00602    int decodeSize = 0;
00603    QueueEntry *queueEntry = 0;
00604    QueueEntryArr *queueEntryArr;
00605    TmpHelper *helper = (TmpHelper*)userP;
00606    QueueEntryArr **queueEntryArrPP = helper->queueEntryArrPP;
00607 
00608    if (currIndex == 0) {
00609       helper->currEntries = 0;
00610       helper->currBytes = 0;
00611    }
00612 
00613    if (*queueEntryArrPP == 0) {
00614       *queueEntryArrPP = (QueueEntryArr *)calloc(1, sizeof(QueueEntryArr));;
00615       if (helper->maxNumOfEntries == 0) {
00616          doContinue = false;
00617          return doContinue;
00618       }
00619    }
00620    queueEntryArr = *queueEntryArrPP;
00621 
00622    if (queueEntryArr->len == 0) {
00623       queueEntryArr->len = 10;
00624       queueEntryArr->queueEntryArr = (QueueEntry *)calloc(queueEntryArr->len, sizeof(QueueEntry));
00625    }
00626    else if (currIndex >= queueEntryArr->len) {
00627       queueEntryArr->len += 10;
00628       queueEntryArr->queueEntryArr = (QueueEntry *)realloc(queueEntryArr->queueEntryArr, queueEntryArr->len * sizeof(QueueEntry));
00629    }
00630    queueEntry = &queueEntryArr->queueEntryArr[currIndex];
00631    memset(queueEntry, 0, sizeof(QueueEntry));
00632 
00633    stateOk = strToInt64(&queueEntry->uniqueId, pazValue[XB_ENTRIES_DATA_ID]);
00634    if (!stateOk) {
00635       LOG __FILE__, "peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' to uniqueId, ignoring entry.", pazValue[XB_ENTRIES_DATA_ID]);
00636       strncpy0(exception->errorCode, "resource.db.unknown", EXCEPTIONSTRUCT_ERRORCODE_LEN);
00637       SNPRINTF(exception->message, EXCEPTIONSTRUCT_MESSAGE_LEN,
00638                "[%.100s:%d] peekWithSamePriority() ERROR: Can't parse pazValue[0] '%.20s' col=%s to uniqueId, ignoring entry.", __FILE__, __LINE__, pazValue[XB_ENTRIES_DATA_ID], pazColName[XB_ENTRIES_DATA_ID]);
00639       doContinue = false;
00640       return doContinue;
00641    }
00642 
00643    LOG __FILE__, "peekWithSamePriority(%s) currIndex=%d", int64ToStr(int64Str, queueEntry->uniqueId), currIndex);
00644    /* strncpy0(dbInfo->prop.queueName, pazValue[2], ID_MAX); */
00645    numAssigned = sscanf(pazValue[XB_ENTRIES_PRIO], "%hd", &queueEntry->priority);
00646    if (numAssigned != 1) {
00647       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Can't parse pazValue[XB_ENTRIES_PRIO] '%.20s' to priority, setting it to NORM", int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_PRIO]);
00648       queueEntry->priority = 4;
00649    }
00650    strncpy0(queueEntry->embeddedType, pazValue[XB_ENTRIES_TYPE_NAME], QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
00651    queueEntry->isPersistent = *pazValue[XB_ENTRIES_PERSISTENT] == 'T' ? true : false;
00652    {
00653       int64_t ival = 0;
00654       stateOk = strToInt64(&ival, pazValue[XB_ENTRIES_SIZE_IN_BYTES]);
00655       queueEntry->embeddedBlob.dataLen = (size_t)ival;
00656    }
00657 
00658    /* TODO!!! in Java the length is the size in RAM and not strlen(data) */
00659    /* queueEntry->embeddedBlob.data = (char *)malloc(queueEntry->embeddedBlob.dataLen*sizeof(char)); */
00660    queueEntry->embeddedBlob.data = (char *)malloc(strlen(pazValue[XB_ENTRIES_BLOB])*sizeof(char)); /* we spoil some 2 % */
00661    decodeSize = sqlite_decode_binary((const unsigned char *)pazValue[XB_ENTRIES_BLOB], (unsigned char *)queueEntry->embeddedBlob.data);
00662    if (decodeSize == -1 || (size_t)decodeSize != queueEntry->embeddedBlob.dataLen) {
00663       *(queueEntry->embeddedBlob.data + strlen(pazValue[XB_ENTRIES_BLOB]) - 1) = 0; 
00664       LOG __FILE__, "peekWithSamePriority(%s) ERROR: Returned blob encoded='%s', decodeSize=%d"
00665                         " but expected decoded len=%d: '%s'",
00666                     int64ToStr(int64Str, queueEntry->uniqueId), pazValue[XB_ENTRIES_BLOB], decodeSize,
00667                     queueEntry->embeddedBlob.dataLen, queueEntry->embeddedBlob.data);
00668    }
00669 
00670    helper->currEntries += 1;
00671    helper->currBytes += queueEntry->embeddedBlob.dataLen;
00672 
00673    /* Limit the number of entries */
00674    if ((helper->maxNumOfEntries != -1 && helper->currEntries >= helper->maxNumOfEntries) ||
00675        (helper->maxNumOfBytes != -1 && helper->currBytes >= helper->maxNumOfBytes)) {
00676       /* sqlite_interrupt(dbInfo->db); -> sets rc==SQLITE_ERROR on next sqlite-step() which i can't distinguish from a real error */
00677       doContinue = false;
00678    }
00679 
00680    return doContinue;
00681 }
00682 
00699 static int32_t getResultRows(I_Queue *queueP, const char *methodName,
00700                              sqlite_vm *pVm, 
00701                              ParseDataFp parseDataFp, void *userP,
00702                              bool finalize,
00703                              ExceptionStruct *exception)
00704 {
00705    char *errMsg = 0;
00706    int32_t currIndex = 0;
00707    int numCol = 0;
00708    const char **pazValue = 0;
00709    const char **pazColName = 0;
00710    bool done = false;
00711    bool stateOk = true;
00712    int rc;
00713    while (!done) {
00714       rc = sqlite_step(pVm, &numCol, &pazValue, &pazColName);
00715       switch( rc ){
00716          case SQLITE_DONE:
00717             done = true;
00718          break;
00719          case SQLITE_BUSY:
00720             LOG __FILE__, "%s() Sleeping as other thread holds DB.", methodName);
00721             sleepMillis(10);
00722          break;
00723          case SQLITE_ROW:
00724          {
00725             bool doContinue = true;
00726             if (parseDataFp) {
00727                /* @return true->to continue, false->to break execution or on error exception->errorCode is not null */
00728                doContinue = parseDataFp(queueP, currIndex, userP, pazValue, pazColName, exception);
00729                stateOk = *exception->errorCode == 0;
00730             }
00731             else {
00732                /*
00733                printf("RESULT[%d]\n", iRow);
00734                for (iCol = 0; iCol < numCol; iCol++) {
00735                   printf("%10.10s = %s\n", pazColName[iCol], pazValue[iCol]);
00736                }
00737                */
00738             }
00739             currIndex++;
00740             if (!stateOk || !doContinue) done = true;
00741          }
00742          break;
00743          case SQLITE_ERROR:   /* If exists already */
00744             LOG __FILE__, "%s() SQL execution problem [sqlCode=%d], entry already exists", methodName, rc);
00745             done = true;
00746             stateOk = false;
00747          break;
00748          case SQLITE_MISUSE:
00749          default:
00750             LOG __FILE__, "%s() SQL execution problem [sqlCode=%d %s]", methodName, rc, sqlite_error_string(rc));
00751             done = true;
00752             stateOk = false;
00753          break;
00754       }
00755    }
00756    LOG __FILE__, "%s() Processed %lu entries.", methodName, (unsigned long)currIndex);
00757 
00758    if (finalize) {
00759       sqlite_finalize(pVm, &errMsg);
00760       if (rc != SQLITE_OK && rc != SQLITE_DONE) {
00761          LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled. %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
00762       }
00763       if (errMsg != 0) sqlite_freemem(errMsg);
00764    }
00765    else { /* Reset prepared statement */
00766       rc = sqlite_reset(pVm, &errMsg);
00767       if (rc == SQLITE_SCHEMA) {
00768          LOG __FILE__, "WARN: getResultRows() sqlCode=%d %s is not handled %s", rc, sqlite_error_string(rc), errMsg==0?"":errMsg);
00769       }
00770       if (errMsg != 0) sqlite_freemem(errMsg);
00771    }
00772 
00773    return stateOk ? currIndex : (-1)*rc;
00774 }
00775 
00779 static QueueEntryArr *persistentQueuePeekWithSamePriority(