1 /*----------------------------------------------------------------------------
   2 Name:      XmlBlasterConnectionUnparsed.c
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 Comment:   Wraps raw socket connection to xmlBlaster
   6            for complete synchronous xmlBlaster access,
   7            without callbacks and not threading necessary
   8            socket connect timeout may be specified ja, bj
   9 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
  10 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
  11 -----------------------------------------------------------------------------*/
  12 #include <stdio.h>
  13 #include <stdlib.h>
  14 #include <string.h>
  15 #include <ctype.h> /* isalpha() */
  16 #if defined(WINCE)
  17 #  if defined(XB_USE_PTHREADS)
  18 #     include <pthreads/pthread.h>
  19 #  else
  20       /*#include <pthreads/need_errno.h> */
  21       static int errno=0; /* single threaded workaround*/
  22 #  endif
  23 #else
  24 #  include <errno.h>
  25 #  include <sys/types.h>
  26 #endif
  27 #include <socket/xmlBlasterSocket.h>
  28 #include <socket/xmlBlasterZlib.h>
  29 #include <XmlBlasterConnectionUnparsed.h>
  30 #include <util/Timestampc.h>
  31 #define SOCKET_TCP false
  32 
  33 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception);
  34 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception);
  35 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp);
  36 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
  37 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
  38 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception);
  39 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
  40 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
  41 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
  42 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
  43 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
  44 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
  45 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
  46 static bool isConnected(XmlBlasterConnectionUnparsed *xb);
  47 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb);
  48 static ssize_t writenPlain(void *xb, const int fd, const char *ptr, const size_t nbytes);
  49 static ssize_t writenCompressed(void *xb, const int fd, const char *ptr, const size_t nbytes);
  50 static ssize_t readnPlain(void *xb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
  51 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
  52 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
  53 
  54 /**
  55  * Create a new instance to handle a synchronous connection to the server.
  56  * This is usually the first call of a client.
  57  * @return NULL if bootstrapping failed. If not NULL you need to free() it when you are done
  58  * usually by calling freeXmlBlasterConnectionUnparsed().
  59  */
  60 XmlBlasterConnectionUnparsed *getXmlBlasterConnectionUnparsed(int argc, const char* const* argv) {
  61    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)calloc(1, sizeof(XmlBlasterConnectionUnparsed));
  62    if (xb == 0) return xb;
  63    xb->argc = argc;
  64    xb->argv = argv;
  65    xb->props = createProperties(xb->argc, xb->argv);
  66    if (xb->props == 0) {
  67       freeXmlBlasterConnectionUnparsed(&xb);
  68       return (XmlBlasterConnectionUnparsed *)0;
  69    }
  70 #ifdef __IPhoneOS__
  71       xb->cfSocketRef = nil;
  72       xb->readStream = nil;
  73       xb->writeStream = nil;
  74 #endif
  75    xb->socketToXmlBlaster = -1;
  76    xb->socketToXmlBlasterUdp = -1;
  77    xb->isInitialized = false;
  78    *xb->secretSessionId = 0;
  79    xb->initConnection = initConnection;
  80    xb->initQueue = xmlBlasterInitQueue;
  81    xb->connect = xmlBlasterConnect;
  82    xb->disconnect = xmlBlasterDisconnect;
  83    xb->publish = xmlBlasterPublish;
  84    xb->publishArr = xmlBlasterPublishArr;
  85    xb->publishOneway = xmlBlasterPublishOneway;
  86    xb->subscribe = xmlBlasterSubscribe;
  87    xb->unSubscribe = xmlBlasterUnSubscribe;
  88    xb->erase = xmlBlasterErase;
  89    xb->get = xmlBlasterGet;
  90    xb->ping = xmlBlasterPing;
  91    xb->isConnected = isConnected;
  92    xb->shutdown = xmlBlasterConnectionShutdown;
  93    xb->preSendEvent = 0;
  94    xb->preSendEvent_userP = 0;
  95    xb->postSendEvent = 0;
  96    xb->postSendEvent_userP = 0;
  97    xb->queueP = 0;
  98    xb->logLevel = parseLogLevel(xb->props->getString(xb->props, "logLevel", "WARN"));
  99    xb->log = xmlBlasterDefaultLogging;
 100    xb->logUserP = 0;
 101    xb->useUdpForOneway = false;
 102    xb->writeToSocket.writeToSocketFuncP = 0;
 103    xb->writeToSocket.userP = xb;
 104    xb->zlibWriteBuf = 0;
 105    xb->readFromSocket.readFromSocketFuncP = 0;
 106    xb->readFromSocket.userP = xb;
 107    xb->zlibReadBuf = 0;
 108    return xb;
 109 }
 110 
 111 void freeXmlBlasterConnectionUnparsed(XmlBlasterConnectionUnparsed **xb_)
 112 {
 113    XmlBlasterConnectionUnparsed *xb = *xb_;
 114    if (xb != 0) {
 115       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterConnectionUnparsed 0x%x", xb);
 116       freeProperties(xb->props);
 117       if (xb->zlibWriteBuf) {
 118          xmlBlaster_endZlibWriter(xb->zlibWriteBuf);
 119          free(xb->zlibWriteBuf);
 120          xb->zlibWriteBuf = 0;
 121       }
 122       if (xb->zlibReadBuf) {
 123          xmlBlaster_endZlibReader(xb->zlibReadBuf);
 124          free(xb->zlibReadBuf);
 125          xb->zlibReadBuf = 0;
 126       }
 127       xmlBlasterConnectionShutdown(xb);
 128       free(xb);
 129       *xb_ = 0;
 130    }
 131 }
 132 
 133 /**
 134  * Connects on TCP/IP level to xmlBlaster
 135  * @return true If the low level TCP/IP connect to xmlBlaster succeeded
 136  */
 137 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception)
 138 {
 139    const char *servTcpPort = 0;
 140 
 141    struct sockaddr_in xmlBlasterAddr;
 142    struct hostent hostbuf, *hostP = 0;
 143    struct servent *portP = 0;
 144 
 145    size_t hstbuflen=0;
 146 
 147    char serverHostName[256];
 148    char errP[MAX_ERRNO_LEN];
 149 
 150 #if defined(_WINDOWS)
 151    WORD wVersionRequested;
 152    WSADATA wsaData;
 153    int err;
 154    wVersionRequested = MAKEWORD( 2, 2 );
 155    err = WSAStartup( wVersionRequested, &wsaData );
 156    if ( err != 0 ) {
 157       strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 158       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL", __FILE__, __LINE__);
 159       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 160       return false;
 161    }
 162 
 163    if ( LOBYTE( wsaData.wVersion ) != 2 ||
 164    HIBYTE( wsaData.wVersion ) != 2 ) {
 165       WSACleanup( );
 166       strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 167       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL which supports version 2.2", __FILE__, __LINE__);
 168       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 169       return false;
 170    }
 171 # endif
 172    *errP = 0;
 173 
 174    if (xb->isInitialized) {
 175       return true;
 176    }
 177 
 178    {  /* Switch on compression? */
 179       const char *compressType = xb->props->getString(xb->props, "plugin/socket/compress/type", "");
 180       compressType = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/compress/type", compressType);
 181 
 182       if (!strcmp(compressType, "zlib:stream")) {
 183 
 184          xb->zlibWriteBuf = (XmlBlasterZlibWriteBuffers *)malloc(sizeof(struct XmlBlasterZlibWriteBuffers));
 185          xb->zlibReadBuf = (XmlBlasterZlibReadBuffers *)malloc(sizeof(struct XmlBlasterZlibReadBuffers));
 186 
 187          if (xmlBlaster_initZlibWriter(xb->zlibWriteBuf) != 0/*Z_OK*/) {
 188             if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
 189                   "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
 190             strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 191             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 192                      "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
 193                      __FILE__, __LINE__, compressType);
 194             free(xb->zlibWriteBuf);
 195             xb->zlibWriteBuf = 0;
 196             free(xb->zlibReadBuf);
 197             xb->zlibReadBuf = 0;
 198             return false;
 199          }
 200 
 201          if (xmlBlaster_initZlibReader(xb->zlibReadBuf) != 0/*Z_OK*/) {
 202             if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
 203                   "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
 204             strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 205             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 206                      "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
 207                      __FILE__, __LINE__, compressType);
 208             free(xb->zlibWriteBuf);
 209             xb->zlibWriteBuf = 0;
 210             free(xb->zlibReadBuf);
 211             xb->zlibReadBuf = 0;
 212             return false;
 213          }
 214 
 215          if (xb->logLevel>=XMLBLASTER_LOG_DUMP) {
 216             xb->zlibWriteBuf->debug = true;
 217             xb->zlibReadBuf->debug = true;
 218          }
 219 
 220          if (!xb->writeToSocket.writeToSocketFuncP) {  /* Accept setting from XmlBlasterAccessUnparsed */
 221             xb->writeToSocket.writeToSocketFuncP = writenCompressed;
 222             xb->readFromSocket.readFromSocketFuncP = readnCompressed;
 223          }
 224       }
 225       else {
 226          if (strcmp(compressType, "")) {
 227             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode.", compressType);
 228          }
 229          if (!xb->writeToSocket.writeToSocketFuncP) {  /* Accept setting from XmlBlasterAccessUnparsed */
 230             xb->writeToSocket.writeToSocketFuncP = writenPlain;
 231             xb->readFromSocket.readFromSocketFuncP = readnPlain;
 232          }
 233       }
 234    }
 235 
 236 
 237    servTcpPort = xb->props->getString(xb->props, "plugin/socket/port", "7607");
 238    servTcpPort = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/port", servTcpPort);
 239 
 240    strncpy0(serverHostName, "localhost", 250);
 241    gethostname(serverHostName, 250);
 242    {
 243       const char *hn = xb->props->getString(xb->props, "plugin/socket/hostname", serverHostName);
 244       memmove(serverHostName, hn, strlen(hn)+1);  /* including '\0' */
 245       hn = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/hostname", serverHostName);
 246       memmove(serverHostName, hn, strlen(hn)+1);
 247    }
 248 
 249    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 250       "Lookup xmlBlaster on -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %s ...",
 251       serverHostName, servTcpPort);
 252 
 253    *xb->secretSessionId = 0;
 254    memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr));
 255    xmlBlasterAddr.sin_family=AF_INET;
 256 
 257 # ifdef _WINDOWS_NOT_YET_PORTED /* Windows gethostbyname is deprecated */
 258    const struct addrinfo hints;
 259    struct addrinfo** res;
 260    int getaddrinfo(serverHostName, servTcpPort, &hints, res);
 261    res->ai_next : ai_family, ai_socktype, and ai_protocol
 262 
 263    ...
 264 
 265    void freeaddrinfo(*res);
 266 # endif
 267         if (isalpha(serverHostName[0]) || strchr(serverHostName,':') != 0) { /* look for dns name or ipv6 */
 268            char *tmphstbuf=0;
 269            memset((char *)&hostbuf, 0, sizeof(struct hostent));
 270            xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server DNS lookup of hostname '%s'", serverHostName);
 271            hostP = gethostbyname_re(serverHostName, &hostbuf, &tmphstbuf, &hstbuflen, errP);
 272            if (hostP == 0) {
 273               if (*errP != 0) {
 274                  strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 275                  SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 276                     "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s: %s",
 277                           __FILE__, __LINE__, serverHostName, servTcpPort, errP);
 278                  xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
 279                  *errP = 0;
 280               }
 281               else {
 282                  strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 283                  SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 284                           "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s, errno=%d",
 285                           __FILE__, __LINE__, serverHostName, servTcpPort, errno);
 286                  xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
 287               }
 288               return false;
 289            }
 290            xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
 291            free(tmphstbuf);
 292         }
 293         else {
 294            xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server IP4 usage (without DNS lookup) of IP '%s'", serverHostName);
 295            /* use ip4 addr directly to avoid dns lookup */
 296            xmlBlasterAddr.sin_addr.s_addr = inet_addr(serverHostName);
 297         }
 298 
 299    portP = getservbyname(servTcpPort, "tcp");
 300    if (portP != 0)
 301                 xmlBlasterAddr.sin_port = (u_short)portP->s_port;
 302    else
 303       xmlBlasterAddr.sin_port = htons((u_short)atoi(servTcpPort));
 304 #ifdef __IPhoneOS__
 305    xb->socketToXmlBlaster = 0;
 306 #else
 307    xb->socketToXmlBlaster = (int)socket(AF_INET, SOCK_STREAM, 0);
 308 #endif
 309    if (xb->socketToXmlBlaster != -1) {
 310       int ret=0;
 311       const char *localHostName = xb->props->getString(xb->props, "plugin/socket/localHostname", 0);
 312       int localPort = xb->props->getInt(xb->props, "plugin/socket/localPort", 0);
 313       localHostName = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/localHostname", localHostName);
 314       localPort = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/localPort", localPort);
 315 
 316       /* Sometimes a user may whish to force the local host/port setting (e.g. for firewall tunneling
 317          and on multi homed hosts */
 318       if (localHostName != 0 || localPort > 0) {
 319          struct sockaddr_in localAddr;
 320          struct hostent localHostbuf, *localHostP = 0;
 321          char *tmpLocalHostbuf=0;
 322          size_t localHostbuflen=0;
 323          memset(&localAddr, 0, sizeof(localAddr));
 324          localAddr.sin_family = AF_INET;
 325          if (localHostName) {
 326             if (isalpha(localHostName[0]) || strchr(localHostName,':') != 0) { /* look for dns name or ipv6 */
 327                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local hostname DNS lookup of hostname '%s'", localHostName);
 328                *errP = 0;
 329                localHostP = gethostbyname_re(localHostName, &localHostbuf, &tmpLocalHostbuf, &localHostbuflen, errP);
 330                if (*errP != 0) {
 331                   strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 332                   SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 333                            "[%.100s:%d] Lookup of local IP failed, %s",
 334                            __FILE__, __LINE__, errP);
 335                   xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
 336                   *errP = 0;
 337                }
 338                if (localHostP != 0) {
 339                   localAddr.sin_addr.s_addr = ((struct in_addr *)(localHostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
 340                   free(tmpLocalHostbuf);
 341                }
 342             }
 343             else {
 344                 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local IP4 usage (without DNS lookup) of IP '%s'", localHostName);
 345                /* use ip4 addr directly to avoid dns lookup */
 346                localAddr.sin_addr.s_addr = inet_addr(localHostName);
 347             }
 348          }
 349          if (localPort > 0) {
 350             localAddr.sin_port = htons((unsigned short)localPort);
 351          }
 352          if (bind(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
 353             if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
 354                "Failed binding local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
 355                   localHostName, localPort);
 356          }
 357          else {
 358             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 359                "Bound local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
 360                   localHostName, localPort);
 361          }
 362       }
 363 
 364       /* int retval = fcntl(xb->socketToXmlBlaster, F_SETFL, O_NONBLOCK); */ /* Switch on none blocking mode: we then should use select() to be notified when the kernel succeeded with connect() */
 365 #ifdef __IPhoneOS__
 366       globalIPhoneXb = xb;
 367       {
 368          CFStringRef hostnameRef =CFStringCreateWithCString (kCFAllocatorDefault, serverHostName, kCFStringEncodingUTF8);
 369          CFHostRef hostRef = CFHostCreateWithName (kCFAllocatorDefault, hostnameRef);
 370          CFStreamCreatePairWithSocketToCFHost (kCFAllocatorDefault,hostRef, atoi(servTcpPort), &xb->readStream, &xb->writeStream);
 371       }
 372       if(xb->readStream != nil && xb->writeStream != nil)
 373       {
 374          ret = 0;
 375          if(!CFWriteStreamOpen (xb->writeStream)) /* true if stream was successfully opened */
 376             ret = -1;
 377          if(!CFReadStreamOpen (xb->readStream))
 378             ret = -1;
 379          if (ret != -1) {
 380             if (!isIPhoneSocketConnectionEstablished(xb))
 381                ret = -1;
 382          }
 383       }
 384       else
 385       {
 386          ret = -1;
 387       }
 388 #else
 389       {
 390          /*
 391             xmlBlasterProps:
 392               dispatch/connection/useSelect=1
 393               dispatch/connection/plugin/socket/connectTimeout=3
 394               Johannes Ahlert:
 395                   beides ist so laufzeitabh�ngig, find ich noch besser
 396                   default f�r useSelect ist 0
 397          */
 398          int useSelect = 0;
 399          useSelect = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/useSelect", useSelect);
 400 
 401          if ( useSelect ) 
 402          {
 403 #if defined(_WINDOWS)
 404             /* Die Variante mit select erfordert, dass der Socket nicht-blockierend
 405                gemacht wird. Dies kann (und wird in diesem Beispiel) nach dem Verbinden
 406                wieder rueckgaengig gemacht, sodass man wie gewohnt mit dem Socket arbeiten kann.
 407             */
 408             fd_set         fds;
 409             int            connectTimeout = 3;
 410             unsigned long  opt            = 1;
 411             struct timeval timeout;
 412             int conret, wsaret, rets;
 413 
 414             ioctlsocket( xb->socketToXmlBlaster, FIONBIO, &opt );
 415 
 416             /*
 417                Den Verbindungsaufbau anstossen
 418                returns 0 or SOCKET_ERROR only
 419             */
 420             if ( (conret = connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == SOCKET_ERROR )
 421             {
 422                /*
 423                   Das schlaegt normalerweise fehl, wobei der Fehler WSAEWOULDBLOCK
 424                   darauf hinweist, dass der Verbindungsaufbau durchaus noch Erfolgreich
 425                   sein kann, und der Aufruf nur fehlgeschlagen ist, weil er andernfalls
 426                   blockieren wuerde, was ja absichtlich deaktiviert wurde.
 427                */
 428                if ( (wsaret = WSAGetLastError()) != WSAEWOULDBLOCK ) {
 429                   return false; /* kein logging und socket wird nicht wieder blockierend?   TODO  ?? */
 430                }
 431 
 432                /* Deskriptor-Set zuruecksetzen und mit dem zu verbindenden Socket belegen */
 433                FD_ZERO( &fds );
 434                FD_SET( xb->socketToXmlBlaster, &fds );
 435 
 436                /* Den gewaehlte timeout-Wert einsetzen */
 437                connectTimeout  = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/connectTimeout", connectTimeout);
 438                timeout.tv_sec  = connectTimeout;
 439                timeout.tv_usec = 0;
 440 
 441                /* Nun select aufrufen; dieses kehrt entweder nach Ablauf des Timeouts
 442                   zurueck, oder wenn der Socket zum Schreiben bereit ist, was genau dann
 443                   passiert, wenn er erfolgreich verbunden wurde.
 444 
 445                   The select function returns the total number of socket handles that are ready and contained in the fd_set structures,
 446                   zero if the time limit expired, or SOCKET_ERROR if an error occurred.
 447                   If the return value is SOCKET_ERROR, WSAGetLastError can be used to retrieve a specific error code.
 448                */
 449                rets = select( xb->socketToXmlBlaster + 1, 0, &fds, 0, &timeout );
 450                if ( rets == SOCKET_ERROR )
 451                   ret = -1;
 452 
 453                if (rets == 0)
 454                   ret = -1; /* timeout */
 455 
 456                /* Falls select zurueckgekehrt ist, aber der zu verbindende Socket nicht
 457                   im Deskriptor-Set vorhanden ist, war das Verbinden in der gegebenen
 458                   Zeit nicht erfolgreich.
 459                */
 460                if ( FD_ISSET(xb->socketToXmlBlaster, &fds) == 0 )
 461                   ret = -1; /* timeout anderer test */
 462             }
 463 
 464             /*
 465                Der Socket kann nun wieder blockierend gemacht werden
 466             */
 467             opt = 0;
 468             ioctlsocket( xb->socketToXmlBlaster, FIONBIO, &opt );
 469 #else
 470             printf("dispatch/connection/plugin/socket/useSelect is not supported on Linux, exit! (For Linux porting you could use fcntl)");
 471             exit(1);
 472 #endif
 473          }
 474          else {
 475             ret=connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr));
 476          }
 477       }
 478 #endif
 479       if (ret == -1) {
 480          char errnoStr[MAX_ERRNO_LEN];
 481          xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
 482          strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 483          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 484                   "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed, ret=%d, %s",
 485                   __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
 486          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 487          return false;
 488       }
 489 
 490       if (xb->logLevel>=XMLBLASTER_LOG_INFO) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Connected to xmlBlaster");
 491       xb->useUdpForOneway = xb->props->getBool(xb->props, "plugin/socket/useUdpForOneway", xb->useUdpForOneway);
 492       xb->useUdpForOneway = xb->props->getBool(xb->props, "dispatch/connection/plugin/socket/useUdpForOneway", xb->useUdpForOneway);
 493 
 494       if (xb->useUdpForOneway) {
 495          struct sockaddr_in localAddr;
 496          socklen_t size = (socklen_t)sizeof(localAddr);
 497          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 498             "Using UDP connection for oneway calls, see -dispatch/connection/plugin/socket/useUdpForOneway true");
 499 
 500          xb->socketToXmlBlasterUdp = (int)socket(AF_INET, SOCK_DGRAM, 0);
 501 
 502          if (xb->socketToXmlBlasterUdp != -1) {
 503             if (getsockname(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, &size) == -1) {
 504                if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
 505                   "Can't determine the local socket host and port (in UDP), errno=%d", errno);
 506                return false;
 507             }
 508 
 509             if (bind(xb->socketToXmlBlasterUdp, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
 510                if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
 511                   "Failed binding local port (in UDP) -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
 512                   localHostName, localPort);
 513                return false;
 514             }
 515             if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 516                "Bound local UDP port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
 517                localHostName, localPort);
 518 
 519             if ((ret=connect(xb->socketToXmlBlasterUdp, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == -1) {
 520                char errnoStr[MAX_ERRNO_LEN];
 521                xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
 522                strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 523                SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 524                         "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP), ret=%d, %s",
 525                         __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
 526                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 527                return false;
 528             }
 529             if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connected to xmlBlaster with UDP");
 530          } /* if (xb->socketToXmlBlasterUdp != -1) */
 531          else {
 532             strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 533             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 534                      "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP) errno=%d",
 535                      __FILE__, __LINE__, serverHostName, servTcpPort, errno);
 536             return false;
 537          }
 538       } /* if (xb->useUdpForOneway) */
 539 
 540    }
 541    else {
 542       strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 543       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 544                "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed errno=%d",
 545                __FILE__, __LINE__, serverHostName, servTcpPort, errno);
 546       return false;
 547    }
 548    xb->isInitialized = true;
 549    return true;
 550 }
 551 
 552 
 553 /**
 554  * Set the queue properties.
 555  * Example:
 556  * <pre>
 557    QueueProperties queueProperties;
 558    strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
 559    strncpy0(queueProperties.nodeId, "clientJoe1081594557415", QUEUE_ID_MAX);
 560    strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
 561    strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
 562    queueProperties.maxNumOfEntries = 10000000L;
 563    queueProperties.maxNumOfBytes = 1000000000LL;
 564  * <pre>
 565  * @param queueProperties The queue configuration,
 566  *        if 0 or parts of it are empty it will be initialized by environment settings
 567  * @return true on success
 568  * @throws exception if already initialized or if initialization fails
 569  */
 570 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception)
 571 {
 572 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST
 573    if (checkArgs(xb, "initQueue", false, exception) == false ) return false;
 574    if (xb->queueP) {
 575       char message[XMLBLASTEREXCEPTION_MESSAGE_LEN];
 576       SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 577                "[%.100s:%d] The queue is initialized already, call to initQueue() is ignored", __FILE__, __LINE__);
 578       embedException(exception, "user.illegalArgument", message, exception);
 579       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
 580       return false;
 581    }
 582 
 583    {
 584       QueueProperties tmp;
 585       memset(&tmp, 0, sizeof(QueueProperties));
 586 
 587       if (queueProperties == 0)
 588          queueProperties = &tmp;
 589 
 590       if (*queueProperties->dbName == 0) {
 591          strncpy0(queueProperties->dbName, xb->props->getString(xb->props, "queue/connection/dbName", "xmlBlasterClient.db"), QUEUE_DBNAME_MAX);
 592       }
 593       if (*queueProperties->nodeId == 0) {
 594          strncpy0(queueProperties->nodeId, xb->props->getString(xb->props, "queue/connection/nodeId", "client"), QUEUE_ID_MAX);
 595       }
 596       if (*queueProperties->queueName == 0) {
 597          strncpy0(queueProperties->queueName, xb->props->getString(xb->props, "queue/connection/queueName", "connection_client"), QUEUE_ID_MAX);
 598       }
 599       if (*queueProperties->tablePrefix == 0) {
 600          strncpy0(queueProperties->tablePrefix, xb->props->getString(xb->props, "queue/connection/tablePrefix", "XB_"), QUEUE_PREFIX_MAX);
 601       }
 602       if (queueProperties->maxNumOfEntries == 0) {
 603          queueProperties->maxNumOfEntries = xb->props->getInt(xb->props, "queue/connection/maxEntries", 10000000);
 604       }
 605       if (queueProperties->maxNumOfBytes == 0) {
 606          queueProperties->maxNumOfBytes = xb->props->getInt64(xb->props, "queue/connection/maxBytes", 10000000LL);
 607       }
 608       if (queueProperties->logFp == 0) queueProperties->logFp = xb->log;
 609       if (queueProperties->logLevel == 0) queueProperties->logLevel = xb->logLevel;
 610       if (queueProperties->userObject == 0) queueProperties->userObject = xb->userObject;
 611 
 612       xb->queueP = createQueue(queueProperties, exception);
 613       if (*exception->errorCode != 0) {
 614          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Queue initializeation failed: [%s] %s\n", exception->errorCode, exception->message);
 615          return false;
 616       }
 617       xb->queueP->userObject = xb;
 618    }
 619    return true;
 620 #else
 621    if (queueProperties) {} /* To suppress compiler warning that not used */
 622    strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 623    SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 624             "[%.100s:%d] Queue support is not compiled into the library, please recompile with '-DXMLBLASTER_PERSISTENT_QUEUE=1 and -DXMLBLASTER_PERSISTENT_QUEUE_TEST=1", __FILE__, __LINE__);
 625    xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
 626    return false;
 627 #endif /* XMLBLASTER_PERSISTENT_QUEUE_TEST */
 628 }
 629 
 630 static bool isConnected(XmlBlasterConnectionUnparsed *xb)
 631 {
 632    return (xb->socketToXmlBlaster > -1) ? true : false;
 633 }
 634 
 635 const char *xmlBlasterConnectionUnparsedUsage()
 636 {
 637    /* To prevent compiler warning */
 638    /*   "string length `596' is greater than the length `509' ISO C89 compilers are required to support" */
 639    /* we have a static variable */
 640    enum { SIZE=2048 };
 641    static char usage[SIZE];
 642    strncpy0(usage,
 643       "\n   -dispatch/connection/plugin/socket/hostname [localhost]"
 644       "\n                       Where to find xmlBlaster."
 645       "\n   -dispatch/connection/plugin/socket/port [7607]"
 646       "\n                       The port where xmlBlaster listens."
 647       "\n   -dispatch/connection/plugin/socket/localHostname [NULL]", SIZE/2);
 648    strncat0(usage,
 649       "\n                       Force the local IP, useful on multi homed computers."
 650       "\n   -dispatch/connection/plugin/socket/localPort [0]"
 651       "\n                       Force the local port, useful to tunnel firewalls."
 652       "\n   -dispatch/connection/plugin/socket/compress/type []"
 653 #if XMLBLASTER_ZLIB==1
 654       "\n                       Switch on compression with 'zlib:stream'."
 655 #else
 656       "\n                       No compression support. Try recompiling with with '-DXMLBLASTER_ZLIB=1'."
 657 #endif
 658       "\n   -dispatch/connection/plugin/socket/useUdpForOneway [false]"
 659       "\n                       Use UDP for publishOneway() calls.", SIZE/2);
 660    return usage;
 661 }
 662 
 663 /**
 664  * Used internally only, does no disconnect, only cleanup of socket
 665  */
 666 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb)
 667 {
 668    if (xb != 0 && xb->isConnected(xb)) {
 669 #     if defined(_WINDOWS)
 670       int how = SD_BOTH;   /* SD_BOTH requires Winsock2.h */
 671 #     elif defined(__IPhoneOS__)
 672 #     else
 673       int how = SHUT_RDWR; /* enum SHUT_RDWR = 2 */
 674 #     endif
 675 
 676 #ifdef __IPhoneOS__
 677       {
 678          CFReadStreamRef readStream = xb->readStream;
 679          if (readStream != nil) {
 680                  xb->readStream = nil;
 681                  CFReadStreamClose(readStream);
 682                  CFRelease(readStream);
 683          }
 684       }
 685       {
 686          CFWriteStreamRef writeStream = xb->writeStream;
 687          if (writeStream != nil) {
 688             xb->writeStream = nil;
 689             CFWriteStreamClose(writeStream);
 690             CFRelease(writeStream);
 691             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 692                             "shutdown() CFStreams were cosed=%d", writeStream);
 693          }
 694       }
 695 #else
 696       if (xb->socketToXmlBlaster != -1 && xb->socketToXmlBlaster != 0) {
 697          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 698             "shutdown() socketToXmlBlaster=%d socketToXmlBlasterUdp=%d", xb->socketToXmlBlaster, xb->socketToXmlBlasterUdp);
 699          shutdown(xb->socketToXmlBlaster, how);
 700          closeSocket(xb->socketToXmlBlaster);
 701          xb->socketToXmlBlaster = -1;
 702       }
 703       if (xb->socketToXmlBlasterUdp != -1) {
 704          shutdown(xb->socketToXmlBlasterUdp, how);
 705          closeSocket(xb->socketToXmlBlasterUdp);
 706          xb->socketToXmlBlasterUdp = -1;
 707       }
 708 #endif
 709    }
 710 
 711 
 712 }
 713 
 714 /**
 715  * Send a message over the socket to xmlBlaster.
 716  * @param xb The this pointer
 717  * @param methodName The name of the remote method to invoke e.g. "connect"
 718  * @param msgType The type of message: INVOKE, RESPONSE, EXCEPTION
 719  * @param data The message payload to send, we take a clone so you can do with it what you want
 720  * @param dataLen The length of data in bytes
 721  * @param responseSocketDataHolder The returned data, you need to free it with free(response->data) if we returned true.
 722  *        Supply NULL for oneway messages.
 723  * @param exception The exception struct, exception->errorCode is filled on exception.
 724  *        You need to supply it.
 725  * @param udp Whether to use UDP or TCP. Supply true for UDP.
 726  * @return true if OK and response is filled (if not oneway or exception or response itself)<br />
 727            false on error and exception is filled
 728  */
 729 static bool sendData(XmlBlasterConnectionUnparsed *xb,
 730               const char * const methodName,
 731               enum XMLBLASTER_MSG_TYPE_ENUM msgType,
 732               const char *data_,
 733               size_t dataLen_,
 734               SocketDataHolder *responseSocketDataHolder,
 735               XmlBlasterException *exception,
 736               bool udp)
 737 {
 738    ssize_t numSent;
 739    size_t rawMsgLen = 0;
 740    char *rawMsg = (char *)0;
 741    char *rawMsgStr;
 742    MsgRequestInfo *requestInfoP;
 743    MsgRequestInfo requestInfo;
 744    memset(&requestInfo, 0, sizeof(MsgRequestInfo));
 745    requestInfo.responseMutexIsValid = false; /* Remember when the client thread has created the mutex */
 746    if (data_ == 0) {
 747       data_ = "";
 748       dataLen_ = 0;
 749    }
 750 
 751    if (exception == 0) {
 752       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception to sendData()", __FILE__, __LINE__);
 753       return false;
 754    }
 755    initializeXmlBlasterException(exception);
 756 
 757    if (responseSocketDataHolder)
 758       memset(responseSocketDataHolder, 0, sizeof(SocketDataHolder));
 759 
 760    if (!xb->isConnected(xb)) {
 761       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 762       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] No connection to xmlBlaster", __FILE__, __LINE__);
 763       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 764       return false;
 765    }
 766 
 767    if (strcmp(XMLBLASTER_CONNECT, methodName) && strlen(xb->secretSessionId) < 1) {
 768       strncpy0(exception->errorCode, "user.notConnected", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 769       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please call connect() before invoking '%s'", __FILE__, __LINE__, methodName);
 770       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 771       return false;
 772    }
 773 
 774    getTimestampStr(requestInfo.requestIdStr, MAX_REQUESTID_LEN);
 775 
 776    requestInfo.methodName = methodName;
 777    if (xb->preSendEvent != 0) {
 778       /* A callback function pointer is registered to be notified just before sending */
 779       XmlBlasterBlob blob;
 780       blobcpyAlloc(&blob, data_, dataLen_); /* Take a clone, the preSendEvent() function may manipulate it */
 781       requestInfo.blob.dataLen = blob.dataLen;
 782       requestInfo.blob.data = blob.data;
 783       requestInfo.xa = xb->preSendEvent_userP;
 784       requestInfoP = xb->preSendEvent(&requestInfo, exception);
 785       if (*exception->message != 0) {
 786          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 787             "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message);
 788          return false;
 789       }
 790       if (requestInfoP == 0) {
 791          strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 792          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR: returning requestInfo 0 without exception is not supported, please correct your preSendEvent() function.", __FILE__, __LINE__);
 793          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 794          return false;
 795       }
 796       if (blob.data != requestInfoP->blob.data) {
 797          /* The callback function has changed/manipulated the user data */
 798          freeBlobHolderContent(&blob);
 799       }
 800       rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId,
 801                              requestInfoP->blob.data, requestInfoP->blob.dataLen, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
 802       freeBlobHolderContent(&requestInfoP->blob);
 803    }
 804    else {
 805       rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId,
 806                              data_, dataLen_, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
 807    }
 808 
 809    /* AWARE:
 810     * From now on requestInfoP is used by callback thread
 811     * (was added by successful xb->preSendEvent() above)
 812     * If we leave sendData() this is destroyed (requestInfoP is on stack!)
 813     */
 814 
 815    /* send the header ... */
 816    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket ...");
 817    numSent = xb->writeToSocket.writeToSocketFuncP(xb->writeToSocket.userP, udp ? xb->socketToXmlBlasterUdp : xb->socketToXmlBlaster, rawMsg, (int)rawMsgLen);
 818    if (numSent == -1) {
 819       if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
 820                                    "Lost connection to xmlBlaster server");
 821       xmlBlasterConnectionShutdown(xb);
 822       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 823       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__);
 824       free(rawMsg);
 825       if (xb->postSendEvent != 0) {
 826          requestInfo.rollback = true;
 827          requestInfoP = xb->postSendEvent(&requestInfo, exception);
 828       }
 829       return false;
 830    }
 831 
 832    if (numSent != (int)rawMsgLen) {
 833       if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
 834          "Sent only %d bytes from %u", numSent, rawMsgLen);
 835       strncpy0(exception->errorCode, "user.connect", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 836       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Sent only %ld bytes from %lu", __FILE__, __LINE__, (long)numSent, (unsigned long)rawMsgLen);
 837       free(rawMsg);
 838       if (xb->postSendEvent != 0) {
 839          requestInfo.rollback = true;
 840          requestInfoP = xb->postSendEvent(&requestInfo, exception);
 841       }
 842       return false;
 843    }
 844    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket done.");
 845 
 846    free(rawMsg);
 847    rawMsg = 0;
 848 
 849    if (xbl_isOneway(msgType, methodName))
 850       return true; /* Responses and exceptions are oneway */
 851 
 852    if (responseSocketDataHolder) { /* if not oneway read the response message */
 853 
 854       if (xb->postSendEvent != 0) {
 855          /* A callback function pointer is registered to be notified just after sending */
 856          requestInfo.responseType = 0;
 857          requestInfo.blob.dataLen = 0;
 858          requestInfo.blob.data = 0;
 859 
 860          /* !!! Here the thread blocks until a response from CallbackServer arrives !!! */
 861          requestInfoP = xb->postSendEvent(&requestInfo, exception);
 862          if (*exception->message != 0) {
 863             if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 864                "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message);
 865             return false;
 866          }
 867          if (requestInfoP == 0) {
 868             printf("[XmlBlasterConnectionUnparsed] TODO: returning requestInfo 0 is not implemented");
 869          }
 870          /* TODO: Possible race condition */
 871          responseSocketDataHolder->type = requestInfoP->responseType;
 872          responseSocketDataHolder->version = XMLBLASTER_SOCKET_VERSION;
 873          strncpy0(responseSocketDataHolder->requestId, requestInfo.requestIdStr, MAX_REQUESTID_LEN);
 874          strncpy0(responseSocketDataHolder->methodName, methodName, MAX_METHODNAME_LEN);
 875 
 876          if (requestInfoP->responseType == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException thrown from remote */
 877             convertToXmlBlasterException(&requestInfoP->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP);
 878             freeBlobHolderContent(&requestInfoP->blob);
 879             return false;
 880          }
 881          else {
 882             responseSocketDataHolder->blob.dataLen = requestInfoP->blob.dataLen;
 883             responseSocketDataHolder->blob.data = requestInfoP->blob.data;     /* The responseSocketDataHolder is now responsible to free(responseSocketDataHolder->blob.data) */
 884          }
 885          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 886             "requestId '%s' returns dataLen=%d", requestInfo.requestIdStr, requestInfoP->blob.dataLen);
 887       }
 888       else {
 889          /* Wait on the response ourself */
 890          if (getResponse(xb, responseSocketDataHolder, exception, udp) == false) {  /* false on EOF */
 891             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Lost connection to xmlBlaster server");
 892             xmlBlasterConnectionShutdown(xb);
 893             strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 894             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__);
 895             return false;
 896          }
 897          if (responseSocketDataHolder->type == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException */
 898             convertToXmlBlasterException(&responseSocketDataHolder->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP);
 899             freeBlobHolderContent(&responseSocketDataHolder->blob);
 900             if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 901                "Re-throw exception from response errorCode=%s message=%s", exception->errorCode, exception->message);
 902             return false;
 903          }
 904       }
 905 
 906       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
 907          rawMsgStr = blobDump(&responseSocketDataHolder->blob);
 908          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Received response msgLen=%u type=%c version=%c requestId=%s methodName=%s dateLen=%u data='%.100s ...'",
 909                   responseSocketDataHolder->msgLen, responseSocketDataHolder->type, responseSocketDataHolder->version, responseSocketDataHolder->requestId,
 910                   responseSocketDataHolder->methodName, responseSocketDataHolder->blob.dataLen, rawMsgStr);
 911          freeBlobDump(rawMsgStr);
 912       }
 913    }
 914 
 915    return true;
 916 }
 917 
 918 /**
 919  * Parse the returned message from xmlBlaster.
 920  * This method blocks until data arrives.
 921  * <br />
 922  * The responseSocketDataHolder holds all informations about the returned data from xmlBlaster,
 923  * on error the exception struct is filled.
 924  *
 925  * @param responseSocketDataHolder You need to free(responseSocketDataHolder->data) if return is 'true'.
 926  * @param exception Contains the exception thrown (on error only *exception->errorCode!=0)
 927  * @return true if OK or on exception, false on EOF
 928  */
 929 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp)
 930 {
 931    bool stopListenLoop = false;
 932    return parseSocketData(xb->socketToXmlBlaster, &xb->readFromSocket, responseSocketDataHolder, exception, &stopListenLoop, udp, xb->logLevel >= XMLBLASTER_LOG_DUMP);
 933 }
 934 
 935 /**
 936  * Connect to the server.
 937  * @param qos The QoS to connect
 938  * @param The exception struct, exception->errorCode is filled on exception
 939  * @return The raw ConnectReturnQos XML string returned from xmlBlaster,
 940  *         only NULL if an exception is thrown.
 941  *         You need to free() it
 942  * @return The ConnectReturnQos raw xml string, you need to free() it
 943  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.connect.html
 944  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 945  */
 946 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
 947 {
 948    SocketDataHolder responseSocketDataHolder;
 949    char *response;
 950    char *qos2;
 951    char timestampStr[256];
 952 
 953    if (qos == 0) {
 954       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 955       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterConnect()", __FILE__, __LINE__);
 956       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 957       return (char *)0;
 958    }
 959 
 960    if (initConnection(xb, exception) == false) {
 961       return (char *)0;
 962    }
 963 
 964    /** Append current client UTC timestamp */
 965    qos2 = strcpyAlloc(qos);
 966    trimEnd(qos2);
 967    if (endsWith(qos2, "</qos>")) {
 968       getCurrentLocalIsoTimestampStr(timestampStr, 200);
 969       qos2[strlen(qos2) - 6] = 0;
 970       strcatAlloc(&qos2, "<clientProperty name='__UTC'>");
 971       strcatAlloc(&qos2, timestampStr);
 972       strcatAlloc(&qos2, "</clientProperty></qos>");
 973    }
 974 
 975    if (sendData(xb, XMLBLASTER_CONNECT, MSG_TYPE_INVOKE, (const char *)qos2,
 976                 (qos2 == (const char *)0) ? 0 : strlen(qos2),
 977                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
 978       free(qos2);
 979       return (char *)0;
 980    }
 981    free(qos2);
 982 
 983    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
 984    freeBlobHolderContent(&responseSocketDataHolder.blob);
 985 
 986    /* Extract secret session ID from ConnectReturnQos */
 987    *xb->secretSessionId = 0;
 988    {
 989       const char *pEnd = (const char *)0;
 990       const char *pStart = strstr(response, "sessionId='");
 991       if (pStart) {
 992          pStart += strlen("sessionId='");
 993          pEnd = strstr(pStart, "'");
 994          if (pEnd) {
 995             int len = (int)(pEnd - pStart + 1);
 996             if (len >= MAX_SECRETSESSIONID_LEN) {
 997                strncpy0(exception->errorCode, "user.response", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 998                SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Received too long secret sessionId with len=%d, please change setting MAX_SECRETSESSIONID_LEN", __FILE__, __LINE__, len);
 999                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1000             }
1001             strncpy0(xb->secretSessionId, pStart, len);
1002          }
1003       }
1004    }
1005 
1006    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1007       "Got response for connect(secretSessionId=%s)", xb->secretSessionId);
1008 
1009    return response;
1010 }
1011 
1012 /**
1013  * Disconnect from server.
1014  * @param qos The QoS to disconnect
1015  * @param The exception struct, exception->errorCode is filled on exception
1016  * @return false on exception
1017  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.disconnect.html
1018  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1019  */
1020 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
1021 {
1022    SocketDataHolder responseSocketDataHolder;
1023 
1024    if (checkArgs(xb, XMLBLASTER_DISCONNECT, true, exception) == false ) return 0;
1025 
1026    if (sendData(xb, XMLBLASTER_DISCONNECT, MSG_TYPE_INVOKE, (const char *)qos,
1027                 (qos == (const char *)0) ? 0 : strlen(qos),
1028                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1029       return false;
1030    }
1031 
1032    freeBlobHolderContent(&responseSocketDataHolder.blob);
1033 
1034    xmlBlasterConnectionShutdown(xb);
1035    *xb->secretSessionId = 0;
1036    return true;
1037 }
1038 
1039 
1040 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST
1041 /**
1042  * Extracts the priority from the given QoS.
1043  * @return NORM=5 on error
1044  */
1045 static int parsePriority(const char *qos) {
1046    char *pPrio, *pPrioEnd;
1047    /*const int PRIORITY_MAXLEN = 10;*/
1048    #define PRIORITY_MAXLEN 10 /* To be backward compatible to C90 */
1049    char prioStr[PRIORITY_MAXLEN];
1050    int len = 1;
1051    int prio = 5;
1052    const int lenPrio=strlen("<priority>");
1053 
1054    if (qos == 0) return prio;
1055 
1056    pPrio = strstr(qos, "<priority>");
1057    if (pPrio == 0) return prio;
1058 
1059    pPrioEnd = strstr(qos, "</priority>");
1060    if (pPrioEnd == 0) return prio;
1061 
1062    len = pPrioEnd-pPrio-lenPrio;
1063    if (len >= PRIORITY_MAXLEN) {
1064       return prio;
1065    }
1066    strncpy(prioStr, pPrio+lenPrio, len);
1067    *(prioStr+len) = 0;
1068    sscanf(prioStr, "%d", &prio); /* on error prio remains 5, white spaces are stripped by sscanf */
1069    return prio;
1070 }
1071 
1072 /**
1073  * Puts an entry into the client side queue.
1074  * @param exception Can be prefilled with an original exception which will be embedded
1075  * @return 0 on failure, else an allocated "<qos><state id='OK' info='QUEUED'/></qos>" which the caller needs to free()
1076  */
1077 static char *xmlBlasterQueuePut(XmlBlasterConnectionUnparsed *xb, int priority, BlobHolder *blob, XmlBlasterException *exception)
1078 {
1079    QueueEntry queueEntry;
1080    XmlBlasterException queueException;
1081 
1082    QueueProperties *queuePropertiesP = 0; /* 0: read configuration from environment */
1083    /*
1084    QueueProperties queueProperties;
1085    memset(&queueProperties, 0, sizeof(QueueProperties));
1086    queuePropertiesP = &queueProperties;
1087    strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
1088    strncpy0(queueProperties.nodeId, "clientJoe1081594557415", QUEUE_ID_MAX);
1089    strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
1090    strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
1091    queueProperties.maxNumOfEntries = 10000000L;
1092    queueProperties.maxNumOfBytes = 1000000000LL;
1093    queueProperties.logFp = xb->log;
1094    queueProperties.logLevel = xb->logLevel;
1095    queueProperties.userObject = xb->userObject;
1096    queueP = createQueue(&queueProperties, &queueException);
1097    */
1098 
1099    if (xb->queueP == 0) {
1100       if (xb->initQueue(xb, queuePropertiesP, exception) == false)
1101          return 0;
1102    }
1103 
1104    queueEntry.priority = priority;
1105    queueEntry.isPersistent = true;
1106    queueEntry.uniqueId = getTimestamp();
1107    strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
1108    queueEntry.embeddedBlob.data = blob->data;
1109    queueEntry.embeddedBlob.dataLen = blob->dataLen;
1110 
1111    xb->queueP->put(xb->queueP, &queueEntry, &queueException);
1112    if (*queueException.errorCode != 0) {
1113       embedException(exception, queueException.errorCode, queueException.message, exception);
1114       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Put to queue failed: [%s] %s\n", exception->errorCode, exception->message);
1115       return 0;
1116    }
1117    *exception->errorCode = 0; /* Successfully queued: no error */
1118    return strcpyAlloc("<qos><state id='OK' info='QUEUED'/></qos>");
1119 }
1120 #endif /*XMLBLASTER_PERSISTENT_QUEUE_TEST==1*/
1121 
1122 /**
1123  * Publish a message to the server.
1124  * @return The raw XML string returned from xmlBlaster, only NULL if an exception is thrown
1125  *         You need to free() it
1126  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
1127  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1128  */
1129 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception)
1130 {
1131    SocketDataHolder responseSocketDataHolder;
1132    char *response = 0;
1133 
1134    BlobHolder blob = encodeMsgUnit(msgUnit, xb->logLevel >= XMLBLASTER_LOG_DUMP);
1135    msgUnit->responseQos = 0; /* In case no initial memset(&msgUnit, 0, sizeof(MsgUnit)); was made */
1136 
1137    if (checkArgs(xb, "publish", true, exception) == false ) return 0;
1138 
1139    msgUnit->responseQos = 0; /* Initialize properly */
1140 
1141    if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
1142                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1143 
1144 #     ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST /* TEST CODE */
1145          if (strstr(exception->errorCode, "user.notConnected") != 0 ||
1146              strstr(exception->errorCode, "communication.noConnection") != 0) { /* On communication problem queue messages */
1147             int priority = parsePriority(msgUnit->qos);
1148             response = xmlBlasterQueuePut(xb, priority, &blob, exception);
1149             /* NO: msgUnit->responseQos = response; otherwise a free(msgUnit) will free the response as well */
1150          }
1151 #     endif
1152 
1153       free(blob.data);
1154       return response;
1155    }
1156    free(blob.data);
1157 
1158    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
1159    freeBlobHolderContent(&responseSocketDataHolder.blob);
1160 
1161    return response;
1162 }
1163 
1164 /**
1165  * Publish a message array in a bulk to the server.
1166  * @return The raw XML string array returned from xmlBlaster, only NULL if an exception is thrown
1167  *         You need to free() it
1168  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
1169  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1170  */
1171 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
1172 {
1173    size_t i;
1174    SocketDataHolder responseSocketDataHolder;
1175    QosArr *response = 0;
1176 
1177    BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP);
1178 
1179    if (checkArgs(xb, "publishArr", true, exception) == false ) return 0;
1180 
1181    for (i=0; i<msgUnitArr->len; i++)
1182       msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */
1183 
1184    if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
1185                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1186       free(blob.data);
1187       return 0;
1188    }
1189    free(blob.data);
1190 
1191    response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1192    freeBlobHolderContent(&responseSocketDataHolder.blob);
1193 
1194    return response;
1195 }
1196 
1197 /**
1198  * Publish oneway a message array in a bulk to the server without receiving an ACK.
1199  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
1200  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1201  */
1202 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
1203 {
1204    size_t i;
1205    SocketDataHolder responseSocketDataHolder;
1206 
1207    BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP);
1208 
1209    if (checkArgs(xb, "publishOneway", true, exception) == false ) return;
1210 
1211    for (i=0; i<msgUnitArr->len; i++) {
1212       msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */
1213    }
1214 
1215    /*
1216    if (!xb->useUdpForOneway) {
1217       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1218       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] UDP not enabled, use -dispatch/connection/plugin/socket/enableUDP true", __FILE__, __LINE__);
1219       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1220       free(blob.data);
1221       return;
1222    }
1223    */
1224 
1225    if (sendData(xb, XMLBLASTER_PUBLISH_ONEWAY, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
1226                 &responseSocketDataHolder, exception, xb->useUdpForOneway) == false) {
1227       free(blob.data);
1228       return;
1229    }
1230    free(blob.data);
1231    freeBlobHolderContent(&responseSocketDataHolder.blob); /* Could be ommitted for oneway */
1232 }
1233 
1234 /**
1235  * Subscribe a message.
1236  * @return The raw XML string returned from xmlBlaster, only NULL if an exception is thrown
1237  *         You need to free() it
1238  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html
1239  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1240  */
1241 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1242 {
1243    size_t qosLen, keyLen, totalLen;
1244    char *data;
1245    size_t currpos = 0;
1246    SocketDataHolder responseSocketDataHolder;
1247    char *response;
1248 
1249    if (checkArgs(xb, "subscribe", true, exception) == false ) return 0;
1250 
1251    if (key == 0) {
1252       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1253       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterSubscribe()", __FILE__, __LINE__);
1254       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1255       return (char *)0;
1256    }
1257 
1258    if (qos == (const char *)0) {
1259       qos = "";
1260    }
1261    qosLen = strlen(qos);
1262    keyLen = strlen(key);
1263 
1264    totalLen = qosLen + 1 + keyLen + 1;
1265 
1266    data = (char *)malloc(totalLen);
1267 
1268    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1269    currpos += qosLen+1;
1270 
1271    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1272    currpos += keyLen+1;
1273 
1274    if (sendData(xb, XMLBLASTER_SUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen,
1275                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1276       free(data);
1277       return (char *)0;
1278    }
1279    free(data);
1280 
1281    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
1282    freeBlobHolderContent(&responseSocketDataHolder.blob);
1283 
1284    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1285       "Got response for subscribe(): %s", response);
1286 
1287    return response;
1288 }
1289 
1290 /**
1291  * UnSubscribe a message from the server.
1292  * @return The raw QoS XML strings returned from xmlBlaster, only NULL if an exception is thrown
1293  *         You need to free it with freeQosArr() after usage
1294  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.unSubscribe.html
1295  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1296  */
1297 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1298 {
1299    size_t qosLen, keyLen, totalLen;
1300    char *data;
1301    size_t currpos = 0;
1302    SocketDataHolder responseSocketDataHolder;
1303    QosArr *response;
1304 
1305    if (checkArgs(xb, "unSubscribe", true, exception) == false ) return 0;
1306 
1307    if (key == 0) {
1308       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1309       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterUnSubscribe()", __FILE__, __LINE__);
1310       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1311       return (QosArr *)0;
1312    }
1313 
1314    if (qos == (const char *)0) {
1315       qos = "";
1316    }
1317    qosLen = strlen(qos);
1318    keyLen = strlen(key);
1319 
1320    totalLen = qosLen + 1 + keyLen + 1;
1321 
1322    data = (char *)malloc(totalLen);
1323 
1324    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1325    currpos += qosLen+1;
1326 
1327    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1328    currpos += keyLen+1;
1329 
1330    if (sendData(xb, XMLBLASTER_UNSUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen,
1331                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1332       free(data);
1333       return (QosArr *)0;
1334    }
1335    free(data);
1336 
1337    response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1338    freeBlobHolderContent(&responseSocketDataHolder.blob);
1339 
1340    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
1341       size_t ii;
1342       for (ii=0; ii<response->len; ii++) {
1343          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1344             "Got response for unSubscribe(): %s", response->qosArr[ii]);
1345       }
1346    }
1347 
1348    return response;
1349 }
1350 
1351 /**
1352  * Erase a message from the server.
1353  * @return A struct holding the raw QoS XML strings returned from xmlBlaster,
1354  *         only NULL if an exception is thrown.
1355  *         You need to freeQosArr() it
1356  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.erase.html
1357  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1358  */
1359 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1360 {
1361    size_t qosLen, keyLen, totalLen;
1362    char *data;
1363    size_t currpos = 0;
1364    SocketDataHolder responseSocketDataHolder;
1365    QosArr *response;
1366 
1367    if (checkArgs(xb, "erase", true, exception) == false ) return 0;
1368 
1369    if (key == 0) {
1370       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1371       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterErase()", __FILE__, __LINE__);
1372       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1373       return (QosArr *)0;
1374    }
1375 
1376    if (qos == (const char *)0) {
1377       qos = "";
1378    }
1379    qosLen = strlen(qos);
1380    keyLen = strlen(key);
1381 
1382    totalLen = qosLen + 1 + keyLen + 1;
1383 
1384    data = (char *)malloc(totalLen);
1385 
1386    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1387    currpos += qosLen+1;
1388 
1389    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1390    currpos += keyLen+1;
1391 
1392    if (sendData(xb, XMLBLASTER_ERASE, MSG_TYPE_INVOKE, data, totalLen,
1393                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1394       free(data);
1395       return (QosArr *)0;
1396    }
1397    free(data);
1398 
1399    response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1400    freeBlobHolderContent(&responseSocketDataHolder.blob);
1401 
1402    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
1403       size_t ii;
1404       for (ii=0; ii<response->len; ii++) {
1405          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1406             "Got response for erase(): %s", response->qosArr[ii]);
1407       }
1408    }
1409 
1410    return response;
1411 }
1412 
1413 /**
1414  * Ping the server.
1415  * @param qos The QoS or 0
1416  * @param exception *errorCode!=0 on failure
1417  * @return The ping return QoS raw xml string, you need to free() it
1418  *         or 0 on failure (in which case *exception.errorCode!='\0')
1419  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1420  */
1421 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
1422 {
1423    SocketDataHolder responseSocketDataHolder;
1424    char *response;
1425 
1426    if (checkArgs(xb, "ping", true, exception) == false ) return 0;
1427 
1428    if (sendData(xb, XMLBLASTER_PING, MSG_TYPE_INVOKE, (const char *)qos,
1429                 (qos == (const char *)0) ? 0 : strlen(qos),
1430                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1431       return (char *)0;
1432    }
1433 
1434    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
1435    freeBlobHolderContent(&responseSocketDataHolder.blob);
1436    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1437       "Got response for ping '%s'", response);
1438    return response;
1439 }
1440 
1441 /**
1442  * Get a message.
1443  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.get.html
1444  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
1445  * @return NULL on error, please check exception in such a case, you need to
1446  *         call freeMsgUnitArr(msgUnitArr); after usage.
1447  */
1448 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
1449 {
1450    size_t qosLen, keyLen, totalLen;
1451    char *data;
1452    size_t currpos = 0;
1453    SocketDataHolder responseSocketDataHolder;
1454    MsgUnitArr *msgUnitArr = 0;
1455 
1456    if (key == 0) {
1457       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1458       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterGet()", __FILE__, __LINE__);
1459       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
1460       return (MsgUnitArr *)0;
1461    }
1462 
1463    if (qos == (const char *)0) qos = "";
1464    qosLen = strlen(qos);
1465    keyLen = strlen(key);
1466 
1467    totalLen = qosLen + 1 + keyLen + 1;
1468 
1469    data = (char *)malloc(totalLen);
1470 
1471    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
1472    currpos += qosLen+1;
1473 
1474    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
1475    currpos += keyLen+1;
1476 
1477    if (sendData(xb, XMLBLASTER_GET, MSG_TYPE_INVOKE, data, totalLen,
1478                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
1479       free(data);
1480       return (MsgUnitArr *)0; /* exception is filled with details */
1481    }
1482    free(data);
1483 
1484    /* Now process the returned messages */
1485 
1486    msgUnitArr = parseMsgUnitArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
1487    freeBlobHolderContent(&responseSocketDataHolder.blob);
1488 
1489    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1490       "Returned %u messages for get()", msgUnitArr->len);
1491 
1492    return msgUnitArr;
1493 }
1494 
1495 /**
1496  * Write uncompressed to socket (not thread safe)
1497  */
1498 static ssize_t writenPlain(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1499    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1500    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "writenPlain(%u)", nbytes);
1501    return writen(fd, ptr, nbytes);
1502 }
1503 
1504 /**
1505  * Compress data and send to socket.
1506  */
1507 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1508    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1509    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "writenCompressed(%u)", nbytes);
1510    return xmlBlaster_writenCompressed(xb->zlibWriteBuf, fd, ptr, nbytes);
1511 }
1512 
1513 /**
1514  * Write uncompressed to socket (not thread safe)
1515  */
1516 static ssize_t readnPlain(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1517    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1518    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "readnPlain(%u)", nbytes);
1519    return readn(fd, ptr, nbytes, fpNumRead, userP2);
1520 }
1521 
1522 /**
1523  * Compress data and send to socket.
1524  */
1525 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1526    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
1527    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "readnCompressed(%u)", nbytes);
1528    return xmlBlaster_readnCompressed(xb->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
1529 }
1530 
1531 /**
1532  * Checks the given arguments to be valid.
1533  * @param methodName For logging
1534  * @param checkIsConnected If true does check the connection state as well
1535  * @return false if the parameters are not usable,
1536  *         in this case 'exception' is filled with detail informations
1537  */
1538 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception)
1539 {
1540    if (xb == 0) {
1541       char *stack = getStackTrace(10);
1542       printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
1543                __FILE__, __LINE__, methodName, stack);
1544       free(stack);
1545       return false;
1546    }
1547 
1548    if (exception == 0) {
1549       char *stack = getStackTrace(10);
1550       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
1551               __FILE__, __LINE__, methodName, stack);
1552       free(stack);
1553       return false;
1554    }
1555 
1556    if (checkIsConnected) {
1557       if (!xb->isConnected(xb)) {
1558          char *stack = getStackTrace(10);
1559          strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1560          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
1561                   "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
1562                    __FILE__, __LINE__, methodName, stack);
1563          free(stack);
1564          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
1565          return false;
1566       }
1567    }
1568 
1569    return true;
1570 }


syntax highlighted by Code2HTML, v. 0.9.1