
00001 /*---------------------------------------------------------------------------- 00002 Name: XmlBlasterConnectionUnparsed.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Wraps raw socket connection to xmlBlaster 00006 for complete synchronous xmlBlaster access, 00007 without callbacks and not threading necessary 00008 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00009 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html 00010 -----------------------------------------------------------------------------*/ 00011 #include <stdio.h> 00012 #include <stdlib.h> 00013 #include <string.h> 00014 #include <ctype.h> /* isalpha() */ 00015 #if defined(WINCE) 00016 # if defined(XB_USE_PTHREADS) 00017 # include <pthreads/pthread.h> 00018 # else 00019 /*#include <pthreads/need_errno.h> */ 00020 static int errno=0; /* single threaded workaround*/ 00021 # endif 00022 #else 00023 # include <errno.h> 00024 # include <sys/types.h> 00025 #endif 00026 #include <socket/xmlBlasterSocket.h> 00027 #include <socket/xmlBlasterZlib.h> 00028 #include <XmlBlasterConnectionUnparsed.h> 00029 #define SOCKET_TCP false 00030 00031 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception); 00032 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception); 00033 static bool getResponse(XmlBlasterConnectionUnparsed *xb, SocketDataHolder *responseSocketDataHolder, XmlBlasterException *exception, bool udp); 00034 static char *xmlBlasterConnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception); 00035 static bool xmlBlasterDisconnect(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception); 00036 static char *xmlBlasterPublish(XmlBlasterConnectionUnparsed *xb, MsgUnit *msgUnit, XmlBlasterException *exception); 00037 static QosArr *xmlBlasterPublishArr(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00038 static void xmlBlasterPublishOneway(XmlBlasterConnectionUnparsed *xb, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00039 static char *xmlBlasterSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00040 static QosArr *xmlBlasterUnSubscribe(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00041 static QosArr *xmlBlasterErase(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00042 static MsgUnitArr *xmlBlasterGet(XmlBlasterConnectionUnparsed *xb, const char * const key, const char * qos, XmlBlasterException *exception); 00043 static char *xmlBlasterPing(XmlBlasterConnectionUnparsed *xb, const char * const qos, XmlBlasterException *exception); 00044 static bool isConnected(XmlBlasterConnectionUnparsed *xb); 00045 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb); 00046 static ssize_t writenPlain(void *xb, const int fd, const char *ptr, const size_t nbytes); 00047 static ssize_t writenCompressed(void *xb, const int fd, const char *ptr, const size_t nbytes); 00048 static ssize_t readnPlain(void *xb, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2); 00049 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2); 00050 static bool checkArgs(XmlBlasterConnectionUnparsed *xb, const char *methodName, bool checkIsConnected, XmlBlasterException *exception); 00051 00058 XmlBlasterConnectionUnparsed *getXmlBlasterConnectionUnparsed(int argc, const char* const* argv) { 00059 XmlBlasterConnectionUnparsed *xb = (XmlBlasterConnectionUnparsed *)calloc(1, sizeof(XmlBlasterConnectionUnparsed)); 00060 if (xb == 0) return xb; 00061 xb->argc = argc; 00062 xb->argv = argv; 00063 xb->props = createProperties(xb->argc, xb->argv); 00064 if (xb->props == 0) { 00065 freeXmlBlasterConnectionUnparsed(&xb); 00066 return (XmlBlasterConnectionUnparsed *)0; 00067 } 00068 xb->socketToXmlBlaster = -1; 00069 xb->socketToXmlBlasterUdp = -1; 00070 xb->isInitialized = false; 00071 xb->requestId = 0; 00072 *xb->secretSessionId = 0; 00073 xb->initConnection = initConnection; 00074 xb->initQueue = xmlBlasterInitQueue; 00075 xb->connect = xmlBlasterConnect; 00076 xb->disconnect = xmlBlasterDisconnect; 00077 xb->publish = xmlBlasterPublish; 00078 xb->publishArr = xmlBlasterPublishArr; 00079 xb->publishOneway = xmlBlasterPublishOneway; 00080 xb->subscribe = xmlBlasterSubscribe; 00081 xb->unSubscribe = xmlBlasterUnSubscribe; 00082 xb->erase = xmlBlasterErase; 00083 xb->get = xmlBlasterGet; 00084 xb->ping = xmlBlasterPing; 00085 xb->isConnected = isConnected; 00086 xb->shutdown = xmlBlasterConnectionShutdown; 00087 xb->preSendEvent = 0; 00088 xb->preSendEvent_userP = 0; 00089 xb->postSendEvent = 0; 00090 xb->postSendEvent_userP = 0; 00091 xb->queueP = 0; 00092 xb->logLevel = parseLogLevel(xb->props->getString(xb->props, "logLevel", "WARN")); 00093 xb->log = xmlBlasterDefaultLogging; 00094 xb->logUserP = 0; 00095 xb->useUdpForOneway = false; 00096 xb->writeToSocket.writeToSocketFuncP = 0; 00097 xb->writeToSocket.userP = xb; 00098 xb->zlibWriteBuf = 0; 00099 xb->readFromSocket.readFromSocketFuncP = 0; 00100 xb->readFromSocket.userP = xb; 00101 xb->zlibReadBuf = 0; 00102 return xb; 00103 } 00104 00105 void freeXmlBlasterConnectionUnparsed(XmlBlasterConnectionUnparsed **xb_) 00106 { 00107 XmlBlasterConnectionUnparsed *xb = *xb_; 00108 if (xb != 0) { 00109 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterConnectionUnparsed 0x%x", xb); 00110 freeProperties(xb->props); 00111 if (xb->zlibWriteBuf) { 00112 xmlBlaster_endZlibWriter(xb->zlibWriteBuf); 00113 free(xb->zlibWriteBuf); 00114 xb->zlibWriteBuf = 0; 00115 } 00116 if (xb->zlibReadBuf) { 00117 xmlBlaster_endZlibReader(xb->zlibReadBuf); 00118 free(xb->zlibReadBuf); 00119 xb->zlibReadBuf = 0; 00120 } 00121 xmlBlasterConnectionShutdown(xb); 00122 free(xb); 00123 *xb_ = 0; 00124 } 00125 } 00126 00131 static bool initConnection(XmlBlasterConnectionUnparsed *xb, XmlBlasterException *exception) 00132 { 00133 const char *servTcpPort = 0; 00134 00135 struct sockaddr_in xmlBlasterAddr; 00136 struct hostent hostbuf, *hostP = 0; 00137 struct servent *portP = 0; 00138 00139 size_t hstbuflen=0; 00140 00141 char serverHostName[256]; 00142 char errP[MAX_ERRNO_LEN]; 00143 00144 #if defined(_WINDOWS) 00145 WORD wVersionRequested; 00146 WSADATA wsaData; 00147 int err; 00148 wVersionRequested = MAKEWORD( 2, 2 ); 00149 err = WSAStartup( wVersionRequested, &wsaData ); 00150 if ( err != 0 ) { 00151 strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00152 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL", __FILE__, __LINE__); 00153 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00154 return false; 00155 } 00156 00157 if ( LOBYTE( wsaData.wVersion ) != 2 || 00158 HIBYTE( wsaData.wVersion ) != 2 ) { 00159 WSACleanup( ); 00160 strncpy0(exception->errorCode, "resource.unavailable", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00161 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Couldn't find a usable WinSock DLL which supports version 2.2", __FILE__, __LINE__); 00162 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00163 return false; 00164 } 00165 # endif 00166 *errP = 0; 00167 00168 if (xb->isInitialized) { 00169 return true; 00170 } 00171 00172 { /* Switch on compression? */ 00173 const char *compressType = xb->props->getString(xb->props, "plugin/socket/compress/type", ""); 00174 compressType = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/compress/type", compressType); 00175 00176 if (!strcmp(compressType, "zlib:stream")) { 00177 00178 xb->zlibWriteBuf = (XmlBlasterZlibWriteBuffers *)malloc(sizeof(struct XmlBlasterZlibWriteBuffers)); 00179 xb->zlibReadBuf = (XmlBlasterZlibReadBuffers *)malloc(sizeof(struct XmlBlasterZlibReadBuffers)); 00180 00181 if (xmlBlaster_initZlibWriter(xb->zlibWriteBuf) != 0/*Z_OK*/) { 00182 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00183 "Failed switching on 'plugin/socket/compress/type=%s'", compressType); 00184 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00185 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00186 "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'", 00187 __FILE__, __LINE__, compressType); 00188 free(xb->zlibWriteBuf); 00189 xb->zlibWriteBuf = 0; 00190 free(xb->zlibReadBuf); 00191 xb->zlibReadBuf = 0; 00192 return false; 00193 } 00194 00195 if (xmlBlaster_initZlibReader(xb->zlibReadBuf) != 0/*Z_OK*/) { 00196 if (xb->logLevel>=XMLBLASTER_LOG_ERROR) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00197 "Failed switching on 'plugin/socket/compress/type=%s'", compressType); 00198 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00199 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00200 "[%.100s:%d] Failed switching on 'plugin/socket/compress/type=%s'", 00201 __FILE__, __LINE__, compressType); 00202 free(xb->zlibWriteBuf); 00203 xb->zlibWriteBuf = 0; 00204 free(xb->zlibReadBuf); 00205 xb->zlibReadBuf = 0; 00206 return false; 00207 } 00208 00209 if (xb->logLevel>=XMLBLASTER_LOG_DUMP) { 00210 xb->zlibWriteBuf->debug = true; 00211 xb->zlibReadBuf->debug = true; 00212 } 00213 00214 if (!xb->writeToSocket.writeToSocketFuncP) { /* Accept setting from XmlBlasterAccessUnparsed */ 00215 xb->writeToSocket.writeToSocketFuncP = writenCompressed; 00216 xb->readFromSocket.readFromSocketFuncP = readnCompressed; 00217 } 00218 } 00219 else { 00220 if (strcmp(compressType, "")) { 00221 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode.", compressType); 00222 } 00223 if (!xb->writeToSocket.writeToSocketFuncP) { /* Accept setting from XmlBlasterAccessUnparsed */ 00224 xb->writeToSocket.writeToSocketFuncP = writenPlain; 00225 xb->readFromSocket.readFromSocketFuncP = readnPlain; 00226 } 00227 } 00228 } 00229 00230 00231 servTcpPort = xb->props->getString(xb->props, "plugin/socket/port", "7607"); 00232 servTcpPort = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/port", servTcpPort); 00233 00234 strncpy0(serverHostName, "localhost", 250); 00235 gethostname(serverHostName, 250); 00236 { 00237 const char *hn = xb->props->getString(xb->props, "plugin/socket/hostname", serverHostName); 00238 memmove(serverHostName, hn, strlen(hn)+1); /* including '\0' */ 00239 hn = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/hostname", serverHostName); 00240 memmove(serverHostName, hn, strlen(hn)+1); 00241 } 00242 00243 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00244 "Lookup xmlBlaster on -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %s ...", 00245 serverHostName, servTcpPort); 00246 00247 *xb->secretSessionId = 0; 00248 memset((char *)&xmlBlasterAddr, 0, sizeof(xmlBlasterAddr)); 00249 xmlBlasterAddr.sin_family=AF_INET; 00250 00251 # if _WINDOWS_NOT_YET_PORTED /* Windows gethostbyname is deprecated */ 00252 const struct addrinfo hints; 00253 struct addrinfo** res; 00254 int getaddrinfo(serverHostName, servTcpPort, &hints, res); 00255 res->ai_next : ai_family, ai_socktype, and ai_protocol 00256 00257 ... 00258 00259 void freeaddrinfo(*res); 00260 # endif 00261 if (isalpha(serverHostName[0]) || strchr(serverHostName,':') != 0) { /* look for dns name or ipv6 */ 00262 char *tmphstbuf=0; 00263 memset((char *)&hostbuf, 0, sizeof(struct hostent)); 00264 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server DNS lookup of hostname '%s'", serverHostName); 00265 hostP = gethostbyname_re(serverHostName, &hostbuf, &tmphstbuf, &hstbuflen, errP); 00266 if (hostP == 0) { 00267 if (*errP != 0) { 00268 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00269 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00270 "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s: %s", 00271 __FILE__, __LINE__, serverHostName, servTcpPort, errP); 00272 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00273 *errP = 0; 00274 } 00275 else { 00276 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00277 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00278 "[%.100s:%d] Connecting to xmlBlaster failed, can't determine hostname (hostP=0), -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s, errno=%d", 00279 __FILE__, __LINE__, serverHostName, servTcpPort, errno); 00280 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00281 } 00282 return false; 00283 } 00284 xmlBlasterAddr.sin_addr.s_addr = ((struct in_addr *)(hostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */ 00285 free(tmphstbuf); 00286 } 00287 else { 00288 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Server IP4 usage (without DNS lookup) of IP '%s'", serverHostName); 00289 /* use ip4 addr directly to avoid dns lookup */ 00290 xmlBlasterAddr.sin_addr.s_addr = inet_addr(serverHostName); 00291 } 00292 00293 portP = getservbyname(servTcpPort, "tcp"); 00294 if (portP != 0) 00295 xmlBlasterAddr.sin_port = (u_short)portP->s_port; 00296 else 00297 xmlBlasterAddr.sin_port = htons((u_short)atoi(servTcpPort)); 00298 00299 xb->socketToXmlBlaster = (int)socket(AF_INET, SOCK_STREAM, 0); 00300 if (xb->socketToXmlBlaster != -1) { 00301 int ret=0; 00302 const char *localHostName = xb->props->getString(xb->props, "plugin/socket/localHostname", 0); 00303 int localPort = xb->props->getInt(xb->props, "plugin/socket/localPort", 0); 00304 localHostName = xb->props->getString(xb->props, "dispatch/connection/plugin/socket/localHostname", localHostName); 00305 localPort = xb->props->getInt(xb->props, "dispatch/connection/plugin/socket/localPort", localPort); 00306 00307 /* Sometimes a user may whish to force the local host/port setting (e.g. for firewall tunneling 00308 and on multi homed hosts */ 00309 if (localHostName != 0 || localPort > 0) { 00310 struct sockaddr_in localAddr; 00311 struct hostent localHostbuf, *localHostP = 0; 00312 char *tmpLocalHostbuf=0; 00313 size_t localHostbuflen=0; 00314 memset(&localAddr, 0, sizeof(localAddr)); 00315 localAddr.sin_family = AF_INET; 00316 if (localHostName) { 00317 if (isalpha(localHostName[0]) || strchr(localHostName,':') != 0) { /* look for dns name or ipv6 */ 00318 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local hostname DNS lookup of hostname '%s'", localHostName); 00319 *errP = 0; 00320 localHostP = gethostbyname_re(localHostName, &localHostbuf, &tmpLocalHostbuf, &localHostbuflen, errP); 00321 if (*errP != 0) { 00322 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00323 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00324 "[%.100s:%d] Lookup of local IP failed, %s", 00325 __FILE__, __LINE__, errP); 00326 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00327 *errP = 0; 00328 } 00329 if (localHostP != 0) { 00330 localAddr.sin_addr.s_addr = ((struct in_addr *)(localHostP->h_addr))->s_addr; /* inet_addr("192.168.1.2"); */ 00331 free(tmpLocalHostbuf); 00332 } 00333 } 00334 else { 00335 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Local IP4 usage (without DNS lookup) of IP '%s'", localHostName); 00336 /* use ip4 addr directly to avoid dns lookup */ 00337 localAddr.sin_addr.s_addr = inet_addr(localHostName); 00338 } 00339 } 00340 if (localPort > 0) { 00341 localAddr.sin_port = htons((unsigned short)localPort); 00342 } 00343 if (bind(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { 00344 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00345 "Failed binding local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00346 localHostName, localPort); 00347 } 00348 else { 00349 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00350 "Bound local port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00351 localHostName, localPort); 00352 } 00353 } 00354 00355 /* int retval = fcntl(xb->socketToXmlBlaster, F_SETFL, O_NONBLOCK); */ /* Switch on none blocking mode: we then should use select() to be notified when the kernel succeeded with connect() */ 00356 00357 if ((ret=connect(xb->socketToXmlBlaster, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) != -1) { 00358 if (xb->logLevel>=XMLBLASTER_LOG_INFO) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, "Connected to xmlBlaster"); 00359 xb->useUdpForOneway = xb->props->getBool(xb->props, "plugin/socket/useUdpForOneway", xb->useUdpForOneway); 00360 xb->useUdpForOneway = xb->props->getBool(xb->props, "dispatch/connection/plugin/socket/useUdpForOneway", xb->useUdpForOneway); 00361 00362 if (xb->useUdpForOneway) { 00363 struct sockaddr_in localAddr; 00364 socklen_t size = (socklen_t)sizeof(localAddr); 00365 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00366 "Using UDP connection for oneway calls, see -dispatch/connection/plugin/socket/useUdpForOneway true"); 00367 00368 xb->socketToXmlBlasterUdp = (int)socket(AF_INET, SOCK_DGRAM, 0); 00369 00370 if (xb->socketToXmlBlasterUdp != -1) { 00371 if (getsockname(xb->socketToXmlBlaster, (struct sockaddr *)&localAddr, &size) == -1) { 00372 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00373 "Can't determine the local socket host and port (in UDP), errno=%d", errno); 00374 return false; 00375 } 00376 00377 if (bind(xb->socketToXmlBlasterUdp, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { 00378 if (xb->logLevel>=XMLBLASTER_LOG_WARN) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, 00379 "Failed binding local port (in UDP) -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00380 localHostName, localPort); 00381 return false; 00382 } 00383 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00384 "Bound local UDP port -dispatch/connection/plugin/socket/localHostname %s -dispatch/connection/plugin/socket/localPort %d", 00385 localHostName, localPort); 00386 00387 if ((ret=connect(xb->socketToXmlBlasterUdp, (struct sockaddr *)&xmlBlasterAddr, sizeof(xmlBlasterAddr))) == -1) { 00388 char errnoStr[MAX_ERRNO_LEN]; 00389 xb_strerror(errnoStr, MAX_ERRNO_LEN, errno); 00390 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00391 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00392 "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP), ret=%d, %s", 00393 __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr); 00394 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00395 return false; 00396 } 00397 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connected to xmlBlaster with UDP"); 00398 } /* if (xb->socketToXmlBlasterUdp != -1) */ 00399 else { 00400 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00401 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00402 "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed (in UDP) errno=%d", 00403 __FILE__, __LINE__, serverHostName, servTcpPort, errno); 00404 return false; 00405 } 00406 } /* if (xb->useUdpForOneway) */ 00407 00408 } 00409 else { /* connect(...) == -1 */ 00410 char errnoStr[MAX_ERRNO_LEN]; 00411 xb_strerror(errnoStr, MAX_ERRNO_LEN, errno); 00412 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00413 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00414 "[%.100s:%d] Connecting to xmlBlaster -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed, ret=%d, %s", 00415 __FILE__, __LINE__, serverHostName, servTcpPort, ret, errnoStr); 00416 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00417 return false; 00418 } 00419 } 00420 else { 00421 strncpy0(exception->errorCode, "user.configuration", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00422 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00423 "[%.100s:%d] Connecting to xmlBlaster (socket=-1) -dispatch/connection/plugin/socket/hostname %s -dispatch/connection/plugin/socket/port %.10s failed errno=%d", 00424 __FILE__, __LINE__, serverHostName, servTcpPort, errno); 00425 return false; 00426 } 00427 xb->isInitialized = true; 00428 return true; 00429 } 00430 00431 00449 static bool xmlBlasterInitQueue(XmlBlasterConnectionUnparsed *xb, QueueProperties *queueProperties, XmlBlasterException *exception) 00450 { 00451 #ifdef XMLBLASTER_PERSISTENT_QUEUE_TEST 00452 if (checkArgs(xb, "initQueue", false, exception) == false ) return false; 00453 if (xb->queueP) { 00454 char message[XMLBLASTEREXCEPTION_MESSAGE_LEN]; 00455 SNPRINTF(message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00456 "[%.100s:%d] The queue is initialized already, call to initQueue() is ignored", __FILE__, __LINE__); 00457 embedException(exception, "user.illegalArgument", message, exception); 00458 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00459 return false; 00460 } 00461 00462 { 00463 QueueProperties tmp; 00464 memset(&tmp, 0, sizeof(QueueProperties)); 00465 00466 if (queueProperties == 0) 00467 queueProperties = &tmp; 00468 00469 if (*queueProperties->dbName == 0) { 00470 strncpy0(queueProperties->dbName, xb->props->getString(xb->props, "queue/connection/dbName", "xmlBlasterClient.db"), QUEUE_DBNAME_MAX); 00471 } 00472 if (*queueProperties->nodeId == 0) { 00473 strncpy0(queueProperties->nodeId, xb->props->getString(xb->props, "queue/connection/nodeId", "client"), QUEUE_ID_MAX); 00474 } 00475 if (*queueProperties->queueName == 0) { 00476 strncpy0(queueProperties->queueName, xb->props->getString(xb->props, "queue/connection/queueName", "connection_client"), QUEUE_ID_MAX); 00477 } 00478 if (*queueProperties->tablePrefix == 0) { 00479 strncpy0(queueProperties->tablePrefix, xb->props->getString(xb->props, "queue/connection/tablePrefix", "XB_"), QUEUE_PREFIX_MAX); 00480 } 00481 if (queueProperties->maxNumOfEntries == 0) { 00482 queueProperties->maxNumOfEntries = xb->props->getInt(xb->props, "queue/connection/maxEntries", 10000000); 00483 } 00484 if (queueProperties->maxNumOfBytes == 0) { 00485 queueProperties->maxNumOfBytes = xb->props->getInt64(xb->props, "queue/connection/maxBytes", 10000000LL); 00486 } 00487 if (queueProperties->logFp == 0) queueProperties->logFp = xb->log; 00488 if (queueProperties->logLevel == 0) queueProperties->logLevel = xb->logLevel; 00489 if (queueProperties->userObject == 0) queueProperties->userObject = xb->userObject; 00490 00491 xb->queueP = createQueue(queueProperties, exception); 00492 if (*exception->errorCode != 0) { 00493 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Queue initializeation failed: [%s] %s\n", exception->errorCode, exception->message); 00494 return false; 00495 } 00496 xb->queueP->userObject = xb; 00497 } 00498 return true; 00499 #else 00500 if (queueProperties) {} /* To suppress compiler warning that not used */ 00501 strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00502 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00503 "[%.100s:%d] Queue support is not compiled into the library, please recompile with '-DXMLBLASTER_PERSISTENT_QUEUE=1 and -DXMLBLASTER_PERSISTENT_QUEUE_TEST=1", __FILE__, __LINE__); 00504 xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message); 00505 return false; 00506 #endif /* XMLBLASTER_PERSISTENT_QUEUE_TEST */ 00507 } 00508 00509 static bool isConnected(XmlBlasterConnectionUnparsed *xb) 00510 { 00511 return (xb->socketToXmlBlaster > -1) ? true : false; 00512 } 00513 00514 const char *xmlBlasterConnectionUnparsedUsage() 00515 { 00516 /* To prevent compiler warning */ 00517 /* "string length `596' is greater than the length `509' ISO C89 compilers are required to support" */ 00518 /* we have a static variable */ 00519 enum { SIZE=2048 }; 00520 static char usage[SIZE]; 00521 strncpy0(usage, 00522 "\n -dispatch/connection/plugin/socket/hostname [localhost]" 00523 "\n Where to find xmlBlaster." 00524 "\n -dispatch/connection/plugin/socket/port [7607]" 00525 "\n The port where xmlBlaster listens." 00526 "\n -dispatch/connection/plugin/socket/localHostname [NULL]", SIZE/2); 00527 strncat0(usage, 00528 "\n Force the local IP, useful on multi homed computers." 00529 "\n -dispatch/connection/plugin/socket/localPort [0]" 00530 "\n Force the local port, useful to tunnel firewalls." 00531 "\n -dispatch/connection/plugin/socket/compress/type []" 00532 #if XMLBLASTER_ZLIB==1 00533 "\n Switch on compression with 'zlib:stream'." 00534 #else 00535 "\n No compression support. Try recompiling with with '-DXMLBLASTER_ZLIB=1'." 00536 #endif 00537 "\n -dispatch/connection/plugin/socket/useUdpForOneway [false]" 00538 "\n Use UDP for publishOneway() calls.", SIZE/2); 00539 return usage; 00540 } 00541 00545 static void xmlBlasterConnectionShutdown(XmlBlasterConnectionUnparsed *xb) 00546 { 00547 if (xb != 0 && xb->isConnected(xb)) { 00548 # if defined(_WINDOWS) 00549 int how = SD_BOTH; /* SD_BOTH requires Winsock2.h */ 00550 # else 00551 int how = SHUT_RDWR; /* enum SHUT_RDWR = 2 */ 00552 # endif 00553 if (xb->logLevel>=XMLBLASTER_LOG_TRACE) xb->log(xb->logUserP, xb->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00554 "shutdown() socketToXmlBlaster=%d socketToXmlBlasterUdp=%d", xb->