socket/XmlBlasterConnectionUnparsed.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      XmlBlasterConnectionUnparsed.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Wraps raw socket connection to xmlBlaster
00006            for complete synchronous xmlBlaster access,
00007            without callbacks and no threading necessary
00008 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00009 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
00010 -----------------------------------------------------------------------------*/
00011 #include <stdio.h>
00012 #include <stdlib.h>
00013 #include <string.h>
00014 #include <ctype.h> /* isalpha() */
00015 #if defined(WINCE)
00016 #  if defined(XB_USE_PTHREADS)
00017 #     include <pthreads/pthread.h>
00018 #  else
00019       /*#include <pthreads/need_errno.h> */
00020       static int errno=0; /* single threaded workaround*/
00021 #  endif
00022 #else
00023 #  include <errno.h>
00024 #  include <sys/types.h>
00025 #endif
00026 #include <socket/xmlBlasterSocket.h>
00027 #include <socket/xmlBlasterZlib.h>
00028 #include <XmlBlasterConnectionUnparsed.h>
00029 #define SOCKET_TCP false
00030 
00031 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception);
00032 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception);
00033 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp);
00034 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
00035 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
00036 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception);
00037 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00038 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00039 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00040 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00041 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00042 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00043 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
00044 static bool isConnected(XmlBlasterConnectionUnparsed *xb);
00045 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb);
00046 static ssize_t writenPlain(void *xb, const int fd, const char *ptr, const size_t nbytes);
00047 static ssize_t writenCompressed(void *xb, const int fd, const char *ptr, const size_t nbytes);
00048 static ssize_t readnPlain(void *xb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
00049 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
00050 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
00051 
00058 XmlBlasterConnectionUnparsed *getXmlBlasterConnectionUnparsed(int argc, const char* const* argv) {
00059    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)calloc(1, sizeof(XmlBlasterConnectionUnparsed));
00060    if (xb == 0) return xb;
00061    xb->argc = argc;
00062    xb->argv = argv;
00063    xb->props = createProperties(xb->argc, xb->argv);
00064    if (xb->props == 0) {
00065       freeXmlBlasterConnectionUnparsed(&xb);
00066       return (XmlBlasterConnectionUnparsed *)0;
00067    }
00068    xb->socketToXmlBlaster = -1;
00069    xb->socketToXmlBlasterUdp = -1;
00070    xb->isInitialized = false;
00071    xb->requestId = 0;
00072    *xb->secretSessionId = 0;
00073    xb->initConnection = initConnection;
00074    xb->initQueue = xmlBlasterInitQueue;
00075    xb->connect = xmlBlasterConnect;
00076    xb->disconnect = xmlBlasterDisconnect;
00077    xb->publish = xmlBlasterPublish;
00078    xb->publishArr = xmlBlasterPublishArr;
00079    xb->publishOneway = xmlBlasterPublishOneway;
00080    xb->subscribe = xmlBlasterSubscribe;
00081    xb->unSubscribe = xmlBlasterUnSubscribe;
00082    xb->erase = xmlBlasterErase;
00083    xb->get = xmlBlasterGet;
00084    xb->ping = xmlBlasterPing;
00085    xb->isConnected = isConnected;
00086    xb->shutdown = xmlBlasterConnectionShutdown;
00087    xb->preSendEvent = 0;
00088    xb->preSendEvent_userP = 0;
00089    xb->postSendEvent = 0;
00090    xb->postSendEvent_userP = 0;
00091    xb->queueP = 0;
00092    xb->logLevel = parseLogLevel(xb->props->getString(xb->props, "logLevel", "WARN"));
00093    xb->log = xmlBlasterDefaultLogging;
00094    xb->logUserP = 0;
00095    xb->useUdpForOneway = false;
00096    xb->writeToSocket.writeToSocketFuncP = 0;
00097    xb->writeToSocket.userP = xb;
00098    xb->zlibWriteBuf = 0;
00099    xb->readFromSocket.readFromSocketFuncP = 0;
00100    xb->readFromSocket.userP = xb;
00101    xb->zlibReadBuf = 0;
00102    return xb;
00103 }
00104 
00105 void freeXmlBlasterConnectionUnparsed(XmlBlasterConnectionUnparsed **xb_)
00106 {
00107    XmlBlasterConnectionUnparsed *xb = *xb_;
00108    if (xb != 0) {
00109       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterConnectionUnparsed 0x%x", xb);
00110       freeProperties(xb->props);
00111       if (xb->zlibWriteBuf) {
00112          xmlBlaster_endZlibWriter(xb->zlibWriteBuf);
00113          free(xb->zlibWriteBuf);
00114          xb->zlibWriteBuf = 0;
00115       }
00116       if (xb->zlibReadBuf) {
00117          xmlBlaster_endZlibReader(xb->zlibReadBuf);
00118          free(xb->zlibReadBuf);
00119          xb->zlibReadBuf = 0;
00120       }
00121       xmlBlasterConnectionShutdown(xb);
00122       free(xb);
00123       *xb_ = 0;
00124    }
00125 }
00126 
00131 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception)
00132 {
00133    const char *servTcpPort = 0;
00134 
00135    struct sockaddr_in xmlBlasterAddr;
00136    struct hostent hostbuf, *hostP = 0;
00137    struct servent *portP = 0;
00138 
00139    size_t hstbuflen=0;
00140 
00141    char serverHostName[256];
00142    char errP[MAX_ERRNO_LEN];
00143 
00144 #if defined(_WINDOWS)
00145    WORD wVersionRequested;
00146    WSADATA wsaData;
00147    int err;
00148    wVersionRequested = MAKEWORD( 2, 2 );
00149    err = WSAStartup( wVersionRequested, &wsaData );
00150    if ( err != 0 ) {
00151       strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00152       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL", __FILE__, __LINE__);
00153       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00154       return false;
00155    }
00156 
00157    if ( LOBYTE( wsaData.wVersion ) != 2 ||
00158    HIBYTE( wsaData.wVersion ) != 2 ) {
00159       WSACleanup( );
00160       strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00161       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL which supports version 2.2", __FILE__, __LINE__);
00162       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00163       return false; 
00164    }
00165 # endif
00166    *errP = 0;
00167 
00168    if (xb->isInitialized) {
00169       return true;
00170    }
00171 
00172    {  /* Switch on compression? */
00173       const char *compressType = xb->props->getString(xb->props, "plugin/socket/compress/type", "");
00174       compressType = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/compress/type", compressType);
00175 
00176       if (!strcmp(compressType, "zlib:stream")) {
00177          
00178          xb->zlibWriteBuf = (XmlBlasterZlibWriteBuffers *)malloc(sizeof(struct XmlBlasterZlibWriteBuffers));
00179          xb->zlibReadBuf = (XmlBlasterZlibReadBuffers *)malloc(sizeof(struct XmlBlasterZlibReadBuffers));
00180 
00181          if (xmlBlaster_initZlibWriter(xb->zlibWriteBuf) != 0/*Z_OK*/) {
00182             if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00183                   "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
00184             strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00185             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00186                      "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
00187                      __FILE__, __LINE__, compressType);
00188             free(xb->zlibWriteBuf);
00189             xb->zlibWriteBuf = 0;
00190             free(xb->zlibReadBuf);
00191             xb->zlibReadBuf = 0;
00192             return false;
00193          }
00194 
00195          if (xmlBlaster_initZlibReader(xb->zlibReadBuf) != 0/*Z_OK*/) {
00196             if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00197                   "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
00198             strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00199             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00200                      "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
00201                      __FILE__, __LINE__, compressType);
00202             free(xb->zlibWriteBuf);
00203             xb->zlibWriteBuf = 0;
00204             free(xb->zlibReadBuf);
00205             xb->zlibReadBuf = 0;
00206             return false;
00207          }
00208 
00209          if (xb->logLevel>=XMLBLASTER_LOG_DUMP) {
00210             xb->zlibWriteBuf->debug = true;
00211             xb->zlibReadBuf->debug = true;
00212          }
00213 
00214          if (!xb->writeToSocket.writeToSocketFuncP) {  /* Accept setting from XmlBlasterAccessUnparsed */
00215             xb->writeToSocket.writeToSocketFuncP = writenCompressed;
00216             xb->readFromSocket.readFromSocketFuncP = readnCompressed;
00217          }
00218       }
00219       else {
00220          if (strcmp(compressType, "")) {
00221             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode.", compressType);
00222          }
00223          if (!xb->writeToSocket.writeToSocketFuncP) {  /* Accept setting from XmlBlasterAccessUnparsed */
00224             xb->writeToSocket.writeToSocketFuncP = writenPlain;
00225             xb->readFromSocket.readFromSocketFuncP = readnPlain;
00226          }
00227       }
00228    }
00229 
00230 
00231    servTcpPort = xb->props->getString(xb->props, "plugin/socket/port", "7607");
00232    servTcpPort = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/port", servTcpPort);
00233 
00234    strncpy0(serverHostName, "localhost", 250);
00235    gethostname(serverHostName, 250);
00236    {
00237       const char *hn = xb->props->getString(xb->props, "plugin/socket/hostname", serverHostName);
00238       memmove(serverHostName, hn, strlen(hn)+1);  /* including '\0' */
00239       hn = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/hostname", serverHostName);
00240       memmove(serverHostName, hn, strlen(hn)+1);
00241    }
00242 
00243    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00244       "Lookup xmlBlaster on -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %s ...",
00245       serverHostName, servTcpPort);
00246 
00247    *xb->secretSessionId = 0;
00248    memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr));
00249    xmlBlasterAddr.sin_family=AF_INET;
00250 
00251 # if _WINDOWS_NOT_YET_PORTED /* Windows gethostbyname is deprecated */
00252    const struct addrinfo hints;
00253    struct addrinfo** res;
00254    int getaddrinfo(serverHostName, servTcpPort, &hints, res);
00255    res->ai_next : ai_family, ai_socktype, and ai_protocol
00256 
00257    ...
00258 
00259    void freeaddrinfo(*res);
00260 # endif
00261         if (isalpha(serverHostName[0]) || strchr(serverHostName,':') != 0) { /* look for dns name or ipv6 */
00262            char *tmphstbuf=0;
00263            memset((char *)&hostbuf, 0, sizeof(struct hostent));
00264            xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server DNS lookup of hostname '%s'", serverHostName);
00265            hostP = gethostbyname_re(serverHostName, &hostbuf, &tmphstbuf, &hstbuflen, errP);
00266            if (hostP == 0) {
00267               if (*errP != 0) {
00268                  strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00269                  SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00270                     "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s: %s",
00271                           __FILE__, __LINE__, serverHostName, servTcpPort, errP);
00272                  xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00273                  *errP = 0;
00274               }
00275               else {
00276                  strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00277                  SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00278                           "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s, errno=%d",
00279                           __FILE__, __LINE__, serverHostName, servTcpPort, errno);
00280                  xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00281               }
00282               return false;
00283            }
00284            xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
00285            free(tmphstbuf);
00286         }
00287         else {
00288            xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server IP4 usage (without DNS lookup) of IP '%s'", serverHostName);
00289            /* use ip4 addr directly to avoid dns lookup */
00290            xmlBlasterAddr.sin_addr.s_addr = inet_addr(serverHostName);
00291         }
00292 
00293    portP = getservbyname(servTcpPort, "tcp");
00294    if (portP != 0)
00295                 xmlBlasterAddr.sin_port = (u_short)portP->s_port;
00296    else
00297       xmlBlasterAddr.sin_port = htons((u_short)atoi(servTcpPort));
00298 
00299    xb->socketToXmlBlaster = (int)socket(AF_INET, SOCK_STREAM, 0);
00300    if (xb->socketToXmlBlaster != -1) {
00301       int ret=0;
00302       const char *localHostName = xb->props->getString(xb->props, "plugin/socket/localHostname", 0);
00303       int localPort = xb->props->getInt(xb->props, "plugin/socket/localPort", 0);
00304       localHostName = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/localHostname", localHostName);
00305       localPort = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/localPort", localPort);
00306 
00307       /* Sometimes a user may whish to force the local host/port setting (e.g. for firewall tunneling
00308          and on multi homed hosts */
00309       if (localHostName != 0 || localPort > 0) {
00310          struct sockaddr_in localAddr;
00311          struct hostent localHostbuf, *localHostP = 0;
00312          char *tmpLocalHostbuf=0;
00313          size_t localHostbuflen=0;
00314          memset(&localAddr, 0, sizeof(localAddr));
00315          localAddr.sin_family = AF_INET;
00316          if (localHostName) {
00317             if (isalpha(localHostName[0]) || strchr(localHostName,':') != 0) { /* look for dns name or ipv6 */
00318                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local hostname DNS lookup of hostname '%s'", localHostName);
00319                *errP = 0;
00320                localHostP = gethostbyname_re(localHostName, &localHostbuf, &tmpLocalHostbuf, &localHostbuflen, errP);
00321                if (*errP != 0) {
00322                   strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00323                   SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00324                            "[%.100s:%d] Lookup of local IP failed, %s",
00325                            __FILE__, __LINE__, errP);
00326                   xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00327                   *errP = 0;
00328                }
00329                if (localHostP != 0) {
00330                   localAddr.sin_addr.s_addr = ((struct in_addr *)(localHostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
00331                   free(tmpLocalHostbuf);
00332                }
00333             }
00334             else {
00335                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local IP4 usage (without DNS lookup) of IP '%s'", localHostName);
00336                /* use ip4 addr directly to avoid dns lookup */
00337                localAddr.sin_addr.s_addr = inet_addr(localHostName);
00338             }
00339          }
00340          if (localPort > 0) {
00341             localAddr.sin_port = htons((unsigned short)localPort);
00342          }
00343          if (bind(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
00344             if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00345                "Failed binding local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00346                   localHostName, localPort);
00347          }
00348          else {
00349             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00350                "Bound local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00351                   localHostName, localPort);
00352          }
00353       }
00354 
00355       /* int retval = fcntl(xb->socketToXmlBlaster, F_SETFL, O_NONBLOCK); */ /* Switch on none blocking mode: we then should use select() to be notified when the kernel succeeded with connect() */
00356 
00357       if ((ret=connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) != -1) {
00358          if (xb->logLevel>=XMLBLASTER_LOG_INFO) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Connected to xmlBlaster");
00359          xb->useUdpForOneway = xb->props->getBool(xb->props, "plugin/socket/useUdpForOneway", xb->useUdpForOneway);
00360          xb->useUdpForOneway = xb->props->getBool(xb->props, "dispatch/connection/plugin/socket/useUdpForOneway", xb->useUdpForOneway);
00361 
00362          if (xb->useUdpForOneway) {
00363             struct sockaddr_in localAddr;
00364             socklen_t size = (socklen_t)sizeof(localAddr);
00365             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00366                "Using UDP connection for oneway calls, see -dispatch/connection/plugin/socket/useUdpForOneway true");
00367 
00368             xb->socketToXmlBlasterUdp = (int)socket(AF_INET, SOCK_DGRAM, 0);
00369 
00370             if (xb->socketToXmlBlasterUdp != -1) {
00371                if (getsockname(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, &size) == -1) {
00372                   if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00373                      "Can't determine the local socket host and port (in UDP), errno=%d", errno);
00374                   return false;
00375                }
00376 
00377                if (bind(xb->socketToXmlBlasterUdp, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
00378                   if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00379                      "Failed binding local port (in UDP) -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00380                      localHostName, localPort);
00381                   return false;
00382                }
00383                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00384                   "Bound local UDP port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00385                   localHostName, localPort);
00386 
00387                if ((ret=connect(xb->socketToXmlBlasterUdp, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == -1) {
00388                   char errnoStr[MAX_ERRNO_LEN];
00389                   xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
00390                   strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00391                   SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00392                            "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP), ret=%d, %s",
00393                            __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
00394                   if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00395                   return false;
00396                }
00397                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connected to xmlBlaster with UDP");
00398             } /* if (xb->socketToXmlBlasterUdp != -1) */
00399             else {
00400                strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00401                SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00402                         "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP) errno=%d",
00403                         __FILE__, __LINE__, serverHostName, servTcpPort, errno);
00404                return false;
00405             }
00406          } /* if (xb->useUdpForOneway) */
00407 
00408       }
00409       else { /* connect(...) == -1 */
00410          char errnoStr[MAX_ERRNO_LEN];
00411          xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
00412          strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00413          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00414                   "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed, ret=%d, %s",
00415                   __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
00416          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00417          return false;
00418       }
00419    }
00420    else {
00421       strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00422       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00423                "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed errno=%d",
00424                __FILE__, __LINE__, serverHostName, servTcpPort, errno);
00425       return false;
00426    }
00427    xb->isInitialized = true;
00428    return true;
00429 }
00430 
00431 
00449 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception)
00450 {
00451 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST
00452    if (checkArgs(xb, "initQueue", false, exception) == false ) return false;
00453    if (xb->queueP) {
00454       char message[XMLBLASTEREXCEPTION_MESSAGE_LEN];
00455       SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00456                "[%.100s:%d] The queue is initialized already, call to initQueue() is ignored", __FILE__, __LINE__);
00457       embedException(exception, "user.illegalArgument", message, exception);
00458       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00459       return false;
00460    }
00461 
00462    {
00463       QueueProperties tmp;
00464       memset(&tmp, 0, sizeof(QueueProperties));
00465 
00466       if (queueProperties == 0)
00467          queueProperties = &tmp;
00468 
00469       if (*queueProperties->dbName == 0) {
00470          strncpy0(queueProperties->dbName, xb->props->getString(xb->props, "queue/connection/dbName", "xmlBlasterClient.db"), QUEUE_DBNAME_MAX);
00471       }
00472       if (*queueProperties->nodeId == 0) {
00473          strncpy0(queueProperties->nodeId, xb->props->getString(xb->props, "queue/connection/nodeId", "client"), QUEUE_ID_MAX);
00474       }
00475       if (*queueProperties->queueName == 0) {
00476          strncpy0(queueProperties->queueName, xb->props->getString(xb->props, "queue/connection/queueName", "connection_client"), QUEUE_ID_MAX);
00477       }
00478       if (*queueProperties->tablePrefix == 0) {
00479          strncpy0(queueProperties->tablePrefix, xb->props->getString(xb->props, "queue/connection/tablePrefix", "XB_"), QUEUE_PREFIX_MAX);
00480       }
00481       if (queueProperties->maxNumOfEntries == 0) {
00482          queueProperties->maxNumOfEntries = xb->props->getInt(xb->props, "queue/connection/maxEntries", 10000000);
00483       }
00484       if (queueProperties->maxNumOfBytes == 0) {
00485          queueProperties->maxNumOfBytes = xb->props->getInt64(xb->props, "queue/connection/maxBytes", 10000000LL);
00486       }
00487       if (queueProperties->logFp == 0) queueProperties->logFp = xb->log;
00488       if (queueProperties->logLevel == 0) queueProperties->logLevel = xb->logLevel;
00489       if (queueProperties->userObject == 0) queueProperties->userObject = xb->userObject;
00490 
00491       xb->queueP = createQueue(queueProperties, exception);
00492       if (*exception->errorCode != 0) {
00493          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Queue initializeation failed: [%s] %s\n", exception->errorCode, exception->message);
00494          return false;
00495       }
00496       xb->queueP->userObject = xb;
00497    }
00498    return true;
00499 #else
00500    if (queueProperties) {} /* To suppress compiler warning that not used */
00501    strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00502    SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00503             "[%.100s:%d] Queue support is not compiled into the library, please recompile with '-DXMLBLASTER_PERSISTENT_QUEUE=1 and -DXMLBLASTER_PERSISTENT_QUEUE_TEST=1", __FILE__, __LINE__);
00504    xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00505    return false;
00506 #endif /* XMLBLASTER_PERSISTENT_QUEUE_TEST */
00507 }
00508 
00509 static bool isConnected(XmlBlasterConnectionUnparsed *xb)
00510 {
00511    return (xb->socketToXmlBlaster > -1) ? true : false;
00512 }
00513 
/**
 * Build the help text describing the command line settings of the socket connection.
 *
 * The text is assembled from two literals to stay below the string literal
 * length of 509 which ISO C89 compilers are required to support.
 *
 * @return Pointer to a static buffer which is rewritten on every call,
 *         the caller must not free it
 */
const char *xmlBlasterConnectionUnparsedUsage()
{
   enum { USAGE_LEN=2048 };
   static char usageText[USAGE_LEN];
   const char *firstPart =
      "\n   -dispatch/connection/plugin/socket/hostname [localhost]"
      "\n                       Where to find xmlBlaster."
      "\n   -dispatch/connection/plugin/socket/port [7607]"
      "\n                       The port where xmlBlaster listens."
      "\n   -dispatch/connection/plugin/socket/localHostname [NULL]";
   const char *secondPart =
      "\n                       Force the local IP, useful on multi homed computers."
      "\n   -dispatch/connection/plugin/socket/localPort [0]"
      "\n                       Force the local port, useful to tunnel firewalls."
      "\n   -dispatch/connection/plugin/socket/compress/type []"
#if XMLBLASTER_ZLIB==1
      "\n                       Switch on compression with 'zlib:stream'."
#else
      "\n                       No compression support. Try recompiling with with '-DXMLBLASTER_ZLIB=1'."
#endif
      "\n   -dispatch/connection/plugin/socket/useUdpForOneway [false]"
      "\n                       Use UDP for publishOneway() calls.";

   /* Each part is limited to half of the buffer */
   strncpy0(usageText, firstPart, USAGE_LEN/2);
   strncat0(usageText, secondPart, USAGE_LEN/2);
   return usageText;
}
00541 
00545 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb)
00546 {
00547    if (xb != 0 && xb->isConnected(xb)) {
00548 #     if defined(_WINDOWS)
00549       int how = SD_BOTH;   /* SD_BOTH requires Winsock2.h */
00550 #     else
00551       int how = SHUT_RDWR; /* enum SHUT_RDWR = 2 */
00552 #     endif
00553       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00554             "shutdown() socketToXmlBlaster=%d socketToXmlBlasterUdp=%d", xb->socketToXmlBlaster, xb->socketToXmlBlasterUdp);
00555       shutdown(xb->socketToXmlBlaster, how); 
00556       closeSocket(xb->socketToXmlBlaster);
00557       xb->socketToXmlBlaster = -1;
00558       if (xb->socketToXmlBlasterUdp != -1) {
00559          shutdown(xb->socketToXmlBlasterUdp, how);
00560          closeSocket(xb->socketToXmlBlasterUdp);
00561          xb->socketToXmlBlasterUdp = -1;
00562       }
00563    }
00564 }
00565 
00581 static bool sendData(XmlBlasterConnectionUnparsed *xb, 
00582               const char * const methodName,
00583               enum XMLBLASTER_MSG_TYPE_ENUM msgType,
00584               const char *data_,
00585               size_t dataLen_,
00586               SocketDataHolder *responseSocketDataHolder,
00587               XmlBlasterException *exception,
00588               bool udp)
00589 {
00590    ssize_t numSent;
00591    size_t rawMsgLen = 0;
00592    char *rawMsg = (char *)0;
00593    char *rawMsgStr;
00594    MsgRequestInfo *requestInfoP;
00595    MsgRequestInfo requestInfo;
00596    memset(&requestInfo, 0, sizeof(MsgRequestInfo));
00597 
00598    if (data_ == 0) {
00599       data_ = "";
00600       dataLen_ = 0;
00601    }
00602 
00603    if (exception == 0) {
00604       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception to sendData()", __FILE__, __LINE__);
00605       return false;
00606    }
00607    initializeXmlBlasterException(exception);
00608 
00609    if (responseSocketDataHolder)
00610       memset(responseSocketDataHolder, 0, sizeof(SocketDataHolder));
00611 
00612    if (!xb->isConnected(xb)) {
00613       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00614       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] No connection to xmlBlaster", __FILE__, __LINE__);
00615       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00616       return false;
00617    }
00618 
00619    if (strcmp(XMLBLASTER_CONNECT, methodName) && strlen(xb->secretSessionId) < 1) {
00620       strncpy0(exception->errorCode, "user.notConnected", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00621       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please call connect() before invoking '%s'", __FILE__, __LINE__, methodName);
00622       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00623       return false;
00624    }
00625 
00626    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00627       "sendData(udp=%s) requestId '%ld' increment to '%ld', dataLen=%d",
00628       ((udp==true) ? "true" : "false"), xb->requestId, xb->requestId+1, dataLen_);
00629 
00630    {
00631       long tmp = ++xb->requestId; /* TODO: We need to sync requestId !!!! */
00632       if (xb->requestId > 1000000000) xb->requestId = 0;
00633       SNPRINTF(requestInfo.requestIdStr, MAX_REQUESTID_LEN, "%-ld", tmp);
00634    }
00635 
00636    requestInfo.methodName = methodName;
00637    if (xb->preSendEvent != 0) {
00638       /* A callback function pointer is registered to be notified just before sending */
00639       XmlBlasterBlob blob;
00640       blobcpyAlloc(&blob, data_, dataLen_); /* Take a clone, the preSendEvent() function may manipulate it */
00641       requestInfo.blob.dataLen = blob.dataLen;
00642       requestInfo.blob.data = blob.data;
00643       requestInfo.xa = xb->preSendEvent_userP;
00644       requestInfoP = xb->preSendEvent(&requestInfo, exception);
00645       if (*exception->message != 0) {
00646          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00647             "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message);
00648          return false;
00649       }
00650       if (requestInfoP == 0) {
00651          strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00652          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR: returning requestInfo 0 without exception is not supported, please correct your preSendEvent() function.", __FILE__, __LINE__);
00653          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00654          return false;
00655       }
00656       if (blob.data != requestInfoP->blob.data) {
00657          /* The callback function has changed/manipulated the user data */
00658          freeBlobHolderContent(&blob);
00659       }
00660       rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId,
00661                              requestInfoP->blob.data, requestInfoP->blob.dataLen, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
00662       freeBlobHolderContent(&requestInfoP->blob);
00663    }
00664    else {
00665       rawMsg = encodeSocketMessage(msgType, requestInfo.requestIdStr, requestInfo.methodName, xb->secretSessionId,
00666                              data_, dataLen_, xb->logLevel >= XMLBLASTER_LOG_DUMP, &rawMsgLen);
00667    }
00668    
00669    /* send the header ... */
00670    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket ...");
00671    numSent = xb->writeToSocket.writeToSocketFuncP(xb->writeToSocket.userP, udp ? xb->socketToXmlBlasterUdp : xb->socketToXmlBlaster, rawMsg, (int)rawMsgLen);
00672    if (numSent == -1) {
00673       if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00674                                    "Lost connection to xmlBlaster server");
00675       xmlBlasterConnectionShutdown(xb);
00676       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00677       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__);
00678       free(rawMsg);
00679       if (xb->postSendEvent != 0) {
00680          requestInfo.rollback = true;
00681          requestInfoP = xb->postSendEvent(&requestInfo, exception);
00682       }
00683       return false;
00684    }
00685 
00686    if (numSent != (int)rawMsgLen) {
00687       if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00688          "Sent only %d bytes from %u", numSent, rawMsgLen);
00689       strncpy0(exception->errorCode, "user.connect", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00690       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Sent only %ld bytes from %lu", __FILE__, __LINE__, (long)numSent, (unsigned long)rawMsgLen);
00691       free(rawMsg);
00692       if (xb->postSendEvent != 0) {
00693          requestInfo.rollback = true;
00694          requestInfoP = xb->postSendEvent(&requestInfo, exception);
00695       }
00696       return false;
00697    }
00698    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Lowlevel writing data to socket done.");
00699 
00700    free(rawMsg);
00701    rawMsg = 0;
00702 
00703    if (xbl_isOneway(msgType, methodName))
00704       return true; /* Responses and exceptions are oneway */
00705 
00706    if (responseSocketDataHolder) { /* if not oneway read the response message */
00707 
00708       if (xb->postSendEvent != 0) {
00709          /* A callback function pointer is registered to be notified just after sending */
00710          requestInfo.responseType = 0;
00711          requestInfo.blob.dataLen = 0;
00712          requestInfo.blob.data = 0;
00713          /* Here the thread blocks until a response from CallbackServer arrives */
00714          requestInfoP = xb->postSendEvent(&requestInfo, exception);
00715          if (*exception->message != 0) {
00716             if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00717                "Re-throw exception from preSendEvent errorCode=%s message=%s", exception->errorCode, exception->message);
00718             return false;
00719          }
00720          if (requestInfoP == 0) {
00721             printf("[XmlBlasterConnectionUnparsed] TODO: returning requestInfo 0 is not implemented");
00722          }
00723          /* TODO: Possible race condition */
00724          responseSocketDataHolder->type = requestInfoP->responseType;
00725          responseSocketDataHolder->version = XMLBLASTER_SOCKET_VERSION;
00726          strncpy0(responseSocketDataHolder->requestId, requestInfo.requestIdStr, MAX_REQUESTID_LEN);
00727          strncpy0(responseSocketDataHolder->methodName, methodName, MAX_METHODNAME_LEN);
00728 
00729          if (requestInfoP->responseType == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException thrown from remote */
00730             convertToXmlBlasterException(&requestInfoP->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP);
00731             freeBlobHolderContent(&requestInfoP->blob);
00732             return false;
00733          }
00734          else {
00735             responseSocketDataHolder->blob.dataLen = requestInfoP->blob.dataLen;
00736             responseSocketDataHolder->blob.data = requestInfoP->blob.data;     /* The responseSocketDataHolder is now responsible to free(responseSocketDataHolder->blob.data) */
00737          }
00738          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00739             "requestId '%s' returns dataLen=%d", requestInfo.requestIdStr, requestInfoP->blob.dataLen);
00740       }
00741       else {
00742          /* Wait on the response ourself */
00743          if (getResponse(xb, responseSocketDataHolder, exception, udp) == false) {  /* false on EOF */
00744             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Lost connection to xmlBlaster server");
00745             xmlBlasterConnectionShutdown(xb);
00746             strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00747             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Lost connection to xmlBlaster server", __FILE__, __LINE__);
00748             return false;
00749          }
00750          if (responseSocketDataHolder->type == MSG_TYPE_EXCEPTION) { /* convert XmlBlasterException */
00751             convertToXmlBlasterException(&responseSocketDataHolder->blob, exception, xb->logLevel >= XMLBLASTER_LOG_DUMP);
00752             freeBlobHolderContent(&responseSocketDataHolder->blob);
00753             if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00754                "Re-throw exception from response errorCode=%s message=%s", exception->errorCode, exception->message);
00755             return false;
00756          }
00757       }
00758 
00759       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
00760          rawMsgStr = blobDump(&responseSocketDataHolder->blob);
00761          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Received response msgLen=%u type=%c version=%c requestId=%s methodName=%s dateLen=%u data='%.100s ...'",
00762                   responseSocketDataHolder->msgLen, responseSocketDataHolder->type, responseSocketDataHolder->version, responseSocketDataHolder->requestId,
00763                   responseSocketDataHolder->methodName, responseSocketDataHolder->blob.dataLen, rawMsgStr);
00764          freeBlobDump(rawMsgStr);
00765       }
00766    }
00767 
00768    return true;
00769 }
00770 
00782 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp)
00783 {
00784    bool stopListenLoop = false;
00785    return parseSocketData(xb->socketToXmlBlaster, &xb->readFromSocket, responseSocketDataHolder, exception, &stopListenLoop, udp, xb->logLevel >= XMLBLASTER_LOG_DUMP);
00786 }
00787 
00799 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
00800 {
00801    SocketDataHolder responseSocketDataHolder;
00802    char *response;
00803    
00804    if (qos == 0) {
00805       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00806       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterConnect()", __FILE__, __LINE__);
00807       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00808       return (char *)0;
00809    }
00810 
00811    if (initConnection(xb, exception) == false) {
00812       return (char *)0;
00813    }
00814 
00815    if (sendData(xb, XMLBLASTER_CONNECT, MSG_TYPE_INVOKE, (const char *)qos,
00816                 (qos == (const char *)0) ? 0 : strlen(qos),
00817                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
00818       return (char *)0;
00819    }
00820 
00821    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
00822    freeBlobHolderContent(&responseSocketDataHolder.blob);
00823 
00824    /* Extract secret session ID from ConnectReturnQos */
00825    *xb->secretSessionId = 0;
00826    {
00827       const char *pEnd = (const char *)0;
00828       const char *pStart = strstr(response, "sessionId='");
00829       if (pStart) {
00830          pStart += strlen("sessionId='");
00831          pEnd = strstr(pStart, "'");
00832          if (pEnd) {
00833             int len = (int)(pEnd - pStart + 1);
00834             if (len >= MAX_SECRETSESSIONID_LEN) {
00835                strncpy0(exception->errorCode, "user.response", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00836                SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] ERROR Received too long secret sessionId with len=%d, please change setting MAX_SECRETSESSIONID_LEN", __FILE__, __LINE__, len);
00837                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00838             }
00839             strncpy0(xb->secretSessionId, pStart, len);
00840          }
00841       }
00842    }
00843 
00844    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00845       "Got response for connect(secretSessionId=%s)", xb->secretSessionId);
00846 
00847    return response;
00848 }
00849 
00858 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
00859 {
00860    SocketDataHolder responseSocketDataHolder;
00861 
00862    if (checkArgs(xb, XMLBLASTER_DISCONNECT, true, exception) == false ) return 0;
00863 
00864    if (sendData(xb, XMLBLASTER_DISCONNECT, MSG_TYPE_INVOKE, (const char *)qos, 
00865                 (qos == (const char *)0) ? 0 : strlen(qos),
00866                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
00867       return false;
00868    }
00869 
00870    freeBlobHolderContent(&responseSocketDataHolder.blob);
00871 
00872    xmlBlasterConnectionShutdown(xb);
00873    *xb->secretSessionId = 0;
00874    return true;
00875 }
00876 
00877 
00878 #if XMLBLASTER_PERSISTENT_QUEUE_TEST==1
00879 
/**
 * Extracts the numeric priority from the first <priority>...</priority>
 * element of a QoS XML string.
 *
 * The closing tag is searched behind the opening tag, so a stray
 * "</priority>" in front of it can not produce a negative length.
 *
 * @param qos The QoS markup, may be NULL
 * @return The parsed priority, or the default 5 if qos is NULL, the element
 *         is missing or unterminated, its content is PRIORITY_MAXLEN or more
 *         characters long, or it does not start with a decimal number
 */
static int parsePriority(const char *qos) {
   const char *pValue, *pPrioEnd;
   /*const int PRIORITY_MAXLEN = 10;*/
   #define PRIORITY_MAXLEN 10 /* To be backward compatible to C90 */
   char prioStr[PRIORITY_MAXLEN];
   size_t len;
   int prio = 5;
   const size_t lenPrio = strlen("<priority>");

   if (qos == 0) return prio;

   pValue = strstr(qos, "<priority>");
   if (pValue == 0) return prio;
   pValue += lenPrio; /* first character of the element content */

   pPrioEnd = strstr(pValue, "</priority>");
   if (pPrioEnd == 0) return prio;

   len = (size_t)(pPrioEnd - pValue); /* never negative: pPrioEnd >= pValue */
   if (len >= PRIORITY_MAXLEN) {
      return prio;
   }
   memcpy(prioStr, pValue, len);
   prioStr[len] = 0;
   sscanf(prioStr, "%d", &prio); /* on error prio remains 5, white spaces are stripped by sscanf */
   return prio;
}
00909 
00915 static char *xmlBlasterQueuePut(XmlBlasterConnectionUnparsed *xb, int priority, BlobHolder *blob, XmlBlasterException *exception)
00916 {
00917    QueueEntry queueEntry;
00918    XmlBlasterException queueException;
00919 
00920    QueueProperties *queuePropertiesP = 0; /* 0: read configuration from environment */
00921    /*
00922    QueueProperties queueProperties;
00923    memset(&queueProperties, 0, sizeof(QueueProperties));
00924    queuePropertiesP = &queueProperties;
00925    strncpy0(queueProperties.dbName, "xmlBlasterClient.db", QUEUE_DBNAME_MAX);
00926    strncpy0(queueProperties.nodeId, "clientJoe1081594557415", QUEUE_ID_MAX);
00927    strncpy0(queueProperties.queueName, "connection_clientJoe", QUEUE_ID_MAX);
00928    strncpy0(queueProperties.tablePrefix, "XB_", QUEUE_PREFIX_MAX);
00929    queueProperties.maxNumOfEntries = 10000000L;
00930    queueProperties.maxNumOfBytes = 1000000000LL;
00931    queueProperties.logFp = xb->log;
00932    queueProperties.logLevel = xb->logLevel;
00933    queueProperties.userObject = xb->userObject;
00934    queueP = createQueue(&queueProperties, &queueException);
00935    */
00936 
00937    if (xb->queueP == 0) {
00938       if (xb->initQueue(xb, queuePropertiesP, exception) == false)
00939          return 0;
00940    }
00941 
00942    queueEntry.priority = priority;
00943    queueEntry.isPersistent = true;
00944    queueEntry.uniqueId = getTimestamp();
00945    strncpy0(queueEntry.embeddedType, "MSG_RAW|publish", QUEUE_ENTRY_EMBEDDEDTYPE_LEN);
00946    queueEntry.embeddedBlob.data = blob->data;
00947    queueEntry.embeddedBlob.dataLen = blob->dataLen;
00948 
00949    xb->queueP->put(xb->queueP, &queueEntry, &queueException);
00950    if (*queueException.errorCode != 0) {
00951       embedException(exception, queueException.errorCode, queueException.message, exception);
00952       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Put to queue failed: [%s] %s\n", exception->errorCode, exception->message);
00953       return 0;
00954    }
00955    *exception->errorCode = 0; /* Successfully queued: no error */
00956    return strcpyAlloc("<qos><state id='OK' info='QUEUED'/></qos>");
00957 }
00958 #endif /*XMLBLASTER_PERSISTENT_QUEUE_TEST==1*/
00959 
00967 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception)
00968 {
00969    SocketDataHolder responseSocketDataHolder;
00970    char *response = 0;
00971 
00972    BlobHolder blob = encodeMsgUnit(msgUnit, xb->logLevel >= XMLBLASTER_LOG_DUMP);
00973    msgUnit->responseQos = 0; /* In case no initial memset(&msgUnit, 0, sizeof(MsgUnit)); was made */ 
00974 
00975    if (checkArgs(xb, "publish", true, exception) == false ) return 0;
00976 
00977    msgUnit->responseQos = 0; /* Initialize properly */
00978 
00979    if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
00980                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
00981 
00982 #     if XMLBLASTER_PERSISTENT_QUEUE_TEST==1 /* TEST CODE */
00983          if (strstr(exception->errorCode, "user.notConnected") != 0 ||
00984              strstr(exception->errorCode, "communication.noConnection") != 0) { /* On communication problem queue messages */
00985             int priority = parsePriority(msgUnit->qos);
00986             response = xmlBlasterQueuePut(xb, priority, &blob, exception);
00987             /* NO: msgUnit->responseQos = response; otherwise a free(msgUnit) will free the response as well */
00988          }
00989 #     endif
00990 
00991       free(blob.data);
00992       return response;
00993    }
00994    free(blob.data);
00995 
00996    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
00997    freeBlobHolderContent(&responseSocketDataHolder.blob);
00998 
00999    return response;
01000 }
01001 
01009 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
01010 {
01011    size_t i;
01012    SocketDataHolder responseSocketDataHolder;
01013    QosArr *response = 0;
01014 
01015    BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP);
01016 
01017    if (checkArgs(xb, "publishArr", true, exception) == false ) return 0;
01018 
01019    for (i=0; i<msgUnitArr->len; i++)
01020       msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */
01021 
01022    if (sendData(xb, XMLBLASTER_PUBLISH, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
01023                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
01024       free(blob.data);
01025       return 0;
01026    }
01027    free(blob.data);
01028 
01029    response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
01030    freeBlobHolderContent(&responseSocketDataHolder.blob);
01031 
01032    return response;
01033 }
01034 
01040 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
01041 {
01042    size_t i;
01043    SocketDataHolder responseSocketDataHolder;
01044 
01045    BlobHolder blob = encodeMsgUnitArr(msgUnitArr, xb->logLevel >= XMLBLASTER_LOG_DUMP);
01046 
01047    if (checkArgs(xb, "publishOneway", true, exception) == false ) return;
01048 
01049    for (i=0; i<msgUnitArr->len; i++) {
01050       msgUnitArr->msgUnitArr[i].responseQos = 0; /* Initialize properly */
01051    }
01052 
01053    /*
01054    if (!xb->useUdpForOneway) {
01055       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
01056       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] UDP not enabled, use -dispatch/connection/plugin/socket/enableUDP true", __FILE__, __LINE__);
01057       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
01058       free(blob.data);
01059       return;
01060    }
01061    */
01062 
01063    if (sendData(xb, XMLBLASTER_PUBLISH_ONEWAY, MSG_TYPE_INVOKE, blob.data, blob.dataLen,
01064                 &responseSocketDataHolder, exception, xb->useUdpForOneway) == false) {
01065       free(blob.data);
01066       return;
01067    }
01068    free(blob.data);
01069    freeBlobHolderContent(&responseSocketDataHolder.blob); /* Could be ommitted for oneway */
01070 }
01071 
01079 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
01080 {
01081    size_t qosLen, keyLen, totalLen;
01082    char *data;
01083    size_t currpos = 0;
01084    SocketDataHolder responseSocketDataHolder;
01085    char *response;
01086 
01087    if (checkArgs(xb, "subscribe", true, exception) == false ) return 0;
01088    
01089    if (key == 0) {
01090       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
01091       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterSubscribe()", __FILE__, __LINE__);
01092       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
01093       return (char *)0;
01094    }
01095 
01096    if (qos == (const char *)0) {
01097       qos = "";
01098    }
01099    qosLen = strlen(qos);
01100    keyLen = strlen(key);
01101 
01102    totalLen = qosLen + 1 + keyLen + 1;
01103 
01104    data = (char *)malloc(totalLen);
01105 
01106    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
01107    currpos += qosLen+1;
01108 
01109    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
01110    currpos += keyLen+1;
01111 
01112    if (sendData(xb, XMLBLASTER_SUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen,
01113                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
01114       free(data);
01115       return (char *)0;
01116    }
01117    free(data);
01118 
01119    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
01120    freeBlobHolderContent(&responseSocketDataHolder.blob);
01121 
01122    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
01123       "Got response for subscribe(): %s", response);
01124 
01125    return response;
01126 }
01127 
01135 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
01136 {
01137    size_t qosLen, keyLen, totalLen;
01138    char *data;
01139    size_t currpos = 0;
01140    SocketDataHolder responseSocketDataHolder;
01141    QosArr *response;
01142 
01143    if (checkArgs(xb, "unSubscribe", true, exception) == false ) return 0;
01144 
01145    if (key == 0) {
01146       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
01147       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterUnSubscribe()", __FILE__, __LINE__);
01148       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
01149       return (QosArr *)0;
01150    }
01151 
01152    if (qos == (const char *)0) {
01153       qos = "";
01154    }
01155    qosLen = strlen(qos);
01156    keyLen = strlen(key);
01157 
01158    totalLen = qosLen + 1 + keyLen + 1;
01159 
01160    data = (char *)malloc(totalLen);
01161 
01162    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
01163    currpos += qosLen+1;
01164 
01165    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
01166    currpos += keyLen+1;
01167 
01168    if (sendData(xb, XMLBLASTER_UNSUBSCRIBE, MSG_TYPE_INVOKE, data, totalLen,
01169                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
01170       free(data);
01171       return (QosArr *)0;
01172    }
01173    free(data);
01174 
01175    response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
01176    freeBlobHolderContent(&responseSocketDataHolder.blob);
01177 
01178    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
01179       size_t ii;
01180       for (ii=0; ii<response->len; ii++) {
01181          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
01182             "Got response for unSubscribe(): %s", response->qosArr[ii]);
01183       }
01184    }
01185 
01186    return response;
01187 }
01188 
01197 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
01198 {
01199    size_t qosLen, keyLen, totalLen;
01200    char *data;
01201    size_t currpos = 0;
01202    SocketDataHolder responseSocketDataHolder;
01203    QosArr *response;
01204 
01205    if (checkArgs(xb, "erase", true, exception) == false ) return 0;
01206 
01207    if (key == 0) {
01208       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
01209       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterErase()", __FILE__, __LINE__);
01210       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
01211       return (QosArr *)0;
01212    }
01213 
01214    if (qos == (const char *)0) {
01215       qos = "";
01216    }
01217    qosLen = strlen(qos);
01218    keyLen = strlen(key);
01219 
01220    totalLen = qosLen + 1 + keyLen + 1;
01221 
01222    data = (char *)malloc(totalLen);
01223 
01224    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
01225    currpos += qosLen+1;
01226 
01227    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
01228    currpos += keyLen+1;
01229 
01230    if (sendData(xb, XMLBLASTER_ERASE, MSG_TYPE_INVOKE, data, totalLen,
01231                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
01232       free(data);
01233       return (QosArr *)0;
01234    }
01235    free(data);
01236 
01237    response = parseQosArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
01238    freeBlobHolderContent(&responseSocketDataHolder.blob);
01239 
01240    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) {
01241       size_t ii;
01242       for (ii=0; ii<response->len; ii++) {
01243          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
01244             "Got response for erase(): %s", response->qosArr[ii]);
01245       }
01246    }
01247 
01248    return response;
01249 }
01250 
01259 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception)
01260 {
01261    SocketDataHolder responseSocketDataHolder;
01262    char *response;
01263 
01264    if (checkArgs(xb, "ping", true, exception) == false ) return 0;
01265    
01266    if (sendData(xb, XMLBLASTER_PING, MSG_TYPE_INVOKE, (const char *)qos,
01267                 (qos == (const char *)0) ? 0 : strlen(qos),
01268                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
01269       return (char *)0;
01270    }
01271 
01272    response = strFromBlobAlloc(responseSocketDataHolder.blob.data, responseSocketDataHolder.blob.dataLen);
01273    freeBlobHolderContent(&responseSocketDataHolder.blob);
01274    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
01275       "Got response for ping '%s'", response);
01276    return response;
01277 }
01278 
01286 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception)
01287 {
01288    size_t qosLen, keyLen, totalLen;
01289    char *data;
01290    size_t currpos = 0;
01291    SocketDataHolder responseSocketDataHolder;
01292    MsgUnitArr *msgUnitArr = 0;
01293 
01294    if (key == 0) {
01295       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
01296       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%s:%d] Please provide valid arguments to xmlBlasterGet()", __FILE__, __LINE__);
01297       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
01298       return (MsgUnitArr *)0;
01299    }
01300 
01301    if (qos == (const char *)0) qos = "";
01302    qosLen = strlen(qos);
01303    keyLen = strlen(key);
01304 
01305    totalLen = qosLen + 1 + keyLen + 1;
01306 
01307    data = (char *)malloc(totalLen);
01308 
01309    memcpy(data+currpos, qos, qosLen+1); /* inclusive '\0' */
01310    currpos += qosLen+1;
01311 
01312    memcpy(data+currpos, key, keyLen+1); /* inclusive '\0' */
01313    currpos += keyLen+1;
01314 
01315    if (sendData(xb, XMLBLASTER_GET, MSG_TYPE_INVOKE, data, totalLen,
01316                 &responseSocketDataHolder, exception, SOCKET_TCP) == false) {
01317       free(data);
01318       return (MsgUnitArr *)0; /* exception is filled with details */
01319    }
01320    free(data);
01321 
01322    /* Now process the returned messages */
01323 
01324    msgUnitArr = parseMsgUnitArr(responseSocketDataHolder.blob.dataLen, responseSocketDataHolder.blob.data);
01325    freeBlobHolderContent(&responseSocketDataHolder.blob);
01326 
01327    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
01328       "Returned %u messages for get()", msgUnitArr->len);
01329 
01330    return msgUnitArr;
01331 }
01332 
01336 static ssize_t writenPlain(void *userP, const int fd, const char *ptr, const size_t nbytes) {
01337    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
01338    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "writenPlain(%u)", nbytes);
01339    return writen(fd, ptr, nbytes);
01340 }
01341 
01345 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
01346    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
01347    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "writenCompressed(%u)", nbytes);
01348    return xmlBlaster_writenCompressed(xb->zlibWriteBuf, fd, ptr, nbytes);
01349 }
01350 
01354 static ssize_t readnPlain(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
01355    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
01356    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "readnPlain(%u)", nbytes);
01357    return readn(fd, ptr, nbytes, fpNumRead, userP2);
01358 }
01359 
01363 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
01364    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)userP;
01365    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,  "readnCompressed(%u)", nbytes);
01366    return xmlBlaster_readnCompressed(xb->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
01367 }
01368 
01376 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception)
01377 {
01378    if (xb == 0) {
01379       char *stack = getStackTrace(10);
01380       printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
01381                __FILE__, __LINE__, methodName, stack);
01382       free(stack);
01383       return false;
01384    }
01385 
01386    if (exception == 0) {
01387       char *stack = getStackTrace(10);
01388       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
01389               __FILE__, __LINE__, methodName, stack);
01390       free(stack);
01391       return false;
01392    }
01393 
01394    if (checkIsConnected) {
01395       if (!xb->isConnected(xb)) {
01396          char *stack = getStackTrace(10);
01397          strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
01398          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
01399                   "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
01400                    __FILE__, __LINE__, methodName, stack);
01401          free(stack);
01402          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
01403          return false;
01404       }
01405    }
01406 
01407    return true;
01408 }
01409 
01410