socket/XmlBlasterAccessUnparsed.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      XmlBlasterAccessUnparsed.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Wraps raw socket connection to xmlBlaster
00006            Implements sync connection and async callback
00007            Needs pthread to compile (multi threading).
00008 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00009 Compile:
00010   LINUX:   gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
00011            g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
00012            icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
00013   WIN:     cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe  XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
00014            (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
00015   Solaris: cc  -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
00016            CC  -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
00017 
00018   Linux with libxmlBlasterC.so:
00019            gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c  -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT  -lpthread
00020 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
00021 -----------------------------------------------------------------------------*/
00022 #include <stdio.h>
00023 #include <stdlib.h>
00024 #include <string.h>
00025 #if defined(WINCE)
00026 #  if defined(XB_USE_PTHREADS)
00027 #     include <pthreads/pthread.h>
00028 #  else
00029       /*#include <pthreads/need_errno.h> */
00030       static int errno=0; /* single threaded workaround*/
00031 #  endif
00032 #else
00033 #  include <errno.h>
00034 #  include <sys/types.h>
00035 #endif
00036 #include <socket/xmlBlasterSocket.h>
00037 #include <socket/xmlBlasterZlib.h>
00038 #include <XmlBlasterAccessUnparsed.h>
00039 
00043 typedef struct Dll_Export UpdateContainer {
00044    XmlBlasterAccessUnparsed *xa;
00045    MsgUnitArr *msgUnitArrP;
00046    void *userData;
00047    XmlBlasterException exception;     /* Holding a clone from the original as the callback thread may use it for another message */
00048    SocketDataHolder socketDataHolder; /* Holding a clone from the original */
00049 } UpdateContainer;
00050 
00051 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception);
00052 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception);
00053 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
00054 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
00055 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00056 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00057 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00058 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00059 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00060 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
00061 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
00062 static bool isConnected(XmlBlasterAccessUnparsed *xa);
00063 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder);
00064 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
00065 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
00066 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
00067 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder);
00068 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception);
00069 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes);
00070 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes);
00071 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
00072 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
00073 
00074 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) {
00075    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed));
00076    if (xa == 0) return xa;
00077    xa->argc = argc;
00078    xa->argv = argv;
00079    xa->props = createProperties(xa->argc, xa->argv);
00080    if (xa->props == 0) {
00081       freeXmlBlasterAccessUnparsed(xa);
00082       return (XmlBlasterAccessUnparsed *)0;
00083    }
00084    xa->isInitialized = false;
00085    xa->isShutdown = false;
00086    xa->connectionP = 0;
00087    xa->callbackP = 0;
00088    xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
00089    xa->userFp = 0;
00090    xa->connect = xmlBlasterConnect;
00091    xa->initialize = initialize;
00092    xa->disconnect = xmlBlasterDisconnect;
00093    xa->publish = xmlBlasterPublish;
00094    xa->publishArr = xmlBlasterPublishArr;
00095    xa->publishOneway = xmlBlasterPublishOneway;
00096    xa->subscribe = xmlBlasterSubscribe;
00097    xa->unSubscribe = xmlBlasterUnSubscribe;
00098    xa->erase = xmlBlasterErase;
00099    xa->get = xmlBlasterGet;
00100    xa->ping = xmlBlasterPing;
00101    xa->isConnected = isConnected;
00102    xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
00103    xa->log = xmlBlasterDefaultLogging;
00104    xa->logUserP = 0;
00105    xa->clientsUpdateFp = 0;
00106    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true);
00107    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded);
00108    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */
00109    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */
00110    /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */
00111    xa->lowLevelAutoAck = false;
00112 
00113    /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */
00114    if (xa->callbackMultiThreaded == true) {
00115       if (xa->logLevel>=XMLBLASTER_LOG_DUMP) 
00116          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true");
00117       /*xa->callbackMultiThreaded = false;*/
00118    }
00119    /* stdint.h: # define INT32_MAX              (2147483647) */
00120    xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */
00121    xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout);
00122    /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */
00123    memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
00124    xa->threadCounter = 0;
00125 
00126    if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__,
00127                                 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld",
00128                                 getLogLevelStr(xa->logLevel), xa->responseTimeout);
00129 
00130    /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */
00131    pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */
00132    pthread_mutex_init(&xa->readnMutex, NULL);
00133    return xa;
00134 }
00135 
00136 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa)
00137 {
00138    int rc;
00139 
00140    if (xa == 0) {
00141       char *stack = getStackTrace(10);
00142       printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s",
00143                 __FILE__, __LINE__, stack);
00144       free(stack);
00145       return;
00146    }
00147 
00148    if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
00149    xa->isShutdown = true;      /* Inhibit access to xa */
00150 
00151    if (xa->callbackP != 0) {
00152       xa->callbackP->shutdown(xa->callbackP);
00153    }
00154    if (xa->connectionP != 0) {
00155       xa->connectionP->shutdown(xa->connectionP);
00156    }
00157 
00158    if (xa->callbackP != 0) {
00159       /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */
00160       const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true);
00161       int retVal;
00162       if (!xa->callbackP->isShutdown) {
00163 
00164          {  /* Wait for any pending update() dispatcher threads to die */
00165             int i;
00166             int num = 200;
00167             int interval = 10;
00168             for (i=0; i<num; i++) {
00169                if (xa->callbackP->isShutdown)
00170                   break;
00171                /*pthread_yield(0);*/
00172                sleepMillis(interval);
00173                if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00174                    "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for callback thread to join. %d/%d", interval, i, num);
00175             }
00176             if (i == num) {
00177                xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Proper shutdown of callback thread failed, it seems to block on the socket");
00178             }
00179          }
00180 
00181          if (!USE_DETACH_MODE) {
00182             /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */
00183             /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */
00184             /*
00185             retVal = pthread_cancel(xa->callbackThreadId);
00186             if (retVal != 0) {
00187                xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal);
00188             }
00189             */
00190          }
00191       }
00192 
00193       if (USE_DETACH_MODE) {
00194          retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */
00195          if (retVal != 0) {
00196             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal);
00197          }
00198          else {
00199             if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00200                                           "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
00201          }
00202       }
00203       else { /* JOIN mode */
00204          retVal = pthread_join(xa->callbackThreadId, 0);
00205          if (retVal != 0) {
00206             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal);
00207          }
00208          else {
00209             if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00210                                           "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
00211          }
00212       }
00213 
00214       memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
00215    }
00216 
00217    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP);
00218 
00219    {  /* Wait for any pending update() dispatcher threads to die */
00220       int i;
00221       int num = 1000;
00222       int interval = 10;
00223       for (i=0; i<num; i++) {
00224          if ((int)xa->threadCounter < 1)
00225             break;
00226          sleepMillis(interval);
00227          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00228              "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num);
00229       }
00230       if (i >= num) {
00231          if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00232              "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num);
00233       }
00234    }
00235 
00236    if (xa->connectionP != 0) {
00237       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00238    }
00239 
00240    if (xa->callbackP != 0) {
00241       freeCallbackServerUnparsed(&xa->callbackP);
00242    }
00243 
00244    freeProperties(xa->props);
00245 
00246    rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */
00247    if (rc != 0) /* EBUSY */
00248       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc);
00249 
00250    rc = pthread_mutex_destroy(&xa->readnMutex);
00251    if (rc != 0) /* EBUSY */
00252       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc);
00253 
00254    free(xa);
00255 }
00256 
00257 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception)
00258 {
00259    int threadRet = 0;
00260    const char *compressType = 0;
00261 
00262    if (checkArgs(xa, "initialize", false, exception) == false) return false;
00263 
00264    if (xa->isInitialized) {
00265       return true;
00266    }
00267 
00268    if (clientUpdateFp == 0) {
00269       xa->clientsUpdateFp = 0;
00270       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "",
00271         "Your callback UpdateFp pointer is NULL, we use our default callback handler");
00272    }
00273    else {
00274       xa->clientsUpdateFp = clientUpdateFp;
00275    }
00276 
00277    if (xa->connectionP) {
00278       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00279    }
00280    xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv);
00281    if (xa->connectionP == 0) {
00282       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00283       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00284                "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__);
00285       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00286       return false;
00287    }
00288    xa->connectionP->log = xa->log;
00289    xa->connectionP->logUserP = xa->logUserP;
00290    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed");
00291 
00292 
00293    /* Switch on compression? */
00294    compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", "");
00295    compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType);
00296          
00297    if (!strcmp(compressType, "zlib:stream")) {
00298       xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed;
00299       xa->connectionP->writeToSocket.userP = xa;
00300       xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed;
00301       xa->connectionP->readFromSocket.userP = xa;
00302    }
00303    else {
00304       if (strcmp(compressType, "")) {
00305          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType);
00306       }
00307       xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain;
00308       xa->connectionP->writeToSocket.userP = xa;
00309       xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain;
00310       xa->connectionP->readFromSocket.userP = xa;
00311    }
00312 
00313    if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */
00314       return false;
00315 
00316    /* the fourth arg 'xa' is returned as 'void *userData' in update() method */
00317    if (xa->callbackP != 0) {
00318       freeCallbackServerUnparsed(&xa->callbackP);
00319    }
00320    xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa);
00321    if (xa->callbackP == 0) {
00322       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00323       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00324                "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__);
00325       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00326       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00327       return false;
00328    }
00329    xa->callbackP->log = xa->log;
00330    xa->callbackP->logUserP = xa->logUserP;
00331 
00332    if (!strcmp(compressType, "zlib:stream")) {
00333       xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed;
00334       xa->callbackP->writeToSocket.userP = xa;
00335       xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed;
00336       xa->callbackP->readFromSocket.userP = xa;
00337    }
00338    else {
00339       xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain;
00340       xa->callbackP->writeToSocket.userP = xa;
00341       xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain;
00342       xa->callbackP->readFromSocket.userP = xa;
00343    }
00344 
00345    xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp);
00346 
00347    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00348           "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...",
00349           (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB);
00350 
00351    /* Register our callback funtion which is called just before sending a message */
00352    xa->connectionP->preSendEvent = preSendEvent;
00353    xa->connectionP->preSendEvent_userP = xa;
00354 
00355    /* Register our callback funtion which is called just after sending a message */
00356    xa->connectionP->postSendEvent = postSendEvent;
00357    xa->connectionP->postSendEvent_userP = xa;
00358 
00359    /* thread blocks on socket listener */
00360    threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP);
00361    if (threadRet != 0) {
00362       strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00363       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00364                "[%.100s:%d] Creating thread failed with error number %d",
00365                __FILE__, __LINE__, threadRet);
00366       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00367       freeCallbackServerUnparsed(&xa->callbackP);
00368       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
00369       return false;
00370    }
00371 
00372    xa->isInitialized = true;
00373    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00374                                 "initialize() successful");
00375    return xa->isInitialized;
00376 }
00377 
00378 static bool isConnected(XmlBlasterAccessUnparsed *xa)
00379 {
00380    if (xa == 0 || xa->isShutdown || xa->connectionP == 0) {
00381       return false;
00382    }
00383    return xa->connectionP->isConnected(xa->connectionP);
00384 }
00385 
00396 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
00397 {
00398    bool retVal;
00399    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
00400 
00401    /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */
00402    if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName))
00403       return msgRequestInfoP;
00404 
00405    /* ======== Initialize threading ====== */
00406    msgRequestInfoP->responseMutexIsLocked = false; /* Only to remember if the client thread holds the lock */
00407 
00408    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00409                                 "preSendEvent(%s) occurred", msgRequestInfoP->methodName);
00410    retVal = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent);
00411    if (retVal == false) {
00412       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00413       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00414                "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__);
00415       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00416       return (MsgRequestInfo *)0;
00417    }
00418 
00419    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00420                   "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock",
00421                   msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen);
00422    pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */
00423    if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
00424       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00425       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00426                "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retVal);
00427       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00428       return (MsgRequestInfo *)0;
00429    }
00430    msgRequestInfoP->responseMutexIsLocked = true; /* Only if the client thread holds the lock */
00431 
00432    return msgRequestInfoP;
00433 }
00434 
00443 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) {
00444    int retVal;
00445    SocketDataHolder *s = (SocketDataHolder *)socketDataHolder;
00446    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
00447 
00448    if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
00449       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal);
00450       /* return; */
00451    }
00452    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED");
00453 
00454    blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen);
00455    msgRequestInfoP->responseType = s->type;
00456 
00457    if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) {
00458       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal);
00459       /* return; */
00460    }
00461 
00462    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00463                                 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent",
00464                                 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen);
00465 
00466    if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
00467       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal);
00468       /* return; */
00469    }
00470    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED");
00471 }
00472 
00480 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
00481 {
00482    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
00483    struct timespec abstime;
00484    bool useTimeout = false;
00485    int retVal;
00486 
00487    if (msgRequestInfoP->rollback) {
00488       mutexUnlock(msgRequestInfoP, exception);
00489       return (MsgRequestInfo *)0;
00490    }
00491 
00492    if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) {
00493       useTimeout = true;
00494    }
00495 
00496    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr);
00497    
00498    if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) {
00499       xa->