socket/XmlBlasterConnectionUnparsed.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      XmlBlasterConnectionUnparsed.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Wraps raw socket connection to xmlBlaster
00006            for complete synchronous xmlBlaster access,
00007            without callbacks and no threading necessary
00008 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00009 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
00010 -----------------------------------------------------------------------------*/
00011 #include <stdio.h>
00012 #include <stdlib.h>
00013 #include <string.h>
00014 #include <ctype.h> /* isalpha() */
00015 #if defined(WINCE)
00016 #  if defined(XB_USE_PTHREADS)
00017 #     include <pthreads/pthread.h>
00018 #  else
00019       /*#include <pthreads/need_errno.h> */
00020       static int errno=0; /* single threaded workaround*/
00021 #  endif
00022 #else
00023 #  include <errno.h>
00024 #  include <sys/types.h>
00025 #endif
00026 #include <socket/xmlBlasterSocket.h>
00027 #include <socket/xmlBlasterZlib.h>
00028 #include <XmlBlasterConnectionUnparsed.h>
00029 #define SOCKET_TCP false
00030 
00031 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception);
00032 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception);
00033 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp);
00034 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
00035 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
00036 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception);
00037 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00038 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
00039 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00040 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00041 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00042 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception);
00043 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception);
00044 static bool isConnected(XmlBlasterConnectionUnparsed *xb);
00045 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb);
00046 static ssize_t writenPlain(void *xb, const int fd, const char *ptr, const size_t nbytes);
00047 static ssize_t writenCompressed(void *xb, const int fd, const char *ptr, const size_t nbytes);
00048 static ssize_t readnPlain(void *xb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
00049 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2);
00050 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
00051 
00058 XmlBlasterConnectionUnparsed *getXmlBlasterConnectionUnparsed(int argc, const char* const* argv) {
00059    XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)calloc(1, sizeof(XmlBlasterConnectionUnparsed));
00060    if (xb == 0) return xb;
00061    xb->argc = argc;
00062    xb->argv = argv;
00063    xb->props = createProperties(xb->argc, xb->argv);
00064    if (xb->props == 0) {
00065       freeXmlBlasterConnectionUnparsed(&xb);
00066       return (XmlBlasterConnectionUnparsed *)0;
00067    }
00068    xb->socketToXmlBlaster = -1;
00069    xb->socketToXmlBlasterUdp = -1;
00070    xb->isInitialized = false;
00071    xb->requestId = 0;
00072    *xb->secretSessionId = 0;
00073    xb->initConnection = initConnection;
00074    xb->initQueue = xmlBlasterInitQueue;
00075    xb->connect = xmlBlasterConnect;
00076    xb->disconnect = xmlBlasterDisconnect;
00077    xb->publish = xmlBlasterPublish;
00078    xb->publishArr = xmlBlasterPublishArr;
00079    xb->publishOneway = xmlBlasterPublishOneway;
00080    xb->subscribe = xmlBlasterSubscribe;
00081    xb->unSubscribe = xmlBlasterUnSubscribe;
00082    xb->erase = xmlBlasterErase;
00083    xb->get = xmlBlasterGet;
00084    xb->ping = xmlBlasterPing;
00085    xb->isConnected = isConnected;
00086    xb->shutdown = xmlBlasterConnectionShutdown;
00087    xb->preSendEvent = 0;
00088    xb->preSendEvent_userP = 0;
00089    xb->postSendEvent = 0;
00090    xb->postSendEvent_userP = 0;
00091    xb->queueP = 0;
00092    xb->logLevel = parseLogLevel(xb->props->getString(xb->props, "logLevel", "WARN"));
00093    xb->log = xmlBlasterDefaultLogging;
00094    xb->logUserP = 0;
00095    xb->useUdpForOneway = false;
00096    xb->writeToSocket.writeToSocketFuncP = 0;
00097    xb->writeToSocket.userP = xb;
00098    xb->zlibWriteBuf = 0;
00099    xb->readFromSocket.readFromSocketFuncP = 0;
00100    xb->readFromSocket.userP = xb;
00101    xb->zlibReadBuf = 0;
00102    return xb;
00103 }
00104 
00105 void freeXmlBlasterConnectionUnparsed(XmlBlasterConnectionUnparsed **xb_)
00106 {
00107    XmlBlasterConnectionUnparsed *xb = *xb_;
00108    if (xb != 0) {
00109       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterConnectionUnparsed 0x%x", xb);
00110       freeProperties(xb->props);
00111       if (xb->zlibWriteBuf) {
00112          xmlBlaster_endZlibWriter(xb->zlibWriteBuf);
00113          free(xb->zlibWriteBuf);
00114          xb->zlibWriteBuf = 0;
00115       }
00116       if (xb->zlibReadBuf) {
00117          xmlBlaster_endZlibReader(xb->zlibReadBuf);
00118          free(xb->zlibReadBuf);
00119          xb->zlibReadBuf = 0;
00120       }
00121       xmlBlasterConnectionShutdown(xb);
00122       free(xb);
00123       *xb_ = 0;
00124    }
00125 }
00126 
00131 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception)
00132 {
00133    const char *servTcpPort = 0;
00134 
00135    struct sockaddr_in xmlBlasterAddr;
00136    struct hostent hostbuf, *hostP = 0;
00137    struct servent *portP = 0;
00138 
00139    size_t hstbuflen=0;
00140 
00141    char serverHostName[256];
00142    char errP[MAX_ERRNO_LEN];
00143 
00144 #if defined(_WINDOWS)
00145    WORD wVersionRequested;
00146    WSADATA wsaData;
00147    int err;
00148    wVersionRequested = MAKEWORD( 2, 2 );
00149    err = WSAStartup( wVersionRequested, &wsaData );
00150    if ( err != 0 ) {
00151       strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00152       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL", __FILE__, __LINE__);
00153       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00154       return false;
00155    }
00156 
00157    if ( LOBYTE( wsaData.wVersion ) != 2 ||
00158    HIBYTE( wsaData.wVersion ) != 2 ) {
00159       WSACleanup( );
00160       strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00161       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL which supports version 2.2", __FILE__, __LINE__);
00162       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00163       return false; 
00164    }
00165 # endif
00166    *errP = 0;
00167 
00168    if (xb->isInitialized) {
00169       return true;
00170    }
00171 
00172    {  /* Switch on compression? */
00173       const char *compressType = xb->props->getString(xb->props, "plugin/socket/compress/type", "");
00174       compressType = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/compress/type", compressType);
00175 
00176       if (!strcmp(compressType, "zlib:stream")) {
00177          
00178          xb->zlibWriteBuf = (XmlBlasterZlibWriteBuffers *)malloc(sizeof(struct XmlBlasterZlibWriteBuffers));
00179          xb->zlibReadBuf = (XmlBlasterZlibReadBuffers *)malloc(sizeof(struct XmlBlasterZlibReadBuffers));
00180 
00181          if (xmlBlaster_initZlibWriter(xb->zlibWriteBuf) != 0/*Z_OK*/) {
00182             if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00183                   "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
00184             strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00185             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00186                      "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
00187                      __FILE__, __LINE__, compressType);
00188             free(xb->zlibWriteBuf);
00189             xb->zlibWriteBuf = 0;
00190             free(xb->zlibReadBuf);
00191             xb->zlibReadBuf = 0;
00192             return false;
00193          }
00194 
00195          if (xmlBlaster_initZlibReader(xb->zlibReadBuf) != 0/*Z_OK*/) {
00196             if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
00197                   "Failed switching on 'plugin/socket/compress/type=%s'", compressType);
00198             strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00199             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00200                      "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'",
00201                      __FILE__, __LINE__, compressType);
00202             free(xb->zlibWriteBuf);
00203             xb->zlibWriteBuf = 0;
00204             free(xb->zlibReadBuf);
00205             xb->zlibReadBuf = 0;
00206             return false;
00207          }
00208 
00209          if (xb->logLevel>=XMLBLASTER_LOG_DUMP) {
00210             xb->zlibWriteBuf->debug = true;
00211             xb->zlibReadBuf->debug = true;
00212          }
00213 
00214          if (!xb->writeToSocket.writeToSocketFuncP) {  /* Accept setting from XmlBlasterAccessUnparsed */
00215             xb->writeToSocket.writeToSocketFuncP = writenCompressed;
00216             xb->readFromSocket.readFromSocketFuncP = readnCompressed;
00217          }
00218       }
00219       else {
00220          if (strcmp(compressType, "")) {
00221             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode.", compressType);
00222          }
00223          if (!xb->writeToSocket.writeToSocketFuncP) {  /* Accept setting from XmlBlasterAccessUnparsed */
00224             xb->writeToSocket.writeToSocketFuncP = writenPlain;
00225             xb->readFromSocket.readFromSocketFuncP = readnPlain;
00226          }
00227       }
00228    }
00229 
00230 
00231    servTcpPort = xb->props->getString(xb->props, "plugin/socket/port", "7607");
00232    servTcpPort = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/port", servTcpPort);
00233 
00234    strncpy0(serverHostName, "localhost", 250);
00235    gethostname(serverHostName, 250);
00236    {
00237       const char *hn = xb->props->getString(xb->props, "plugin/socket/hostname", serverHostName);
00238       memmove(serverHostName, hn, strlen(hn)+1);  /* including '\0' */
00239       hn = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/hostname", serverHostName);
00240       memmove(serverHostName, hn, strlen(hn)+1);
00241    }
00242 
00243    if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00244       "Lookup xmlBlaster on -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %s ...",
00245       serverHostName, servTcpPort);
00246 
00247    *xb->secretSessionId = 0;
00248    memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr));
00249    xmlBlasterAddr.sin_family=AF_INET;
00250 
00251 # if _WINDOWS_NOT_YET_PORTED /* Windows gethostbyname is deprecated */
00252    const struct addrinfo hints;
00253    struct addrinfo** res;
00254    int getaddrinfo(serverHostName, servTcpPort, &hints, res);
00255    res->ai_next : ai_family, ai_socktype, and ai_protocol
00256 
00257    ...
00258 
00259    void freeaddrinfo(*res);
00260 # endif
00261         if (isalpha(serverHostName[0]) || strchr(serverHostName,':') != 0) { /* look for dns name or ipv6 */
00262            char *tmphstbuf=0;
00263            memset((char *)&hostbuf, 0, sizeof(struct hostent));
00264            xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server DNS lookup of hostname '%s'", serverHostName);
00265            hostP = gethostbyname_re(serverHostName, &hostbuf, &tmphstbuf, &hstbuflen, errP);
00266            if (hostP == 0) {
00267               if (*errP != 0) {
00268                  strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00269                  SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00270                     "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s: %s",
00271                           __FILE__, __LINE__, serverHostName, servTcpPort, errP);
00272                  xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00273                  *errP = 0;
00274               }
00275               else {
00276                  strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00277                  SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00278                           "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s, errno=%d",
00279                           __FILE__, __LINE__, serverHostName, servTcpPort, errno);
00280                  xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00281               }
00282               return false;
00283            }
00284            xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
00285            free(tmphstbuf);
00286         }
00287         else {
00288            xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server IP4 usage (without DNS lookup) of IP '%s'", serverHostName);
00289            /* use ip4 addr directly to avoid dns lookup */
00290            xmlBlasterAddr.sin_addr.s_addr = inet_addr(serverHostName);
00291         }
00292 
00293    portP = getservbyname(servTcpPort, "tcp");
00294    if (portP != 0)
00295                 xmlBlasterAddr.sin_port = (u_short)portP->s_port;
00296    else
00297       xmlBlasterAddr.sin_port = htons((u_short)atoi(servTcpPort));
00298 
00299    xb->socketToXmlBlaster = (int)socket(AF_INET, SOCK_STREAM, 0);
00300    if (xb->socketToXmlBlaster != -1) {
00301       int ret=0;
00302       const char *localHostName = xb->props->getString(xb->props, "plugin/socket/localHostname", 0);
00303       int localPort = xb->props->getInt(xb->props, "plugin/socket/localPort", 0);
00304       localHostName = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/localHostname", localHostName);
00305       localPort = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/localPort", localPort);
00306 
00307       /* Sometimes a user may whish to force the local host/port setting (e.g. for firewall tunneling
00308          and on multi homed hosts */
00309       if (localHostName != 0 || localPort > 0) {
00310          struct sockaddr_in localAddr;
00311          struct hostent localHostbuf, *localHostP = 0;
00312          char *tmpLocalHostbuf=0;
00313          size_t localHostbuflen=0;
00314          memset(&localAddr, 0, sizeof(localAddr));
00315          localAddr.sin_family = AF_INET;
00316          if (localHostName) {
00317             if (isalpha(localHostName[0]) || strchr(localHostName,':') != 0) { /* look for dns name or ipv6 */
00318                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local hostname DNS lookup of hostname '%s'", localHostName);
00319                *errP = 0;
00320                localHostP = gethostbyname_re(localHostName, &localHostbuf, &tmpLocalHostbuf, &localHostbuflen, errP);
00321                if (*errP != 0) {
00322                   strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00323                   SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00324                            "[%.100s:%d] Lookup of local IP failed, %s",
00325                            __FILE__, __LINE__, errP);
00326                   xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00327                   *errP = 0;
00328                }
00329                if (localHostP != 0) {
00330                   localAddr.sin_addr.s_addr = ((struct in_addr *)(localHostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */
00331                   free(tmpLocalHostbuf);
00332                }
00333             }
00334             else {
00335                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local IP4 usage (without DNS lookup) of IP '%s'", localHostName);
00336                /* use ip4 addr directly to avoid dns lookup */
00337                localAddr.sin_addr.s_addr = inet_addr(localHostName);
00338             }
00339          }
00340          if (localPort > 0) {
00341             localAddr.sin_port = htons((unsigned short)localPort);
00342          }
00343          if (bind(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
00344             if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00345                "Failed binding local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00346                   localHostName, localPort);
00347          }
00348          else {
00349             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00350                "Bound local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00351                   localHostName, localPort);
00352          }
00353       }
00354 
00355       /* int retval = fcntl(xb->socketToXmlBlaster, F_SETFL, O_NONBLOCK); */ /* Switch on none blocking mode: we then should use select() to be notified when the kernel succeeded with connect() */
00356 
00357       if ((ret=connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) != -1) {
00358          if (xb->logLevel>=XMLBLASTER_LOG_INFO) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Connected to xmlBlaster");
00359          xb->useUdpForOneway = xb->props->getBool(xb->props, "plugin/socket/useUdpForOneway", xb->useUdpForOneway);
00360          xb->useUdpForOneway = xb->props->getBool(xb->props, "dispatch/connection/plugin/socket/useUdpForOneway", xb->useUdpForOneway);
00361 
00362          if (xb->useUdpForOneway) {
00363             struct sockaddr_in localAddr;
00364             socklen_t size = (socklen_t)sizeof(localAddr);
00365             xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
00366                "Using UDP connection for oneway calls, see -dispatch/connection/plugin/socket/useUdpForOneway true");
00367 
00368             xb->socketToXmlBlasterUdp = (int)socket(AF_INET, SOCK_DGRAM, 0);
00369 
00370             if (xb->socketToXmlBlasterUdp != -1) {
00371                if (getsockname(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, &size) == -1) {
00372                   if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00373                      "Can't determine the local socket host and port (in UDP), errno=%d", errno);
00374                   return false;
00375                }
00376 
00377                if (bind(xb->socketToXmlBlasterUdp, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
00378                   if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__,
00379                      "Failed binding local port (in UDP) -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00380                      localHostName, localPort);
00381                   return false;
00382                }
00383                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00384                   "Bound local UDP port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d",
00385                   localHostName, localPort);
00386 
00387                if ((ret=connect(xb->socketToXmlBlasterUdp, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == -1) {
00388                   char errnoStr[MAX_ERRNO_LEN];
00389                   xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
00390                   strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00391                   SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00392                            "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP), ret=%d, %s",
00393                            __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
00394                   if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00395                   return false;
00396                }
00397                if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connected to xmlBlaster with UDP");
00398             } /* if (xb->socketToXmlBlasterUdp != -1) */
00399             else {
00400                strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00401                SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00402                         "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP) errno=%d",
00403                         __FILE__, __LINE__, serverHostName, servTcpPort, errno);
00404                return false;
00405             }
00406          } /* if (xb->useUdpForOneway) */
00407 
00408       }
00409       else { /* connect(...) == -1 */
00410          char errnoStr[MAX_ERRNO_LEN];
00411          xb_strerror(errnoStr, MAX_ERRNO_LEN, errno);
00412          strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00413          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00414                   "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed, ret=%d, %s",
00415                   __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr);
00416          if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
00417          return false;
00418       }
00419    }
00420    else {
00421       strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00422       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00423                "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed errno=%d",
00424                __FILE__, __LINE__, serverHostName, servTcpPort, errno);
00425       return false;
00426    }
00427    xb->isInitialized = true;
00428    return true;
00429 }
00430 
00431 
00449 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception)
00450 {
00451 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST
00452    if (checkArgs(xb, "initQueue", false, exception) == false ) return false;
00453    if (xb->queueP) {
00454       char message[XMLBLASTEREXCEPTION_MESSAGE_LEN];
00455       SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00456                "[%.100s:%d] The queue is initialized already, call to initQueue() is ignored", __FILE__, __LINE__);
00457       embedException(exception, "user.illegalArgument", message, exception);
00458       xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00459       return false;
00460    }
00461 
00462    {
00463       QueueProperties tmp;
00464       memset(&tmp, 0, sizeof(QueueProperties));
00465 
00466       if (queueProperties == 0)
00467          queueProperties = &tmp;
00468 
00469       if (*queueProperties->dbName == 0) {
00470          strncpy0(queueProperties->dbName, xb->props->getString(xb->props, "queue/connection/dbName", "xmlBlasterClient.db"), QUEUE_DBNAME_MAX);
00471       }
00472       if (*queueProperties->nodeId == 0) {
00473          strncpy0(queueProperties->nodeId, xb->props->getString(xb->props, "queue/connection/nodeId", "client"), QUEUE_ID_MAX);
00474       }
00475       if (*queueProperties->queueName == 0) {
00476          strncpy0(queueProperties->queueName, xb->props->getString(xb->props, "queue/connection/queueName", "connection_client"), QUEUE_ID_MAX);
00477       }
00478       if (*queueProperties->tablePrefix == 0) {
00479          strncpy0(queueProperties->tablePrefix, xb->props->getString(xb->props, "queue/connection/tablePrefix", "XB_"), QUEUE_PREFIX_MAX);
00480       }
00481       if (queueProperties->maxNumOfEntries == 0) {
00482          queueProperties->maxNumOfEntries = xb->props->getInt(xb->props, "queue/connection/maxEntries", 10000000);
00483       }
00484       if (queueProperties->maxNumOfBytes == 0) {
00485          queueProperties->maxNumOfBytes = xb->props->getInt64(xb->props, "queue/connection/maxBytes", 10000000LL);
00486       }
00487       if (queueProperties->logFp == 0) queueProperties->logFp = xb->log;
00488       if (queueProperties->logLevel == 0) queueProperties->logLevel = xb->logLevel;
00489       if (queueProperties->userObject == 0) queueProperties->userObject = xb->userObject;
00490 
00491       xb->queueP = createQueue(queueProperties, exception);
00492       if (*exception->errorCode != 0) {
00493          xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Queue initializeation failed: [%s] %s\n", exception->errorCode, exception->message);
00494          return false;
00495       }
00496       xb->queueP->userObject = xb;
00497    }
00498    return true;
00499 #else
00500    if (queueProperties) {} /* To suppress compiler warning that not used */
00501    strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
00502    SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
00503             "[%.100s:%d] Queue support is not compiled into the library, please recompile with '-DXMLBLASTER_PERSISTENT_QUEUE=1 and -DXMLBLASTER_PERSISTENT_QUEUE_TEST=1", __FILE__, __LINE__);
00504    xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
00505    return false;
00506 #endif /* XMLBLASTER_PERSISTENT_QUEUE_TEST */
00507 }
00508 
00509 static bool isConnected(XmlBlasterConnectionUnparsed *xb)
00510 {
00511    return (xb->socketToXmlBlaster > -1) ? true : false;
00512 }
00513 
/**
 * Get the command line help text of the socket connection settings.
 * @return Points to a static buffer which is filled anew on every call,
 *         the caller must not free it
 */
const char *xmlBlasterConnectionUnparsedUsage()
{
   /* The text is assembled from two literals into a static buffer to prevent the compiler warning */
   /*   "string length `596' is greater than the length `509' ISO C89 compilers are required to support" */
   enum { USAGE_LEN=2048, USAGE_HALF=USAGE_LEN/2 };
   static char usage[USAGE_LEN];

   /* Where to find the server */
   const char * const serverPart =
      "\n   -dispatch/connection/plugin/socket/hostname [localhost]"
      "\n                       Where to find xmlBlaster."
      "\n   -dispatch/connection/plugin/socket/port [7607]"
      "\n                       The port where xmlBlaster listens."
      "\n   -dispatch/connection/plugin/socket/localHostname [NULL]";

   /* Local settings, compression and UDP */
   const char * const clientPart =
      "\n                       Force the local IP, useful on multi homed computers."
      "\n   -dispatch/connection/plugin/socket/localPort [0]"
      "\n                       Force the local port, useful to tunnel firewalls."
      "\n   -dispatch/connection/plugin/socket/compress/type []"
#if XMLBLASTER_ZLIB==1
      "\n                       Switch on compression with 'zlib:stream'."
#else
      "\n                       No compression support. Try recompiling with with '-DXMLBLASTER_ZLIB=1'."
#endif
      "\n   -dispatch/connection/plugin/socket/useUdpForOneway [false]"
      "\n                       Use UDP for publishOneway() calls.";

   strncpy0(usage, serverPart, USAGE_HALF);
   strncat0(usage, clientPart, USAGE_HALF);
   return usage;
}
00541 
00545 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb)
00546 {
00547    if (xb != 0 && xb->isConnected(xb)) {
00548 #     if defined(_WINDOWS)
00549       int how = SD_BOTH;   /* SD_BOTH requires Winsock2.h */
00550 #     else
00551       int how = SHUT_RDWR; /* enum SHUT_RDWR = 2 */
00552 #     endif
00553       if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
00554             "shutdown() socketToXmlBlaster=%d socketToXmlBlasterUdp=%d", xb->