socket/XmlBlasterAccessUnparsed.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      XmlBlasterAccessUnparsed.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Wraps raw socket connection to xmlBlaster
00006            Implements sync connection and async callback
00007            Needs pthread to compile (multi threading).
00008 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00009 Compile:
00010   LINUX:   gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
00011            g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
00012            icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
00013   WIN:     cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe  XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
00014            (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
00015   Solaris: cc  -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
00016            CC  -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
00017 
00018   Linux with libxmlBlasterC.so:
00019            gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c  -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT  -lpthread
00020 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
00021 -----------------------------------------------------------------------------*/
00022 #include <stdio.h>
00023 #include <stdlib.h>
00024 #include <string.h>
00025 #if defined(WINCE)
00026 #  if defined(XB_USE_PTHREADS)
00027 #     include <pthreads/pthread.h>
00028 #  else
00029       /*#include <pthreads/need_errno.h> */
00030       static int errno=0; /* single threaded workaround*/
00031 #  endif
00032 #else
00033 #  include <errno.h>
00034 #  include <sys/types.h>
00035 #endif
00036 #include <socket/xmlBlasterSocket.h>
00037 #include <socket/xmlBlasterZlib.h>
00038 #include <XmlBlasterAccessUnparsed.h>
00039 
/*
 * Bundle of everything belonging to one received update: the owning access
 * handle, the message units, the user data pointer and private copies of the
 * exception and the socket data.
 * NOTE(review): presumably filled by interceptUpdate() to hand an update over
 * to a separate dispatcher thread -- confirm against interceptUpdate().
 */
00043 typedef struct Dll_Export UpdateContainer {
   /* The access handle this update belongs to */
00044    XmlBlasterAccessUnparsed *xa;
   /* The message units of this update */
00045    MsgUnitArr *msgUnitArrP;
   /* Opaque user data pointer */
00046    void *userData;
00047    XmlBlasterException exception;     /* Holding a clone from the original as the callback thread may use it for another message */
00048    SocketDataHolder socketDataHolder; /* Holding a clone from the original */
00049 } UpdateContainer;
00050 
/*
 * Forward declarations of the file-local functions.
 * initialize() up to isConnected() are installed as function pointers on the
 * handle by getXmlBlasterAccessUnparsed().
 */
00051 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception);
00052 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception);
00053 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
00054 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
00055 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00056 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00057 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00058 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00059 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00060 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00061 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
00062 static bool isConnected(XmlBlasterAccessUnparsed *xa);
/* Passed to addResponseListener() by preSendEvent() */
00063 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder);
/* Registered on the connection by initialize() as pre/post send hooks */
00064 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
00065 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
00066 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
/* Passed to getCallbackServerUnparsed() by initialize() as the update handler */
00067 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder);
00068 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception);
/* Socket read/write functions; initialize() installs the plain or the compressed pair */
00069 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes);
00070 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes);
00071 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
00072 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
00073 
00074 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) {
00075    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed));
00076    if (xa == 0) return xa;
00077    xa->argc = argc;
00078    xa->argv = argv;
00079    xa->props = createProperties(xa->argc, xa->argv);
00080    if (xa->props == 0) {
00081       freeXmlBlasterAccessUnparsed(xa);
00082       return (XmlBlasterAccessUnparsed *)0;
00083    }
00084    xa->isInitialized = false;
00085    xa->isShutdown = false;
00086    xa->connectionP = 0;
00087    xa->callbackP = 0;
00088    xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
00089    xa->userFp = 0;
00090    xa->connect = xmlBlasterConnect;
00091    xa->initialize = initialize;
00092    xa->disconnect = xmlBlasterDisconnect;
00093    xa->publish = xmlBlasterPublish;
00094    xa->publishArr = xmlBlasterPublishArr;
00095    xa->publishOneway = xmlBlasterPublishOneway;
00096    xa->subscribe = xmlBlasterSubscribe;
00097    xa->unSubscribe = xmlBlasterUnSubscribe;
00098    xa->erase = xmlBlasterErase;
00099    xa->get = xmlBlasterGet;
00100    xa->ping = xmlBlasterPing;
00101    xa->isConnected = isConnected;
00102    xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
00103    xa->log = xmlBlasterDefaultLogging;
00104    xa->logUserP = 0;
00105    xa->clientsUpdateFp = 0;
00106    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true);
00107    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded);
00108    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */
00109    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */
00110    /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */
00111    xa->lowLevelAutoAck = false;
00112 
00113    /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */
00114    if (xa->callbackMultiThreaded == true) {
00115       if (xa->logLevel>=XMLBLASTER_LOG_DUMP) 
00116          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true");
00117       /*xa->callbackMultiThreaded = false;*/
00118    }
00119    /* stdint.h: # define INT32_MAX              (2147483647) */
00120    xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */
00121    xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout);
00122    /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */
00123    memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
00124    xa->threadCounter = 0;
00125 
00126    if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__,
00127                                 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld",
00128                                 getLogLevelStr(xa->logLevel), xa->responseTimeout);
00129 
00130    /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */
00131    pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */
00132    pthread_mutex_init(&xa->readnMutex, NULL);
00133    return xa;
00134 }
00135 
00136 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa)
00137 {
00138    int rc;
00139 
00140    if (xa == 0) {
00141       char *stack = getStackTrace(10);
00142       printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s",
00143                 __FILE__, __LINE__, stack);
00144       free(stack);
00145       return;
00146    }
00147 
00148    if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
00149    xa->isShutdown = true;      /* Inhibit access to xa */
00150 
00151    if (xa->callbackP != 0) {
00152       xa->callbackP->shutdown(xa->callbackP);
00153    }
00154    if (xa->connectionP != 0) {
00155       xa->connectionP->shutdown(xa->connectionP);
00156    }
00157 
00158    if (xa->callbackP != 0) {
00159       /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */
00160       const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true);
00161       int retVal;
00162       if (!xa->callbackP->isShutdown) {
00163 
00164          {  /* Wait for any pending update() dispatcher threads to die */
00165             int i;
00166             int num = 200;
00167             int interval = 10;
00168             for (i=0; i<num; i++) {
00169                if (xa->callbackP->isShutdown)
00170                   break;
00171                /*pthread_yield(0);*/
00172                sleepMillis(interval);
00173                if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00174                    "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for callback thread to join. %d/%d", interval, i, num);
00175             }
00176             if (i == num) {
00177                xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Proper shutdown of callback thread failed, it seems to block on the socket");
00178             }
00179          }
00180 
00181          if (!USE_DETACH_MODE) {
00182             /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */
00183             /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */
00184             /*
00185             retVal = pthread_cancel(xa->callbackThreadId);
00186             if (retVal != 0) {
00187                xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal);
00188             }
00189             */
00190          }
00191       }
00192 
00193       if (USE_DETACH_MODE) {
00194          retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */
00195          if (retVal != 0) {
00196             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal);
00197          }
00198          else {
00199             if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00200                                           "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
00201          }
00202       }
00203       else { /* JOIN mode */
00204          retVal = pthread_join(xa->callbackThreadId, 0);
00205          if (retVal != 0) {
00206             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal);
00207          }
00208          else {
00209             if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00210                                           "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
00211          }
00212       }
00213 
00214       memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
00215    }
00216 
00217    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP);
00218 
00219    {  /* Wait for any pending update() dispatcher threads to die */
00220       int i;
00221       int num = 1000;
00222       int interval = 10;
00223       for (i=0; i<num; i++) {
00224          if ((int)xa->threadCounter < 1)
00225             break;
00226          sleepMillis(interval);
00227          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00228              "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num);
00229       }
00230       if (i >= num) {
00231          if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00232              "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num);
00233       }
00234    }
00235 
00236    if (xa->connectionP != 0) {
00237       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00238    }
00239 
00240    if (xa->callbackP != 0) {
00241       freeCallbackServerUnparsed(&xa->callbackP);
00242    }
00243 
00244    freeProperties(xa->props);
00245 
00246    rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */
00247    if (rc != 0) /* EBUSY */
00248       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc);
00249 
00250    rc = pthread_mutex_destroy(&xa->readnMutex);
00251    if (rc != 0) /* EBUSY */
00252       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc);
00253 
00254    free(xa);
00255 }
00256 
00257 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception)
00258 {
00259    int threadRet = 0;
00260    const char *compressType = 0;
00261 
00262    if (checkArgs(xa, "initialize", false, exception) == false) return false;
00263 
00264    if (xa->isInitialized) {
00265       return true;
00266    }
00267 
00268    if (clientUpdateFp == 0) {
00269       xa->clientsUpdateFp = 0;
00270       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "",
00271         "Your callback UpdateFp pointer is NULL, we use our default callback handler");
00272    }
00273    else {
00274       xa->clientsUpdateFp = clientUpdateFp;
00275    }
00276 
00277    if (xa->connectionP) {
00278       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00279    }
00280    xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv);
00281    if (xa->connectionP == 0) {
00282       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00283       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00284                "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__);
00285       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00286       return false;
00287    }
00288    xa->connectionP->log = xa->log;
00289    xa->connectionP->logUserP = xa->logUserP;
00290    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed");
00291 
00292 
00293    /* Switch on compression? */
00294    compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", "");
00295    compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType);
00296          
00297    if (!strcmp(compressType, "zlib:stream")) {
00298       xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed;
00299       xa->connectionP->writeToSocket.userP = xa;
00300       xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed;
00301       xa->connectionP->readFromSocket.userP = xa;
00302    }
00303    else {
00304       if (strcmp(compressType, "")) {
00305          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType);
00306       }
00307       xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain;
00308       xa->connectionP->writeToSocket.userP = xa;
00309       xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain;
00310       xa->connectionP->readFromSocket.userP = xa;
00311    }
00312 
00313    if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */
00314       return false;
00315 
00316    /* the fourth arg 'xa' is returned as 'void *userData' in update() method */
00317    if (xa->callbackP != 0) {
00318       freeCallbackServerUnparsed(&xa->callbackP);
00319    }
00320    xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa);
00321    if (xa->callbackP == 0) {
00322       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00323       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00324                "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__);
00325       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00326       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00327       return false;
00328    }
00329    xa->callbackP->log = xa->log;
00330    xa->callbackP->logUserP = xa->logUserP;
00331 
00332    if (!strcmp(compressType, "zlib:stream")) {
00333       xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed;
00334       xa->callbackP->writeToSocket.userP = xa;
00335       xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed;
00336       xa->callbackP->readFromSocket.userP = xa;
00337    }
00338    else {
00339       xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain;
00340       xa->callbackP->writeToSocket.userP = xa;
00341       xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain;
00342       xa->callbackP->readFromSocket.userP = xa;
00343    }
00344 
00345    xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp);
00346 
00347    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00348           "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...",
00349           (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB);
00350 
00351    /* Register our callback funtion which is called just before sending a message */
00352    xa->connectionP->preSendEvent = preSendEvent;
00353    xa->connectionP->preSendEvent_userP = xa;
00354 
00355    /* Register our callback funtion which is called just after sending a message */
00356    xa->connectionP->postSendEvent = postSendEvent;
00357    xa->connectionP->postSendEvent_userP = xa;
00358 
00359    /* thread blocks on socket listener */
00360    threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP);
00361    if (threadRet != 0) {
00362       strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00363       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00364                "[%.100s:%d] Creating thread failed with error number %d",
00365                __FILE__, __LINE__, threadRet);
00366       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00367       freeCallbackServerUnparsed(&xa->callbackP);
00368       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00369       return false;
00370    }
00371 
00372    xa->isInitialized = true;
00373    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00374                                 "initialize() successful");
00375    return xa->isInitialized;
00376 }
00377 
00378 static bool isConnected(XmlBlasterAccessUnparsed *xa)
00379 {
00380    if (xa == 0 || xa->isShutdown || xa->connectionP == 0) {
00381       return false;
00382    }
00383    return xa->connectionP->isConnected(xa->connectionP);
00384 }
00385 
00396 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
00397 {
00398    bool retVal;
00399    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
00400 
00401    /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */
00402    if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName))
00403       return msgRequestInfoP;
00404 
00405    /* ======== Initialize threading ====== */
00406    msgRequestInfoP->responseMutexIsLocked = false; /* Only to remember if the client thread holds the lock */
00407 
00408    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00409                                 "preSendEvent(%s) occurred", msgRequestInfoP->methodName);
00410    retVal = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent);
00411    if (retVal == false) {
00412       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00413       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00414                "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__);
00415       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00416       return (MsgRequestInfo *)0;
00417    }
00418 
00419    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00420                   "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock",
00421                   msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen);
00422    pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */
00423    if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
00424       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00425       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00426                "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retVal);
00427       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00428       return (MsgRequestInfo *)0;
00429    }
00430    msgRequestInfoP->responseMutexIsLocked = true; /* Only if the client thread holds the lock */
00431 
00432    return msgRequestInfoP;
00433 }
00434 
00443 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) {
00444    int retVal;
00445    SocketDataHolder *s = (SocketDataHolder *)socketDataHolder;
00446    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
00447 
00448    if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
00449       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal);
00450       /* return; */
00451    }
00452    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED");
00453 
00454    blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen);
00455    msgRequestInfoP->responseType = s->type;
00456 
00457    if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) {
00458       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal);
00459       /* return; */
00460    }
00461 
00462    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00463                                 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent",
00464                                 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen);
00465 
00466    if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
00467       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal);
00468       /* return; */
00469    }
00470    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED");
00471 }
00472 
00480 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
00481 {
00482    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
00483    struct timespec abstime;
00484    bool useTimeout = false;
00485    int retVal;
00486 
00487    if (msgRequestInfoP->rollback) {
00488       mutexUnlock(msgRequestInfoP, exception);
00489       return (MsgRequestInfo *)0;
00490    }
00491 
00492    if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) {
00493       useTimeout = true;
00494    }
00495 
00496    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr);
00497    
00498    if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) {
00499       xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
00500       strncpy0(exception->errorCode, "resource.exhaust", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
00501       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] pthread_cond_init() for '%s()' with requestId=%s returned %d.",
00502                __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
00503       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00504       return (MsgRequestInfo *)0;
00505    }
00506 
00507    /* Wait for response, the callback server delivers it */
00508    while (msgRequestInfoP->responseType == 0) { /* Protect for spurious wake ups (e.g. by SIGUSR1) */
00509       if (useTimeout == true) {
00510          int error = pthread_cond_timedwait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex, &abstime);
00511          if (error == ETIMEDOUT) {
00512             xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
00513             strncpy0(exception->errorCode, "communication.responseTimeout", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
00514             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Waiting on response for '%s()' with requestId=%s timed out after blocking %ld millis",
00515                     __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, xa->responseTimeout);
00516             if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00517             return (MsgRequestInfo *)0;
00518          }
00519       }
00520       else {
00521          pthread_cond_wait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex); /* Wakes up from responseEvent() */
00522          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00523             "Wake up tread, response of length %d arrived", msgRequestInfoP->responseBlob.dataLen);
00524       }
00525    }
00526 
00527    if ((retVal = pthread_cond_destroy(&msgRequestInfoP->responseCond)) != 0) {
00528       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
00529                  msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
00530    }
00531 
00532    msgRequestInfoP->blob.dataLen = msgRequestInfoP->responseBlob.dataLen;
00533    msgRequestInfoP->blob.data = msgRequestInfoP->responseBlob.data;
00534    msgRequestInfoP->responseBlob.dataLen = 0;
00535    msgRequestInfoP->responseBlob.data = 0; /* msgRequestInfoP->blob.data is now responsible to free() the data */
00536 
00537    if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
00538       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00539          "Thread #%ld woke up in postSendEvent() for msgType=%c and dataLen=%d",
00540          msgRequestInfoP->requestIdStr, msgRequestInfoP->responseType, msgRequestInfoP->blob.dataLen);
00541 
00542 
00543    if (msgRequestInfoP->responseType == (char)MSG_TYPE_EXCEPTION) {
00544       convertToXmlBlasterException(&msgRequestInfoP->blob, exception, false);
00545       freeBlobHolderContent(&msgRequestInfoP->blob);
00546       msgRequestInfoP->responseType = 0;
00547       return (MsgRequestInfo *)0;
00548    }
00549 
00550    msgRequestInfoP->responseType = 0;
00551    
00552    /* if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) i woke up, entering unlock ...", msgRequestInfoP->requestIdStr); */
00553    if (mutexUnlock(msgRequestInfoP, exception) == false)
00554       return (MsgRequestInfo *)0;
00555 
00556    return msgRequestInfoP;
00557 }
00558 
00565 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) {
00566    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
00567    int retVal;
00568    if (msgRequestInfoP->responseMutexIsLocked == false)
00569       return true;
00570    msgRequestInfoP->responseMutexIsLocked = false;
00571    if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
00572       char embeddedText[XMLBLASTEREXCEPTION_MESSAGE_LEN];
00573       if (exception == 0) {
00574          if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
00575             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
00576                        msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
00577          }
00578          return false;
00579       }
00580       if (*exception->errorCode != 0) {
00581          SNPRINTF(embeddedText, XMLBLASTEREXCEPTION_MESSAGE_LEN, "{%s:%s}", exception->errorCode, exception->message);
00582          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Ignoring embedded exception %s: %s", exception->errorCode, exception->message);
00583       }
00584       else
00585          *embeddedText = 0;
00586       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00587       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] ERROR trying to unlock responseMutex, return=%d. Embedded %s", __FILE__, __LINE__, retVal, embeddedText);
00588       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00589 
00590       if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
00591          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
00592                     msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
00593       }
00594       return false;
00595    }
00596    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent() responseMutex is UNLOCKED");
00597 
00598    if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
00599       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
00600                  msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
00601    }
00602    return true;
00603 }
00604 
00605 Dll_Export const char *xmlBlasterAccessUnparsedUsage(char *usage)
00606 {
00607    /* take care not to exceed XMLBLASTER_MAX_USAGE_LEN */
00608    SNPRINTF(usage, XMLBLASTER_MAX_USAGE_LEN, "%.950s%.950s%s", xmlBlasterConnectionUnparsedUsage(), callbackServerRawUsage(),
00609                   "\n   -plugin/socket/multiThreaded  [true]"
00610                   "\n                       If true the update() call to your client code is a separate thread."
00611                   "\n   -plugin/socket/responseTimeout  [60000 (one minute)]"
00612                   "\n                       The time in millis to wait on a response, 0 is forever."
00613                   "\n   -logLevel           ERROR | WARN | INFO | TRACE | DUMP [WARN]"
00614                   );
00615    
00616    return usage;
00617 }
00618 
00619 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos,
00620                                UpdateFp clientUpdateFp, XmlBlasterException *exception)
00621 {
00622    char *response = 0;
00623    char *qos_;
00624 
00625    if (checkArgs(xa, "connect", false, exception) == false) return 0;
00626 
00627    /* Is allowed, we use our default handler in this case
00628    if (clientUpdateFp == 0) {
00629       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00630       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'updateFp' to connect()", __FILE__, __LINE__);
00631       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00632       return false;
00633    }
00634    */
00635 
00636    if (qos == 0) {
00637       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00638       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'qos' to connect()", __FILE__, __LINE__);
00639       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00640       return false;
00641    }
00642 
00643    if (initialize(xa, clientUpdateFp, exception) == false) {
00644       return false;
00645    }
00646    
00647    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Invoking connect()");
00648 
00649    if (strstr(qos, "<callback") != 0) {
00650       /* User has given us a callback address */
00651       qos_ = strcpyAlloc(qos);
00652    }
00653    else {
00654       /* We add the callback sequence with our tunnel callback host and port
00655          HACK: This is error prone depending on the given qos */
00656       const char *pos;
00657       enum { SIZE=1024 };
00658       char callbackQos[SIZE];
00659       snprintf0(callbackQos, SIZE,
00660                "<queue relating='callback'>" /* maxEntries='100' maxEntriesCache='100'>" */
00661                "  <callback type='SOCKET' sessionId='%s'>"
00662                "    socket://%.120s:%d"
00663                "  </callback>"
00664                "</queue>",
00665                "NoCallbackSessionId", xa->callbackP->hostCB, xa->callbackP->portCB);
00666       qos_ = (char *)calloc(strlen(qos) + SIZE, sizeof(char *));
00667       pos = strstr(qos, "</qos>");
00668       if (pos == 0) {
00669          strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00670          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid 'qos' markup to connect()", __FILE__, __LINE__);
00671          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00672          return false;
00673       }
00674       strncpy0(qos_, qos, pos-qos+1);
00675       strncat0(qos_, callbackQos, SIZE);
00676       strncat0(qos_, "</qos>", 8);
00677    }
00678    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connecting with qos=%s", qos_);
00679 
00680    /* Register our function responseEvent() to be notified when the response arrives,
00681       this is done by preSendEvent() callback called during connect() */
00682 
00683    response = xa->connectionP->connect(xa->connectionP, qos_, exception);
00684 
00685    free(qos_);
00686    /* freeBlobHolderContent(&xa->responseBlob); */
00687 
00688    /* The response was handled by a callback to postSendEvent */
00689 
00690    if (response == 0) return response;
00691 
00692    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00693       "Got response for connect(secretSessionId=%s)", xa->connectionP->secretSessionId);
00694    return response;
00695 }
00696 
00697 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
00698 {
00699    bool p;
00700    if (checkArgs(xa, "disconnect", true, exception) == false ) return 0;
00701    p = xa->connectionP->disconnect(xa->connectionP, qos, exception);
00702    return p;
00703 }
00704 
00711 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception)
00712 {
00713    char *p;
00714    if (checkArgs(xa, "publish", true, exception) == false ) return 0;
00715    p = xa->connectionP->publish(xa->connectionP, msgUnit, exception);
00716    return p;
00717 }
00718 
00725 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
00726 {
00727    QosArr *p;
00728    if (checkArgs(xa, "publishArr", true, exception) == false ) return 0;
00729    p = xa->connectionP->publishArr(xa->connectionP, msgUnitArr, exception);
00730    return p;
00731 }
00732 
00739 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
00740 {
00741    if (checkArgs(xa, "publishOneway", true, exception) == false ) return;
00742    xa->connectionP->publishOneway(xa->connectionP, msgUnitArr, exception);
00743 }
00744 
00750 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
00751 {
00752    char *p;
00753    if (checkArgs(xa, "subscribe", true, exception) == false ) return 0;
00754    p = xa->connectionP->subscribe(xa->connectionP, key, qos, exception);
00755    return p;
00756 }
00757 
00765 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
00766 {
00767    QosArr *p;
00768    if (checkArgs(xa, "unSubscribe", true, exception) == false ) return 0;
00769    p = xa->connectionP->unSubscribe(xa->connectionP, key, qos, exception);
00770    return p;
00771 }
00772 
00781 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
00782 {
00783    QosArr *p;
00784    if (checkArgs(xa, "erase", true, exception) == false ) return 0;
00785    p = xa->connectionP->erase(xa->connectionP, key, qos, exception);
00786    return p;
00787 }
00788 
00798 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
00799 {
00800    char *p;
00801    if (checkArgs(xa, "ping", true, exception) == false ) return 0;
00802    p = xa->connectionP->ping(xa->connectionP, qos, exception);
00803    return p;
00804 }
00805 
00812 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
00813 {
00814    MsgUnitArr *msgUnitArr;
00815    if (checkArgs(xa, "get", true, exception) == false ) return 0;
00816    msgUnitArr = xa->connectionP->get(xa->connectionP, key, qos, exception);
00817    return msgUnitArr;
00818 }
00819 
00820 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName,
00821             bool checkIsConnected, XmlBlasterException *exception)
00822 {
00823    if (xa == 0) {
00824       char *stack = getStackTrace(10);
00825       if (exception == 0) {
00826          printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
00827                   __FILE__, __LINE__, methodName, stack);
00828       }
00829       else {
00830          strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00831          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00832                   "[%.100s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %.16s() %s",
00833                    __FILE__, __LINE__, methodName, stack);
00834          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
00835       }
00836       free(stack);
00837       return false;
00838    }
00839 
00840    if (exception == 0) {
00841       char *stack = getStackTrace(10);
00842       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
00843               __FILE__, __LINE__, methodName, stack);
00844       free(stack);
00845       return false;
00846    }
00847 
00848    if (xa->isShutdown || (checkIsConnected && !xa->isConnected(xa))) {
00849       char *stack = getStackTrace(10);
00850       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00851       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00852                "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
00853                 __FILE__, __LINE__, methodName, stack);
00854       free(stack);
00855       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00856       return false;
00857    }
00858 
00859    initializeXmlBlasterException(exception);
00860 
00861    return true;
00862 }
00863 
00870 static bool runUpdate(UpdateContainer *container)
00871 {
00872    XmlBlasterAccessUnparsed *xa = container->xa;
00873    MsgUnitArr *msgUnitArrP = container->msgUnitArrP;
00874    void *userData = container->userData;
00875    CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
00876    XmlBlasterException *exception = &container->exception;
00877    SocketDataHolder *socketDataHolder = &container->socketDataHolder;
00878    bool retVal;
00879 
00880    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Entering runUpdate()");
00881 
00882    retVal = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
00883 
00884    if (xa->lowLevelAutoAck) { /* returned already */
00885    }
00886    else {
00887       cb->sendResponseOrException(retVal, cb, socketDataHolder, msgUnitArrP, exception);
00888    }
00889 
00890    free(container);
00891 
00892    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00893                                 "runUpdate: Update thread 0x%x is exiting", get_pthread_id(pthread_self()));
00894    xa->threadCounter--;
00895    return (retVal==true) ? 0 : 1;
00896 }
00897 
00903 static void interceptUpdate(MsgUnitArr *msgUnitArrP, void *userData,
00904                             XmlBlasterException *exception, void /*SocketDataHolder*/ *socketDataHolder)
00905 {
00906    CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
00907    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)cb->updateCbUserData;
00908 
00909    if (xa->clientsUpdateFp == 0) { /* Client has not registered an update() */
00910       size_t i;
00911       bool testException = false;
00912       bool success = true;
00913 
00914       for (i=0; i<msgUnitArrP->len; i++) {
00915          const char *key = msgUnitArrP->msgUnitArr[i].key;
00916          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00917              "CALLBACK update() default handler: Asynchronous message update arrived:%s id=%s, we ignore it in this default handler\n",
00918              key, ((SocketDataHolder*)socketDataHolder)->requestId);
00919          msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
00920          /* Return QoS: Everything is OK */
00921       }
00922       if (testException) {
00923          strncpy0(exception->errorCode, "user.clientCode",
00924                   XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00925          strncpy0(exception->message, "I don't want these messages",
00926                   XMLBLASTEREXCEPTION_MESSAGE_LEN);
00927          success = false;
00928       }
00929       cb->sendResponseOrException(success, cb, socketDataHolder, msgUnitArrP, exception);
00930       return;
00931    }
00932 
00933    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "interceptUpdate(): Received message");
00934 
00935    if (xa->callbackMultiThreaded == false) {
00936       bool ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
00937       cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
00938       return;
00939    }
00940 
00941    {
00942       pthread_t tid;
00943       int threadRet = 0;
00944       UpdateContainer *container = (UpdateContainer*)malloc(sizeof(UpdateContainer));
00945       pthread_attr_t attr;
00946 
00947       pthread_attr_init(&attr);
00948       /* Cleanup all resources after ending the thread, instead of calling pthread_join() */
00949       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00950       
00951       container->xa = xa;
00952       container->msgUnitArrP = msgUnitArrP;
00953       container->userData = userData;
00954       memcpy(&container->exception, exception, sizeof(XmlBlasterException));
00955       memcpy(&container->socketDataHolder, socketDataHolder, sizeof(SocketDataHolder)); /* The blob pointer is freed already by CallbackServerUnparsed */
00956 
00957       if (xa->lowLevelAutoAck) {
00958          size_t i;
00959          for (i=0; i<msgUnitArrP->len; i++) {
00960             msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
00961          }
00962       }
00963 
00964       /*
00965         Guaranteed sequence:
00966         The server uses max one thread to deliver update() for each client
00967         If the update contains an array of messages those are handled as a
00968         complete bulk in the correct sequence here.
00969       */
00970 
00971       /* this thread will deliver the update message to the client code,
00972          Note: we need a thread pool cache for better performance */
00973       xa->threadCounter++;
00974       threadRet = pthread_create(&tid, &attr,
00975                         (void * (*)(void *))runUpdate, (void *)container);
00976       if (threadRet != 0) {
00977          bool ret = false;
00978          free(container);
00979          strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00980          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00981                   "[%.100s:%d] Creating thread failed with error number %d, we deliver the message in the same thread",
00982                   __FILE__, __LINE__, threadRet);
00983          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
00984          ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
00985          cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
00986          xa->threadCounter--;
00987          pthread_attr_destroy(&attr);
00988          return;
00989       }
00990 
00991       /* Is done already with above PTHREAD_CREATE_DETACHED 
00992          threadRet = pthread_detach(tid);
00993          if (threadRet != 0) {
00994             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching thread failed with error number %d", __LINE__, threadRet);
00995          }
00996       */
00997 
00998       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00999          "interceptUpdate: Received message and delegated it to a separate thread 0x%x to deliver", get_pthread_id(tid));
01000 
01001       pthread_attr_destroy(&attr);
01002    }
01003 
01004    if (xa->lowLevelAutoAck) {
01005       *exception->errorCode = 0;
01006       cb->sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, exception);
01007    }
01008 }
01009 
01013 static ssize_t writenPlain(void * userP, const int fd, const char *ptr, const size_t nbytes) {
01014    int rc;
01015    ssize_t ret;
01016 
01017    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
01018 
01019    /* Start mutex */
01020    rc = pthread_mutex_lock(&xa->writenMutex);
01021    if (rc != 0) /* EINVAL */
01022       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
01023 
01024    /* Send data */
01025    ret = writen(fd, ptr, nbytes);
01026 
01027    /* End mutex */
01028    rc = pthread_mutex_unlock(&xa->writenMutex);
01029    if (rc != 0) /* EPERM */
01030       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
01031    
01032    return ret;
01033 
01034 }
01035 
01039 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
01040    int rc;
01041    ssize_t ret;
01042 
01043    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
01044    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes);
01045 
01046    /* Start mutex */
01047    rc = pthread_mutex_lock(&xa->writenMutex);
01048    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
01049 
01050    /* Send data */
01051    ret = xmlBlaster_writenCompressed(xa->connectionP->zlibWriteBuf, fd, ptr, nbytes);
01052 
01053    /* End mutex */
01054    rc = pthread_mutex_unlock(&xa->writenMutex);
01055    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
01056 
01057    return ret;
01058 }
01059 
01063 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
01064    int rc;
01065    ssize_t ret;
01066 
01067    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
01068 
01069    rc = pthread_mutex_lock(&xa->readnMutex);
01070    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
01071 
01072    ret = readn(fd, ptr, nbytes, fpNumRead, userP2);
01073 
01074    rc = pthread_mutex_unlock(&xa->readnMutex);
01075    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
01076    
01077    return ret;
01078 }
01079 
01083 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
01084    int rc;
01085    ssize_t ret;
01086 
01087    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
01088    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes);
01089 
01090    rc = pthread_mutex_lock(&xa->readnMutex);
01091    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
01092 
01093    ret = xmlBlaster_readnCompressed(xa->connectionP->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
01094 
01095    rc = pthread_mutex_unlock(&xa->readnMutex);
01096    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
01097 
01098    return ret;
01099 }
01100 
01101 #ifdef XmlBlasterAccessUnparsedMain /* compile a standalone test program */
01102 
01108 static bool myUpdate(MsgUnitArr *msgUnitArrP, void *userData, XmlBlasterException *xmlBlasterException)
01109 {
01110    size_t i;
01111    bool testException = false;
01112    if (userData != 0) ; /* to avoid compiler warning (we don't need it here) */
01113    for (i=0; i<msgUnitArrP->len; i++) {
01114       char *xml = messageUnitToXml(&msgUnitArrP->msgUnitArr[i]);
01115       printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n", xml);
01116       free(xml);
01117       msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
01118       /* Return QoS: Everything is OK */
01119    }
01120    if (testException) {
01121       strncpy0(xmlBlasterException->errorCode, "user.clientCode",
01122                XMLBLASTEREXCEPTION_ERRORCODE_LEN);
01123       strncpy0(xmlBlasterException->message, "I don't want these messages",
01124                XMLBLASTEREXCEPTION_MESSAGE_LEN);
01125       return false;
01126    }
01127    return true;
01128 }
01129 
01133 int main(int argc, char** argv)
01134 {
01135    int ii;
01136    int numTests = 1;
01137    bool testCallInitialize = false;
01138 
01139    for (ii=0; ii < argc-1; ii++)
01140       if (strcmp(argv[ii], "-numTests") == 0) {
01141          if (strToInt(&numTests, argv[++ii]) == false)
01142             printf("[XmlBlasterAccessUnparsed] WARN '-numTests %s' is invalid\n", argv[ii]);
01143       }
01144 
01145    for (ii=0; ii<numTests; ii++) {
01146       int iarg;
01147       char *response = (char *)0;
01148       /*
01149        * callbackSessionId:
01150        * Is created by the client and used to validate callback messages in update. 
01151        * This is sent on connect in ConnectQos.
01152        * (Is different from the xmlBlaster secret session ID)
01153        */
01154       const char *callbackSessionId = "topSecret";
01155       XmlBlasterException xmlBlasterException;
01156       XmlBlasterAccessUnparsed *xa = 0;
01157 
01158       /*
01159       const char *tmp = getStackTrace(20);
01160       printf("[client] stackTrace=%s\n", tmp);
01161       free(tmp);
01162       */
01163 
01164 #     ifdef PTHREAD_THREADS_MAX
01165          printf("[client] Try option '-help' if you need usage informations, max %d"
01166                 " threads per process are supported on this OS\n", PTHREAD_THREADS_MAX);
01167 #     else
01168          printf("[client] Try option '-help' if you need usage informations\n");
01169 #     endif
01170 
01171       for (iarg=0; iarg < argc; iarg++) {
01172          if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
01173             char usage[XMLBLASTER_MAX_USAGE_LEN];
01174             const char *pp =
01175             "\n   -logLevel            ERROR | WARN | INFO | TRACE | DUMP [WARN]"
01176             "\n   -numTests            How often to run the same tests [1]"
01177             "\n\nExample:"
01178             "\n   XmlBlasterAccessUnparsedMain -logLevel TRACE"
01179                  " -dispatch/connection/plugin/socket/hostname server.mars.universe";
01180             printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
01181                    getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
01182             exit(1);
01183          }
01184       }
01185 
01186       xa = getXmlBlasterAccessUnparsed(argc, argv);
01187 
01188       if (testCallInitialize) {
01189          if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
01190             printf("[client] Connection to xmlBlaster failed,"
01191                    " please start the server or check your configuration\n");
01192             freeXmlBlasterAccessUnparsed(xa);
01193             exit(1);
01194          }
01195       }
01196 
01197       {  /* connect */
01198          char connectQos[2048];
01199          char callbackQos[1024];
01200 
01201          if (testCallInitialize) {
01202             SNPRINTF(callbackQos, 1024,
01203                      "<queue relating='callback' maxEntries='100' maxEntriesCache='100'>"
01204                      "  <callback type='SOCKET' sessionId='%s'>"
01205                      "    socket://%.120s:%d"
01206                      "  </callback>"
01207                      "</queue>",
01208                      callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
01209          }
01210          else
01211             *callbackQos = '\0';
01212          
01213          SNPRINTF(connectQos, 2048,
01214                 "<qos>"
01215                 " <securityService type='htpasswd' version='1.0'>"
01216                 "  <![CDATA["
01217                 "   <user>fritz</user>"
01218                 "   <passwd>secret</passwd>"
01219                 "  ]]>"
01220                 " </securityService>"
01221                 "%.1024s"
01222                 "</qos>", callbackQos);
01223 
01224          response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
01225          if (*xmlBlasterException.errorCode != 0) {
01226             printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
01227                    xmlBlasterException.errorCode, xmlBlasterException.message);
01228             freeXmlBlasterAccessUnparsed(xa);
01229             exit(1);
01230          }
01231          free(response);
01232          printf("[client] Connected to xmlBlaster, do some tests ...\n");
01233       }
01234 
01235       response = xa->ping(xa, 0, &xmlBlasterException);
01236       if (response == (char *)0) {
01237          printf("[client] ERROR: Pinging a connected server failed: errorCode=%s, message=%s\n",
01238             xmlBlasterException.errorCode, xmlBlasterException.message);
01239       }
01240       else {
01241          printf("[client] Pinging a connected server, response=%s\n", response);
01242          free(response);
01243       }
01244 
01245       { /* subscribe ... */
01246          const char *key = "<key oid='HelloWorld'/>";
01247          const char *qos = "<qos/>";
01248          printf("[client] Subscribe message 'HelloWorld' ...\n");
01249          response = xa->subscribe(xa, key, qos, &xmlBlasterException);
01250          if (*xmlBlasterException.errorCode != 0) {
01251             printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
01252                    xmlBlasterException.errorCode, xmlBlasterException.message);
01253             xa->disconnect(xa, 0, &xmlBlasterException);
01254             freeXmlBlasterAccessUnparsed(xa);
01255             exit(1);
01256          }
01257          printf("[client] Subscribe success, returned status is '%s'\n", response);
01258          free(response);
01259       }
01260 
01261       {  /* publish ... */
01262          MsgUnit msgUnit;
01263          printf("[client] Publishing message 'HelloWorld' ...\n");
01264          msgUnit.key = strcpyAlloc("<key oid='HelloWorld'/>");
01265          msgUnit.content = strcpyAlloc("Some message payload");
01266          msgUnit.contentLen = strlen(msgUnit.content);
01267          msgUnit.qos =strcpyAlloc("<qos><persistent/></qos>");
01268          response = xa->publish(xa, &msgUnit, &xmlBlasterException);
01269          freeMsgUnitData(&msgUnit);
01270          if (*xmlBlasterException.errorCode != 0) {
01271             printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
01272                    xmlBlasterException.errorCode, xmlBlasterException.message);
01273             xa->disconnect(xa, 0, &xmlBlasterException);
01274             freeXmlBlasterAccessUnparsed(xa);
01275             exit(1);
01276          }
01277          printf("[client] Publish success, returned status is '%s'\n", response);
01278          free(response);
01279       }
01280 
01281       {  /* unSubscribe ... */
01282          const char *key = "<key oid='HelloWorld'/>";
01283          const char *qos = "<qos/>";
01284          printf("[client] UnSubscribe message 'HelloWorld' ...\n");
01285          response = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
01286          if (response) {
01287             printf("[client] Unsubscribe success, returned status is '%s'\n", response);
01288             free(response);
01289          }
01290          else {
01291             printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
01292                    xmlBlasterException.errorCode, xmlBlasterException.message);
01293             xa->disconnect(xa, 0, &xmlBlasterException);
01294             freeXmlBlasterAccessUnparsed(xa);
01295             exit(1);
01296          }
01297       }
01298 
01299       {  /* get synchronous ... */
01300          size_t i;
01301          const char *key = "<key queryType='XPATH'>//key</key>";
01302          const char *qos = "<qos/>";
01303          MsgUnitArr *msgUnitArr;
01304          printf("[client] Get synchronous messages with XPath '//key' ...\n");
01305          msgUnitArr = xa->get(xa, key, qos, &xmlBlasterException);
01306          if (*xmlBlasterException.errorCode != 0) {
01307             printf("[client] Caught exception in get errorCode=%s, message=%s\n",
01308                    xmlBlasterException.errorCode, xmlBlasterException.message);
01309             xa->disconnect(xa, 0, &xmlBlasterException);
01310             freeXmlBlasterAccessUnparsed(xa);
01311             exit(1);
01312          }
01313          if (msgUnitArr != (MsgUnitArr *)0) {
01314             for (i=0; i<msgUnitArr->len; i++) {
01315                char *contentStr = strFromBlobAlloc(msgUnitArr->msgUnitArr[i].content,
01316                                                 msgUnitArr->msgUnitArr[i].contentLen);
01317                const char *dots = (msgUnitArr->msgUnitArr[i].contentLen > 96) ?
01318                                   " ..." : "";
01319                printf("\n[client] Received message#%u/%u:\n"
01320                       "-------------------------------------"
01321                       "%s\n <content>%.100s%s</content>%s\n"
01322                       "-------------------------------------\n",
01323                       i+1, msgUnitArr->len,
01324                       msgUnitArr->msgUnitArr[i].key,
01325                       contentStr, dots,
01326                       msgUnitArr->msgUnitArr[i].qos);
01327                free(contentStr);
01328             }
01329             freeMsgUnitArr(msgUnitArr);
01330          }
01331          else {
01332             printf("[client] Caught exception in get errorCode=%s, message=%s\n",
01333                    xmlBlasterException.errorCode, xmlBlasterException.message);
01334             xa->disconnect(xa, 0, &xmlBlasterException);
01335             freeXmlBlasterAccessUnparsed(xa);
01336             exit(1);
01337          }
01338       }
01339 
01340 
01341       {  /* erase ... */
01342          const char *key = "<key oid='HelloWorld'/>";
01343          const char *qos = "<qos/>";
01344          printf("[client] Erasing message 'HelloWorld' ...\n");
01345          response = xa->erase(xa, key, qos, &xmlBlasterException);
01346          if (*xmlBlasterException.errorCode != 0) {
01347             printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
01348                    xmlBlasterException.errorCode, xmlBlasterException.message);
01349             xa->disconnect(xa, 0, &xmlBlasterException);
01350             freeXmlBlasterAccessUnparsed(xa);
01351             exit(1);
01352          }
01353          printf("[client] Erase success, returned status is '%s'\n", response);
01354          free(response);
01355       }
01356 
01357       if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
01358          printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
01359                 xmlBlasterException.errorCode, xmlBlasterException.message);
01360          freeXmlBlasterAccessUnparsed(xa);
01361          exit(1);
01362       }
01363 
01364       freeXmlBlasterAccessUnparsed(xa);
01365       if (numTests > 1) {
01366          printf("[client] Successfully finished test #%d from %d\n\n", ii, numTests);
01367       }
01368    }
01369    printf("[client] Good bye.\n");
01370    return 0; /*exit(0);*/
01371 }
01372 #endif /* #ifdef XmlBlasterAccessUnparsedMain */
01373