1 /*----------------------------------------------------------------------------
2 Name: XmlBlasterAccessUnparsed.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Wraps raw socket connection to xmlBlaster
6 Implements sync connection and async callback
7 Needs pthread to compile (multi threading).
8 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
9 Compile:
10 LINUX: gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
11 g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
12 icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
13 WIN: cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
14 (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
15 Solaris: cc -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
16 CC -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
17
18 Linux with libxmlBlasterC.so:
19 gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT -lpthread
20 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
21 -----------------------------------------------------------------------------*/
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #if defined(WINCE)
26 # if defined(XB_USE_PTHREADS)
27 # include <pthreads/pthread.h>
28 # else
29 /*#include <pthreads/need_errno.h> */
30 static int errno=0; /* single threaded workaround*/
31 # endif
32 #else
33 # include <errno.h>
34 # include <sys/types.h>
35 #endif
36 #include <socket/xmlBlasterSocket.h>
37 #include <socket/xmlBlasterZlib.h>
38 #include <XmlBlasterAccessUnparsed.h>
39
40 /**
41 * Little helper to collect args for the new created thread
42 */
43 typedef struct Dll_Export UpdateContainer {
44 XmlBlasterAccessUnparsed *xa;
45 MsgUnitArr *msgUnitArrP;
46 void *userData;
47 XmlBlasterException exception; /* Holding a clone from the original as the callback thread may use it for another message */
48 SocketDataHolder socketDataHolder; /* Holding a clone from the original */
49 } UpdateContainer;
50
51 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception);
52 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception);
53 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
54 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
55 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
56 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
57 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
58 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
59 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
60 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
61 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
62 static bool isConnected(XmlBlasterAccessUnparsed *xa);
63 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder);
64 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
65 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
66 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
67 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder);
68 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception);
69 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes);
70 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes);
71 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
72 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
73
74 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) {
75 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed));
76 if (xa == 0) return xa;
77 xa->argc = argc;
78 xa->argv = argv;
79 xa->props = createProperties(xa->argc, xa->argv);
80 if (xa->props == 0) {
81 freeXmlBlasterAccessUnparsed(xa);
82 return (XmlBlasterAccessUnparsed *)0;
83 }
84 xa->isInitialized = false;
85 xa->isShutdown = false;
86 xa->connectionP = 0;
87 xa->callbackP = 0;
88 xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
89 xa->userFp = 0;
90 xa->connect = xmlBlasterConnect;
91 xa->initialize = initialize;
92 xa->disconnect = xmlBlasterDisconnect;
93 xa->publish = xmlBlasterPublish;
94 xa->publishArr = xmlBlasterPublishArr;
95 xa->publishOneway = xmlBlasterPublishOneway;
96 xa->subscribe = xmlBlasterSubscribe;
97 xa->unSubscribe = xmlBlasterUnSubscribe;
98 xa->erase = xmlBlasterErase;
99 xa->get = xmlBlasterGet;
100 xa->ping = xmlBlasterPing;
101 xa->isConnected = isConnected;
102 xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
103 xa->log = xmlBlasterDefaultLogging;
104 xa->logUserP = 0;
105 xa->clientsUpdateFp = 0;
106 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true);
107 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded);
108 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */
109 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */
110 /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */
111 xa->lowLevelAutoAck = false;
112
113 /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */
114 if (xa->callbackMultiThreaded == true) {
115 if (xa->logLevel>=XMLBLASTER_LOG_DUMP)
116 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true");
117 /*xa->callbackMultiThreaded = false;*/
118 }
119 /* stdint.h: # define INT32_MAX (2147483647) */
120 xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */
121 xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout);
122 /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */
123 memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
124 xa->threadCounter = 0;
125
126 if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__,
127 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld",
128 getLogLevelStr(xa->logLevel), xa->responseTimeout);
129
130 /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */
131 pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */
132 pthread_mutex_init(&xa->readnMutex, NULL);
133 return xa;
134 }
135
136 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa)
137 {
138 int rc;
139
140 if (xa == 0) {
141 char *stack = getStackTrace(10);
142 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s",
143 __FILE__, __LINE__, stack);
144 free(stack);
145 return;
146 }
147
148 if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
149 xa->isShutdown = true; /* Inhibit access to xa */
150
151 if (xa->callbackP != 0) {
152 xa->callbackP->shutdown(xa->callbackP);
153 }
154 if (xa->connectionP != 0) {
155 xa->connectionP->shutdown(xa->connectionP);
156 }
157
158 if (xa->callbackP != 0) {
159 /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */
160 const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true);
161 int retVal;
162 if (!xa->callbackP->isShutdown) {
163
164 { /* Wait for any pending update() dispatcher threads to die */
165 int i;
166 int num = 200;
167 int interval = 10;
168 for (i=0; i<num; i++) {
169 if (xa->callbackP->isShutdown)
170 break;
171 /*pthread_yield(0);*/
172 sleepMillis(interval);
173 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
174 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for callback thread to join. %d/%d", interval, i, num);
175 }
176 if (i == num) {
177 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Proper shutdown of callback thread failed, it seems to block on the socket");
178 }
179 }
180
181 if (!USE_DETACH_MODE) {
182 /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */
183 /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */
184 /*
185 retVal = pthread_cancel(xa->callbackThreadId);
186 if (retVal != 0) {
187 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal);
188 }
189 */
190 }
191 }
192
193 if (USE_DETACH_MODE) {
194 retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */
195 if (retVal != 0) {
196 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal);
197 }
198 else {
199 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
200 "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
201 }
202 }
203 else { /* JOIN mode */
204 retVal = pthread_join(xa->callbackThreadId, 0);
205 if (retVal != 0) {
206 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal);
207 }
208 else {
209 if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
210 "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
211 }
212 }
213
214 memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
215 }
216
217 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP);
218
219 { /* Wait for any pending update() dispatcher threads to die */
220 int i;
221 int num = 1000;
222 int interval = 10;
223 for (i=0; i<num; i++) {
224 if ((int)xa->threadCounter < 1)
225 break;
226 sleepMillis(interval);
227 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
228 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num);
229 }
230 if (i >= num) {
231 if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
232 "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num);
233 }
234 }
235
236 if (xa->connectionP != 0) {
237 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
238 }
239
240 if (xa->callbackP != 0) {
241 freeCallbackServerUnparsed(&xa->callbackP);
242 }
243
244 freeProperties(xa->props);
245
246 rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */
247 if (rc != 0) /* EBUSY */
248 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc);
249
250 rc = pthread_mutex_destroy(&xa->readnMutex);
251 if (rc != 0) /* EBUSY */
252 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc);
253
254 free(xa);
255 }
256
257 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception)
258 {
259 int threadRet = 0;
260 const char *compressType = 0;
261
262 if (checkArgs(xa, "initialize", false, exception) == false) return false;
263
264 if (xa->isInitialized) {
265 return true;
266 }
267
268 if (clientUpdateFp == 0) {
269 xa->clientsUpdateFp = 0;
270 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "",
271 "Your callback UpdateFp pointer is NULL, we use our default callback handler");
272 }
273 else {
274 xa->clientsUpdateFp = clientUpdateFp;
275 }
276
277 if (xa->connectionP) {
278 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
279 }
280 xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv);
281 if (xa->connectionP == 0) {
282 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
283 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
284 "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__);
285 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
286 return false;
287 }
288 xa->connectionP->log = xa->log;
289 xa->connectionP->logUserP = xa->logUserP;
290 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed");
291
292
293 /* Switch on compression? */
294 compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", "");
295 compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType);
296
297 if (!strcmp(compressType, "zlib:stream")) {
298 xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed;
299 xa->connectionP->writeToSocket.userP = xa;
300 xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed;
301 xa->connectionP->readFromSocket.userP = xa;
302 }
303 else {
304 if (strcmp(compressType, "")) {
305 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType);
306 }
307 xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain;
308 xa->connectionP->writeToSocket.userP = xa;
309 xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain;
310 xa->connectionP->readFromSocket.userP = xa;
311 }
312
313 if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */
314 return false;
315
316 /* the fourth arg 'xa' is returned as 'void *userData' in update() method */
317 if (xa->callbackP != 0) {
318 freeCallbackServerUnparsed(&xa->callbackP);
319 }
320 xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa);
321 if (xa->callbackP == 0) {
322 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
323 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
324 "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__);
325 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
326 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
327 return false;
328 }
329 xa->callbackP->log = xa->log;
330 xa->callbackP->logUserP = xa->logUserP;
331
332 if (!strcmp(compressType, "zlib:stream")) {
333 xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed;
334 xa->callbackP->writeToSocket.userP = xa;
335 xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed;
336 xa->callbackP->readFromSocket.userP = xa;
337 }
338 else {
339 xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain;
340 xa->callbackP->writeToSocket.userP = xa;
341 xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain;
342 xa->callbackP->readFromSocket.userP = xa;
343 }
344
345 xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp);
346
347 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
348 "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...",
349 (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB);
350
351 /* Register our callback funtion which is called just before sending a message */
352 xa->connectionP->preSendEvent = preSendEvent;
353 xa->connectionP->preSendEvent_userP = xa;
354
355 /* Register our callback funtion which is called just after sending a message */
356 xa->connectionP->postSendEvent = postSendEvent;
357 xa->connectionP->postSendEvent_userP = xa;
358
359 /* thread blocks on socket listener */
360 threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP);
361 if (threadRet != 0) {
362 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
363 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
364 "[%.100s:%d] Creating thread failed with error number %d",
365 __FILE__, __LINE__, threadRet);
366 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
367 freeCallbackServerUnparsed(&xa->callbackP);
368 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
369 return false;
370 }
371
372 xa->isInitialized = true;
373 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
374 "initialize() successful");
375 return xa->isInitialized;
376 }
377
378 static bool isConnected(XmlBlasterAccessUnparsed *xa)
379 {
380 if (xa == 0 || xa->isShutdown || xa->connectionP == 0) {
381 return false;
382 }
383 return xa->connectionP->isConnected(xa->connectionP);
384 }
385
386 /**
387 * Callback from #XmlBlasterConnectionUnparsed just before a message is sent,
388 * the msgRequestInfo contains the requestId used.
389 * This is the clients calling thread.
390 * @param msgRequestInfoP Contains some informations about the request, may not be NULL
391 * @param exception May not be NULL
392 * @return The same (or a manipulated/encrypted) msgRequestInfo, if NULL the exception is filled.
393 * If msgRequestInfoP->blob.data was changed and malloc()'d by you, the caller will free() it.
394 * If you return NULL you need to call removeResponseListener() to avoid a memory leak.
395 */
396 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
397 {
398 bool retVal;
399 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
400
401 /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */
402 if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName))
403 return msgRequestInfoP;
404
405 /* ======== Initialize threading ====== */
406 msgRequestInfoP->responseMutexIsLocked = false; /* Only to remember if the client thread holds the lock */
407
408 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
409 "preSendEvent(%s) occurred", msgRequestInfoP->methodName);
410 retVal = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent);
411 if (retVal == false) {
412 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
413 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
414 "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__);
415 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
416 return (MsgRequestInfo *)0;
417 }
418
419 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
420 "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock",
421 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen);
422 pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */
423 if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
424 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
425 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
426 "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retVal);
427 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
428 return (MsgRequestInfo *)0;
429 }
430 msgRequestInfoP->responseMutexIsLocked = true; /* Only if the client thread holds the lock */
431
432 return msgRequestInfoP;
433 }
434
435 /**
436 * This function is called by the callback server when a response message arrived (after we send a request).
437 * The xa->responseBlob->data is malloc()'d with the response string, you need to free it.
438 * This method is executed by the callback server thread.
439 * @param msgRequestInfoP May not be NULL
440 * @param socketDataHolder