
00001 /*---------------------------------------------------------------------------- 00002 Name: XmlBlasterAccessUnparsed.c 00003 Project: xmlBlaster.org 00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file 00005 Comment: Wraps raw socket connection to xmlBlaster 00006 Implements sync connection and async callback 00007 Needs pthread to compile (multi threading). 00008 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info> 00009 Compile: 00010 LINUX: gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread 00011 g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread 00012 icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread 00013 WIN: cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib 00014 (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32) 00015 Solaris: cc -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl 00016 CC -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl 00017 00018 Linux with libxmlBlasterC.so: 00019 gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT -lpthread 00020 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html 00021 -----------------------------------------------------------------------------*/ 00022 #include <stdio.h> 00023 #include <stdlib.h> 00024 #include <string.h> 00025 #if defined(WINCE) 00026 # if defined(XB_USE_PTHREADS) 00027 # include <pthreads/pthread.h> 00028 # else 00029 /*#include <pthreads/need_errno.h> */ 00030 static int errno=0; /* single threaded workaround*/ 00031 # endif 00032 #else 00033 # include <errno.h> 00034 # include <sys/types.h> 00035 #endif 00036 #include <socket/xmlBlasterSocket.h> 00037 #include <socket/xmlBlasterZlib.h> 00038 #include <XmlBlasterAccessUnparsed.h> 00039 00043 typedef struct Dll_Export UpdateContainer { 00044 XmlBlasterAccessUnparsed *xa; 00045 MsgUnitArr *msgUnitArrP; 00046 void *userData; 00047 XmlBlasterException exception; /* Holding a clone from the original as the callback thread may use it for another message */ 00048 SocketDataHolder socketDataHolder; /* Holding a clone from the original */ 00049 } UpdateContainer; 00050 00051 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception); 00052 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception); 00053 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception); 00054 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception); 00055 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00056 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception); 00057 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00058 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00059 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00060 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception); 00061 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception); 00062 static bool isConnected(XmlBlasterAccessUnparsed *xa); 00063 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder); 00064 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception); 00065 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception); 00066 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception); 00067 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder); 00068 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception); 00069 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes); 00070 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes); 00071 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2); 00072 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2); 00073 00074 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) { 00075 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed)); 00076 if (xa == 0) return xa; 00077 xa->argc = argc; 00078 xa->argv = argv; 00079 xa->props = createProperties(xa->argc, xa->argv); 00080 if (xa->props == 0) { 00081 freeXmlBlasterAccessUnparsed(xa); 00082 return (XmlBlasterAccessUnparsed *)0; 00083 } 00084 xa->isInitialized = false; 00085 xa->isShutdown = false; 00086 xa->connectionP = 0; 00087 xa->callbackP = 0; 00088 xa->userObject = 0; /* A client can use this pointer to point to any client specific information */ 00089 xa->userFp = 0; 00090 xa->connect = xmlBlasterConnect; 00091 xa->initialize = initialize; 00092 xa->disconnect = xmlBlasterDisconnect; 00093 xa->publish = xmlBlasterPublish; 00094 xa->publishArr = xmlBlasterPublishArr; 00095 xa->publishOneway = xmlBlasterPublishOneway; 00096 xa->subscribe = xmlBlasterSubscribe; 00097 xa->unSubscribe = xmlBlasterUnSubscribe; 00098 xa->erase = xmlBlasterErase; 00099 xa->get = xmlBlasterGet; 00100 xa->ping = xmlBlasterPing; 00101 xa->isConnected = isConnected; 00102 xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN")); 00103 xa->log = xmlBlasterDefaultLogging; 00104 xa->logUserP = 0; 00105 xa->clientsUpdateFp = 0; 00106 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true); 00107 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded); 00108 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */ 00109 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */ 00110 /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */ 00111 xa->lowLevelAutoAck = false; 00112 00113 /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */ 00114 if (xa->callbackMultiThreaded == true) { 00115 if (xa->logLevel>=XMLBLASTER_LOG_DUMP) 00116 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true"); 00117 /*xa->callbackMultiThreaded = false;*/ 00118 } 00119 /* stdint.h: # define INT32_MAX (2147483647) */ 00120 xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */ 00121 xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout); 00122 /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */ 00123 memset(&xa->callbackThreadId, 0, sizeof(pthread_t)); 00124 xa->threadCounter = 0; 00125 00126 if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, 00127 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld", 00128 getLogLevelStr(xa->logLevel), xa->responseTimeout); 00129 00130 /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */ 00131 pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */ 00132 pthread_mutex_init(&xa->readnMutex, NULL); 00133 return xa; 00134 } 00135 00136 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa) 00137 { 00138 int rc; 00139 00140 if (xa == 0) { 00141 char *stack = getStackTrace(10); 00142 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s", 00143 __FILE__, __LINE__, stack); 00144 free(stack); 00145 return; 00146 } 00147 00148 if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */ 00149 xa->isShutdown = true; /* Inhibit access to xa */ 00150 00151 if (xa->callbackP != 0) { 00152 xa->callbackP->shutdown(xa->callbackP); 00153 } 00154 if (xa->connectionP != 0) { 00155 xa->connectionP->shutdown(xa->connectionP); 00156 } 00157 00158 if (xa->callbackP != 0) { 00159 /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */ 00160 const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true); 00161 int retVal; 00162 if (!xa->callbackP->isShutdown) { 00163 00164 { /* Wait for any pending update() dispatcher threads to die */ 00165 int i; 00166 int num = 200; 00167 int interval = 10; 00168 for (i=0; i<num; i++) { 00169 if (xa->callbackP->isShutdown) 00170 break; 00171 /*pthread_yield(0);*/ 00172 sleepMillis(interval); 00173 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00174 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for callback thread to join. %d/%d", interval, i, num); 00175 } 00176 if (i == num) { 00177 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Proper shutdown of callback thread failed, it seems to block on the socket"); 00178 } 00179 } 00180 00181 if (!USE_DETACH_MODE) { 00182 /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */ 00183 /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */ 00184 /* 00185 retVal = pthread_cancel(xa->callbackThreadId); 00186 if (retVal != 0) { 00187 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal); 00188 } 00189 */ 00190 } 00191 } 00192 00193 if (USE_DETACH_MODE) { 00194 retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */ 00195 if (retVal != 0) { 00196 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal); 00197 } 00198 else { 00199 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00200 "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId)); 00201 } 00202 } 00203 else { /* JOIN mode */ 00204 retVal = pthread_join(xa->callbackThreadId, 0); 00205 if (retVal != 0) { 00206 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal); 00207 } 00208 else { 00209 if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00210 "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId)); 00211 } 00212 } 00213 00214 memset(&xa->callbackThreadId, 0, sizeof(pthread_t)); 00215 } 00216 00217 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP); 00218 00219 { /* Wait for any pending update() dispatcher threads to die */ 00220 int i; 00221 int num = 1000; 00222 int interval = 10; 00223 for (i=0; i<num; i++) { 00224 if ((int)xa->threadCounter < 1) 00225 break; 00226 sleepMillis(interval); 00227 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00228 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num); 00229 } 00230 if (i >= num) { 00231 if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, 00232 "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num); 00233 } 00234 } 00235 00236 if (xa->connectionP != 0) { 00237 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00238 } 00239 00240 if (xa->callbackP != 0) { 00241 freeCallbackServerUnparsed(&xa->callbackP); 00242 } 00243 00244 freeProperties(xa->props); 00245 00246 rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */ 00247 if (rc != 0) /* EBUSY */ 00248 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc); 00249 00250 rc = pthread_mutex_destroy(&xa->readnMutex); 00251 if (rc != 0) /* EBUSY */ 00252 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc); 00253 00254 free(xa); 00255 } 00256 00257 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception) 00258 { 00259 int threadRet = 0; 00260 const char *compressType = 0; 00261 00262 if (checkArgs(xa, "initialize", false, exception) == false) return false; 00263 00264 if (xa->isInitialized) { 00265 return true; 00266 } 00267 00268 if (clientUpdateFp == 0) { 00269 xa->clientsUpdateFp = 0; 00270 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "", 00271 "Your callback UpdateFp pointer is NULL, we use our default callback handler"); 00272 } 00273 else { 00274 xa->clientsUpdateFp = clientUpdateFp; 00275 } 00276 00277 if (xa->connectionP) { 00278 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00279 } 00280 xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv); 00281 if (xa->connectionP == 0) { 00282 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00283 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00284 "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__); 00285 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00286 return false; 00287 } 00288 xa->connectionP->log = xa->log; 00289 xa->connectionP->logUserP = xa->logUserP; 00290 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed"); 00291 00292 00293 /* Switch on compression? */ 00294 compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", ""); 00295 compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType); 00296 00297 if (!strcmp(compressType, "zlib:stream")) { 00298 xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed; 00299 xa->connectionP->writeToSocket.userP = xa; 00300 xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed; 00301 xa->connectionP->readFromSocket.userP = xa; 00302 } 00303 else { 00304 if (strcmp(compressType, "")) { 00305 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType); 00306 } 00307 xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain; 00308 xa->connectionP->writeToSocket.userP = xa; 00309 xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain; 00310 xa->connectionP->readFromSocket.userP = xa; 00311 } 00312 00313 if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */ 00314 return false; 00315 00316 /* the fourth arg 'xa' is returned as 'void *userData' in update() method */ 00317 if (xa->callbackP != 0) { 00318 freeCallbackServerUnparsed(&xa->callbackP); 00319 } 00320 xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa); 00321 if (xa->callbackP == 0) { 00322 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00323 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00324 "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__); 00325 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00326 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00327 return false; 00328 } 00329 xa->callbackP->log = xa->log; 00330 xa->callbackP->logUserP = xa->logUserP; 00331 00332 if (!strcmp(compressType, "zlib:stream")) { 00333 xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed; 00334 xa->callbackP->writeToSocket.userP = xa; 00335 xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed; 00336 xa->callbackP->readFromSocket.userP = xa; 00337 } 00338 else { 00339 xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain; 00340 xa->callbackP->writeToSocket.userP = xa; 00341 xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain; 00342 xa->callbackP->readFromSocket.userP = xa; 00343 } 00344 00345 xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp); 00346 00347 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__, 00348 "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...", 00349 (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB); 00350 00351 /* Register our callback funtion which is called just before sending a message */ 00352 xa->connectionP->preSendEvent = preSendEvent; 00353 xa->connectionP->preSendEvent_userP = xa; 00354 00355 /* Register our callback funtion which is called just after sending a message */ 00356 xa->connectionP->postSendEvent = postSendEvent; 00357 xa->connectionP->postSendEvent_userP = xa; 00358 00359 /* thread blocks on socket listener */ 00360 threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP); 00361 if (threadRet != 0) { 00362 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00363 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00364 "[%.100s:%d] Creating thread failed with error number %d", 00365 __FILE__, __LINE__, threadRet); 00366 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00367 freeCallbackServerUnparsed(&xa->callbackP); 00368 freeXmlBlasterConnectionUnparsed(&xa->connectionP); 00369 return false; 00370 } 00371 00372 xa->isInitialized = true; 00373 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00374 "initialize() successful"); 00375 return xa->isInitialized; 00376 } 00377 00378 static bool isConnected(XmlBlasterAccessUnparsed *xa) 00379 { 00380 if (xa == 0 || xa->isShutdown || xa->connectionP == 0) { 00381 return false; 00382 } 00383 return xa->connectionP->isConnected(xa->connectionP); 00384 } 00385 00396 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) 00397 { 00398 bool retVal; 00399 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa; 00400 00401 /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */ 00402 if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName)) 00403 return msgRequestInfoP; 00404 00405 /* ======== Initialize threading ====== */ 00406 msgRequestInfoP->responseMutexIsLocked = false; /* Only to remember if the client thread holds the lock */ 00407 00408 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00409 "preSendEvent(%s) occurred", msgRequestInfoP->methodName); 00410 retVal = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent); 00411 if (retVal == false) { 00412 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00413 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00414 "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__); 00415 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00416 return (MsgRequestInfo *)0; 00417 } 00418 00419 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00420 "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock", 00421 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen); 00422 pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */ 00423 if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) { 00424 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN); 00425 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, 00426 "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retVal); 00427 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message); 00428 return (MsgRequestInfo *)0; 00429 } 00430 msgRequestInfoP->responseMutexIsLocked = true; /* Only if the client thread holds the lock */ 00431 00432 return msgRequestInfoP; 00433 } 00434 00443 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) { 00444 int retVal; 00445 SocketDataHolder *s = (SocketDataHolder *)socketDataHolder; 00446 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa; 00447 00448 if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) { 00449 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal); 00450 /* return; */ 00451 } 00452 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED"); 00453 00454 blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen); 00455 msgRequestInfoP->responseType = s->type; 00456 00457 if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) { 00458 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal); 00459 /* return; */ 00460 } 00461 00462 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, 00463 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent", 00464 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen); 00465 00466 if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) { 00467 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal); 00468 /* return; */ 00469 } 00470 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED"); 00471 } 00472 00480 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) 00481 { 00482 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa; 00483 struct timespec abstime; 00484 bool useTimeout = false; 00485 int retVal; 00486 00487 if (msgRequestInfoP->rollback) { 00488 mutexUnlock(msgRequestInfoP, exception); 00489 return (MsgRequestInfo *)0; 00490 } 00491 00492 if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) { 00493 useTimeout = true; 00494 } 00495 00496 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr); 00497 00498 if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) { 00499 xa->