1 /*----------------------------------------------------------------------------
2 Name: XmlBlasterAccessUnparsed.c
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Wraps raw socket connection to xmlBlaster
6 Implements sync connection and async callback
7 Needs pthread to compile (multi threading).
8 Author: "Marcel Ruff" <xmlBlaster@marcelruff.info>
9 Compile:
10 LINUX: gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
11 g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
12 icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
13 WIN: cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
14 (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
15 Solaris: cc -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
16 CC -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
17
18 Linux with libxmlBlasterC.so:
19 gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT -lpthread
20 See: http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
21 -----------------------------------------------------------------------------*/
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #if defined(WINCE)
26 # if defined(XB_USE_PTHREADS)
27 # include <pthreads/pthread.h>
28 # else
29 /*#include <pthreads/need_errno.h> */
30 static int errno=0; /* single threaded workaround*/
31 # endif
32 #else
33 # include <errno.h>
34 # include <sys/types.h>
35 #endif
36 #include <socket/xmlBlasterSocket.h>
37 #include <socket/xmlBlasterZlib.h>
38 #include <XmlBlasterAccessUnparsed.h>
39 #include <util/Timestampc.h>
40
41 /**
42 * Little helper to collect args for the new created thread
43 */
44 typedef struct Dll_Export UpdateContainer {
45 XmlBlasterAccessUnparsed *xa;
46 MsgUnitArr *msgUnitArrP;
47 void *userData;
48 XmlBlasterException exception; /* Holding a clone from the original as the callback thread may use it for another message */
49 SocketDataHolder socketDataHolder; /* Holding a clone from the original */
50 } UpdateContainer;
51
52 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception);
53 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception);
54 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
55 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
56 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
57 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
58 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
59 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
60 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
61 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
62 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
63 static bool isConnected(XmlBlasterAccessUnparsed *xa);
64 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder);
65 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
66 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
67 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
68 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder);
69 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception);
70 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes);
71 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes);
72 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
73 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
74
75 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) {
76 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed));
77 if (xa == 0) return xa;
78 xa->argc = argc;
79 xa->argv = argv;
80 xa->props = createProperties(xa->argc, xa->argv);
81 if (xa->props == 0) {
82 freeXmlBlasterAccessUnparsed(xa);
83 return (XmlBlasterAccessUnparsed *)0;
84 }
85 xa->isInitialized = false;
86 xa->isShutdown = false;
87 xa->connectionP = 0;
88 xa->callbackP = 0;
89 xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
90 xa->userFp = 0;
91 xa->connect = xmlBlasterConnect;
92 xa->initialize = initialize;
93 xa->disconnect = xmlBlasterDisconnect;
94 xa->publish = xmlBlasterPublish;
95 xa->publishArr = xmlBlasterPublishArr;
96 xa->publishOneway = xmlBlasterPublishOneway;
97 xa->subscribe = xmlBlasterSubscribe;
98 xa->unSubscribe = xmlBlasterUnSubscribe;
99 xa->erase = xmlBlasterErase;
100 xa->get = xmlBlasterGet;
101 xa->ping = xmlBlasterPing;
102 xa->isConnected = isConnected;
103 xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
104 xa->log = xmlBlasterDefaultLogging;
105 xa->logUserP = 0;
106 xa->clientsUpdateFp = 0;
107 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true);
108 xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded);
109 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */
110 /* xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */
111 /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */
112 xa->lowLevelAutoAck = false;
113
114 /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */
115 if (xa->callbackMultiThreaded == true) {
116 if (xa->logLevel>=XMLBLASTER_LOG_DUMP)
117 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true");
118 /*xa->callbackMultiThreaded = false;*/
119 }
120 /* stdint.h: # define INT32_MAX (2147483647) */
121 xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */
122 xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout);
123 /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */
124 memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
125 xa->threadCounter = 0;
126
127 if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__,
128 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld",
129 getLogLevelStr(xa->logLevel), xa->responseTimeout);
130
131 /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */
132 pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */
133 pthread_mutex_init(&xa->readnMutex, NULL);
134 return xa;
135 }
136
137 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa)
138 {
139 int rc;
140
141 if (xa == 0) {
142 char *stack = getStackTrace(10);
143 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s",
144 __FILE__, __LINE__, stack);
145 free(stack);
146 return;
147 }
148
149 if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
150 xa->isShutdown = true; /* Inhibit access to xa */
151
152 if (xa->callbackP != 0) {
153 xa->callbackP->shutdown(xa->callbackP);
154 }
155 if (xa->connectionP != 0) {
156 xa->connectionP->shutdown(xa->connectionP);
157 }
158
159 if (xa->callbackP != 0) {
160 /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */
161 const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true);
162 int retVal;
163 if (xa->callbackP->threadIsAlive && !USE_DETACH_MODE) {
164 /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */
165 /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */
166 /*
167 retVal = pthread_cancel(xa->callbackThreadId);
168 if (retVal != 0) {
169 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal);
170 }
171 */
172 }
173
174 if (USE_DETACH_MODE) {
175 /* Check if above xa->callbackP->shutdown(xa->callbackP) thread has finished: */
176 /*bool hasTerminated = */xa->callbackP->waitOnCallbackThreadTermination(xa->callbackP, 2000);
177
178 retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */
179 if (retVal != 0) {
180 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal);
181 }
182 else {
183 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
184 "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
185 }
186 }
187 else { /* JOIN mode */
188 retVal = pthread_join(xa->callbackThreadId, 0);
189 if (retVal != 0) {
190 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal);
191 }
192 else {
193 if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
194 "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
195 }
196 }
197
198 memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
199 }
200
201 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP);
202
203 { /* Wait for any pending update() dispatcher threads to die */
204 int i;
205 int num = 1000;
206 int interval = 10;
207 for (i=0; i<num; i++) {
208 if ((int)xa->threadCounter < 1)
209 break;
210 sleepMillis(interval);
211 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
212 "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num);
213 }
214 if (i >= num) {
215 if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
216 "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num);
217 }
218 }
219
220 if (xa->connectionP != 0) {
221 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
222 }
223
224 if (xa->callbackP != 0) {
225 freeCallbackServerUnparsed(&xa->callbackP);
226 }
227
228 freeProperties(xa->props);
229
230 rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */
231 if (rc != 0) /* EBUSY=16 "Device or resource busy": char *strerror_r(int errnum, char *buf, size_t buflen); */
232 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc);
233
234 rc = pthread_mutex_destroy(&xa->readnMutex);
235 if (rc != 0) /* EBUSY */
236 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc);
237
238 free(xa);
239 }
240
241 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception)
242 {
243 int threadRet = 0;
244 const char *compressType = 0;
245
246 if (checkArgs(xa, "initialize", false, exception) == false) return false;
247
248 if (xa->isInitialized) {
249 return true;
250 }
251
252 if (clientUpdateFp == 0) {
253 xa->clientsUpdateFp = 0;
254 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "",
255 "Your callback UpdateFp pointer is NULL, we use our default callback handler");
256 }
257 else {
258 xa->clientsUpdateFp = clientUpdateFp;
259 }
260
261 if (xa->connectionP) {
262 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
263 }
264 xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv);
265 if (xa->connectionP == 0) {
266 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
267 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
268 "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__);
269 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
270 return false;
271 }
272 xa->connectionP->log = xa->log;
273 xa->connectionP->logUserP = xa->logUserP;
274 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed");
275
276
277 /* Switch on compression? */
278 compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", "");
279 compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType);
280
281 if (!strcmp(compressType, "zlib:stream")) {
282 xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed;
283 xa->connectionP->writeToSocket.userP = xa;
284 xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed;
285 xa->connectionP->readFromSocket.userP = xa;
286 }
287 else {
288 if (strcmp(compressType, "")) {
289 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType);
290 }
291 xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain;
292 xa->connectionP->writeToSocket.userP = xa;
293 xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain;
294 xa->connectionP->readFromSocket.userP = xa;
295 }
296
297 if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */
298 return false;
299
300 /* the fourth arg 'xa' is returned as 'void *userData' in update() method */
301 if (xa->callbackP != 0) {
302 freeCallbackServerUnparsed(&xa->callbackP);
303 }
304 xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa);
305 if (xa->callbackP == 0) {
306 strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
307 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
308 "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__);
309 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
310 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
311 return false;
312 }
313 xa->callbackP->log = xa->log;
314 xa->callbackP->logUserP = xa->logUserP;
315
316 if (!strcmp(compressType, "zlib:stream")) {
317 xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed;
318 xa->callbackP->writeToSocket.userP = xa;
319 xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed;
320 xa->callbackP->readFromSocket.userP = xa;
321 }
322 else {
323 xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain;
324 xa->callbackP->writeToSocket.userP = xa;
325 xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain;
326 xa->callbackP->readFromSocket.userP = xa;
327 }
328
329 xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp);
330
331 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
332 "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...",
333 (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB);
334
335 /* Register our callback funtion which is called just before sending a message */
336 xa->connectionP->preSendEvent = preSendEvent;
337 xa->connectionP->preSendEvent_userP = xa;
338
339 /* Register our callback funtion which is called just after sending a message */
340 xa->connectionP->postSendEvent = postSendEvent;
341 xa->connectionP->postSendEvent_userP = xa;
342
343 /* thread blocks on socket listener or on socket read (if useThisSocket) */
344 threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP);
345 if (threadRet != 0) {
346 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
347 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
348 "[%.100s:%d] Creating thread failed with error number %d",
349 __FILE__, __LINE__, threadRet);
350 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
351 freeCallbackServerUnparsed(&xa->callbackP);
352 freeXmlBlasterConnectionUnparsed(&xa->connectionP);
353 return false;
354 }
355 /* bool hasStarted = */xa->callbackP->waitOnCallbackThreadAlive(xa->callbackP, 5000);
356
357 xa->isInitialized = true;
358 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
359 "initialize() successful");
360 return xa->isInitialized;
361 }
362
363 static bool isConnected(XmlBlasterAccessUnparsed *xa)
364 {
365 if (xa == 0 || xa->isShutdown || xa->connectionP == 0) {
366 return false;
367 }
368 return xa->connectionP->isConnected(xa->connectionP);
369 }
370
371 /**
372 * Callback from #XmlBlasterConnectionUnparsed just before a message is sent,
373 * the msgRequestInfo contains the requestId used.
374 * This is the clients calling thread.
375 * @param msgRequestInfoP Contains some informations about the request, may not be NULL
376 * @param exception May not be NULL
377 * @return The same (or a manipulated/encrypted) msgRequestInfo, if NULL the exception is filled.
378 * If msgRequestInfoP->blob.data was changed and malloc()'d by you, the caller will free() it.
379 * If you return NULL you need to call removeResponseListener() to avoid a memory leak.
380 */
381 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
382 {
383 bool retBool;
384 int retInt;
385 XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
386
387 /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */
388 if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName))
389 return msgRequestInfoP;
390
391 /* ======== Initialize threading ====== */
392 msgRequestInfoP->responseMutexIsValid = false; /* Only to remember if the client thread holds the lock */
393
394 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
395 "preSendEvent(%s) occurred", msgRequestInfoP->methodName);
396 retBool = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent);
397 if (retBool == false) {
398 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
399 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
400 "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__);
401 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
402 return (MsgRequestInfo *)0;
403 }
404
405 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
406 "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock",
407 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen);
408 pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */
409 if ((retInt = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
410 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
411 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
412 "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retInt);
413 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
414 return (MsgRequestInfo *)0;
415 }
416 msgRequestInfoP->responseMutexIsValid = true; /* Only if the client thread holds the lock */
417
418 return msgRequestInfoP;
419 }
420
421 /**
422 * This function is called by the callback server when a response message arrived (after we send a request).
423 * The xa->responseBlob->data is malloc()'d with the response string, you need to free it.
424 * This method is executed by the callback server thread.
425 * @param msgRequestInfoP May not be NULL
426 * @param socketDataHolder is on the stack and does not need to be freed, the 'data' member is
427 * malloc()'d and must be freed by the caller.
428 */
429 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) {
430 int retVal;
431 SocketDataHolder *s = (SocketDataHolder *)socketDataHolder;
432 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
433
434 if (msgRequestInfoP == 0)
435 return;
436
437 if (msgRequestInfoP->responseMutexIsValid == false)
438 return;
439
440 if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
441 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal);
442 if (msgRequestInfoP->responseMutexIsValid == false)
443 return;
444 }
445 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED");
446
447 blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen);
448 msgRequestInfoP->responseType = s->type;
449
450 if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) {
451 if (retVal == EINVAL)
452 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EINVAL: responseCond is not valid", retVal);
453 else if (retVal == EFAULT)
454 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EFAULT: responseCond points to illegal address", retVal);
455 else
456 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal);
457 /*return; we need to unlock the mutex */
458 }
459
460 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
461 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent",
462 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen);
463
464 if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
465 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal);
466 /* return; */
467 }
468 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED");
469 }
470
471 /**
472 * Callback function (wait for response) called directly after a message is sent.
473 * @param msgRequestInfoP Contains some informations about the request, may not be NULL
474 * @param exception May not be NULL
475 * @return The returned string from a request is written into msgRequestInfoP->data,
476 * the caller needs to free() it.
477 */
478 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
479 {
480 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
481 struct timespec abstime;
482 bool useTimeout = false;
483 int retVal, i;
484
485 if (msgRequestInfoP->rollback) {
486 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
487 /* cb->shutdown(), cb->waitOnCallbackThreadTermination() */
488 mutexUnlock(msgRequestInfoP, exception);
489 return (MsgRequestInfo *)0;
490 }
491
492 if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) {
493 useTimeout = true;
494 }
495
496 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr);
497
498 if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) {
499 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
500 strncpy0(exception->errorCode, "resource.exhaust", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
501 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] pthread_cond_init() for '%s()' with requestId=%s returned %d.",
502 __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
503 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
504 return (MsgRequestInfo *)0;
505 }
506
507 /* Wait for response, the callback server delivers it */
508 while (msgRequestInfoP->responseType == 0) { /* Protect for spurious wake ups (e.g. by SIGUSR1) */
509 if (useTimeout == true) {
510 int error = pthread_cond_timedwait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex, &abstime);
511 if (error == ETIMEDOUT) {
512 /*
513 * TODO: msgRequestInfoP is on the stack and if we now return
514 * it will be invalid:
515 * removeResponseListener() removes it from the callback thread
516 * but what if the callback thread currently uses it?
517 */
518 xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
519 strncpy0(exception->errorCode, "communication.responseTimeout", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
520 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Waiting on response for '%s()' with requestId=%s timed out after blocking %ld millis",
521 __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, xa->responseTimeout);
522 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
523 return (MsgRequestInfo *)0;
524 }
525 }
526 else {
527 pthread_cond_wait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex); /* Wakes up from responseEvent() */
528 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
529 "Wake up tread, response of length %d arrived", msgRequestInfoP->responseBlob.dataLen);
530 }
531 }
532
533 for (i=0; i<10; i++) { /* Error recovery loop */
534 if ((retVal = pthread_cond_destroy(&msgRequestInfoP->responseCond)) != 0) {
535 if (retVal == EBUSY) { /* Is in use by another thread */
536 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned EBUSY=%d, we try again #%d/10",
537 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal, i);
538 sleepMillis(10);
539 continue;
540 }
541 else {
542 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
543 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
544 }
545 }
546 break;
547 }
548
549 msgRequestInfoP->blob.dataLen = msgRequestInfoP->responseBlob.dataLen;
550 msgRequestInfoP->blob.data = msgRequestInfoP->responseBlob.data;
551 msgRequestInfoP->responseBlob.dataLen = 0;
552 msgRequestInfoP->responseBlob.data = 0; /* msgRequestInfoP->blob.data is now responsible to free() the data */
553
554 if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
555 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
556 "Thread #%ld woke up in postSendEvent() for msgType=%c and dataLen=%d",
557 msgRequestInfoP->requestIdStr, msgRequestInfoP->responseType, msgRequestInfoP->blob.dataLen);
558
559
560 if (msgRequestInfoP->responseType == (char)MSG_TYPE_EXCEPTION) {
561 convertToXmlBlasterException(&msgRequestInfoP->blob, exception, false);
562 freeBlobHolderContent(&msgRequestInfoP->blob);
563 msgRequestInfoP->responseType = 0;
564 return (MsgRequestInfo *)0;
565 }
566
567 msgRequestInfoP->responseType = 0;
568
569 /* if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) i woke up, entering unlock ...", msgRequestInfoP->requestIdStr); */
570 if (mutexUnlock(msgRequestInfoP, exception) == false)
571 return (MsgRequestInfo *)0;
572
573 return msgRequestInfoP;
574 }
575
576 /**
577 * Free lock.
578 * @param msgRequestInfoP Transporting data
579 * @param exception The exception struct, can be null
580 * @return false on error, the exception struct is filled in this case and the lock is not released
581 */
582 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) {
583 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
584 int retVal;
585 if (msgRequestInfoP->responseMutexIsValid == false)
586 return true;
587 msgRequestInfoP->responseMutexIsValid = false;
588 if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
589 char embeddedText[XMLBLASTEREXCEPTION_MESSAGE_LEN];
590 if (exception == 0) {
591 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
592 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
593 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
594 }
595 return false;
596 }
597 if (*exception->errorCode != 0) {
598 SNPRINTF(embeddedText, XMLBLASTEREXCEPTION_MESSAGE_LEN, "{%s:%s}", exception->errorCode, exception->message);
599 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Ignoring embedded exception %s: %s", exception->errorCode, exception->message);
600 }
601 else
602 *embeddedText = 0;
603 strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
604 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] ERROR trying to unlock responseMutex, return=%d. Embedded %s", __FILE__, __LINE__, retVal, embeddedText);
605 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
606
607 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
608 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
609 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
610 }
611 return false;
612 }
613 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent() responseMutex is UNLOCKED");
614
615 if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
616 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
617 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
618 }
619 return true;
620 }
621
622 Dll_Export const char *xmlBlasterAccessUnparsedUsage(char *usage)
623 {
624 /* take care not to exceed XMLBLASTER_MAX_USAGE_LEN */
625 SNPRINTF(usage, XMLBLASTER_MAX_USAGE_LEN, "%.800s%.800s%.400s", xmlBlasterConnectionUnparsedUsage(), callbackServerRawUsage(),
626 "\n -plugin/socket/multiThreaded [true]"
627 "\n If true the update() call to your client code is a separate thread."
628 "\n -plugin/socket/responseTimeout [60000 (one minute)]"
629 "\n The time in millis to wait on a response, 0 is forever."
630 "\n -logLevel ERROR | WARN | INFO | TRACE | DUMP [WARN]"
631 );
632
633 return usage;
634 }
635
636 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos,
637 UpdateFp clientUpdateFp, XmlBlasterException *exception)
638 {
639 char *response = 0;
640 char *qos_;
641
642 if (checkArgs(xa, "connect", false, exception) == false) return 0;
643
644 /* Is allowed, we use our default handler in this case
645 if (clientUpdateFp == 0) {
646 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
647 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'updateFp' to connect()", __FILE__, __LINE__);
648 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
649 return false;
650 }
651 */
652
653 if (qos == 0) {
654 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
655 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'qos' to connect()", __FILE__, __LINE__);
656 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
657 return false;
658 }
659
660 if (initialize(xa, clientUpdateFp, exception) == false) {
661 return false;
662 }
663
664 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Invoking connect()");
665
666 if (strstr(qos, "<callback") != 0) {
667 /* User has given us a callback address */
668 qos_ = strcpyAlloc(qos);
669 }
670 else {
671 /* We add the callback sequence with our tunnel callback host and port
672 HACK: This is error prone depending on the given qos */
673 const char *pos;
674 enum { SIZE=1024 };
675 char callbackQos[SIZE];
676 snprintf0(callbackQos, SIZE,
677 "<queue relating='callback'>" /* maxEntries='100' maxEntriesCache='100'>" */
678 " <callback type='SOCKET' sessionId='%s'>"
679 " socket://%.120s:%d"
680 " </callback>"
681 "</queue>",
682 "NoCallbackSessionId", xa->callbackP->hostCB, xa->callbackP->portCB);
683 qos_ = (char *)calloc(strlen(qos) + SIZE, sizeof(char *));
684 pos = strstr(qos, "</qos>");
685 if (pos == 0) {
686 strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
687 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid 'qos' markup to connect()", __FILE__, __LINE__);
688 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
689 return false;
690 }
691 strncpy0(qos_, qos, pos-qos+1);
692 strncat0(qos_, callbackQos, SIZE-strlen(qos_));
693 strncat0(qos_, "</qos>", 8);
694 }
695 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connecting with qos=%s", qos_);
696
697 /* Register our function responseEvent() to be notified when the response arrives,
698 this is done by preSendEvent() callback called during connect() */
699
700 response = xa->connectionP->connect(xa->connectionP, qos_, exception);
701
702 free(qos_);
703 /* freeBlobHolderContent(&xa->responseBlob); */
704
705 /* The response was handled by a callback to postSendEvent */
706
707 if (response == 0) return response;
708
709 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
710 "Got response for connect(secretSessionId=%s)", xa->connectionP->secretSessionId);
711 return response;
712 }
713
714 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
715 {
716 bool p;
717 if (checkArgs(xa, "disconnect", true, exception) == false ) return 0;
718 p = xa->connectionP->disconnect(xa->connectionP, qos, exception);
719 return p;
720 }
721
722 /**
723 * Publish a message to the server.
724 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
725 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
726 * @see XmlBlasterConnectionUnparsed#publish() for a function documentation
727 */
728 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception)
729 {
730 char *p;
731 if (checkArgs(xa, "publish", true, exception) == false ) return 0;
732 p = xa->connectionP->publish(xa->connectionP, msgUnit, exception);
733 return p;
734 }
735
736 /**
737 * Publish a message array in a bulk to the server.
738 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
739 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
740 * @see XmlBlasterConnectionUnparsed#publishArr() for a function documentation
741 */
742 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
743 {
744 QosArr *p;
745 if (checkArgs(xa, "publishArr", true, exception) == false ) return 0;
746 p = xa->connectionP->publishArr(xa->connectionP, msgUnitArr, exception);
747 return p;
748 }
749
750 /**
751 * Publish a message array in a bulk to the server, we don't receive an ACK.
752 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
753 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
754 * @see XmlBlasterConnectionUnparsed#publishOneway() for a function documentation
755 */
756 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
757 {
758 if (checkArgs(xa, "publishOneway", true, exception) == false ) return;
759 xa->connectionP->publishOneway(xa->connectionP, msgUnitArr, exception);
760 }
761
762 /**
763 * Subscribe a message.
764 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html
765 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
766 */
767 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
768 {
769 char *p;
770 if (checkArgs(xa, "subscribe", true, exception) == false ) return 0;
771 p = xa->connectionP->subscribe(xa->connectionP, key, qos, exception);
772 return p;
773 }
774
775 /**
776 * UnSubscribe a message from the server.
777 * @return The raw QoS XML strings returned from xmlBlaster, only NULL if an exception is thrown
778 * You need to free it with freeQosArr() after usage
779 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.unSubscribe.html
780 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
781 */
782 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
783 {
784 QosArr *p;
785 if (checkArgs(xa, "unSubscribe", true, exception) == false ) return 0;
786 p = xa->connectionP->unSubscribe(xa->connectionP, key, qos, exception);
787 return p;
788 }
789
790 /**
791 * Erase a message from the server.
792 * @return A struct holding the raw QoS XML strings returned from xmlBlaster,
793 * only NULL if an exception is thrown.
794 * You need to freeQosArr() it
795 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.erase.html
796 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
797 */
798 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
799 {
800 QosArr *p;
801 if (checkArgs(xa, "erase", true, exception) == false ) return 0;
802 p = xa->connectionP->erase(xa->connectionP, key, qos, exception);
803 return p;
804 }
805
806 /**
807 * Ping the server.
808 * @param xa The 'this' pointer
809 * @param qos The QoS or 0
810 * @param exception *errorCode!=0 on failure
811 * @return The ping return QoS raw xml string, you need to free() it
812 * or 0 on failure (in which case *exception.errorCode!=0)
813 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
814 */
815 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
816 {
817 char *p;
818 if (checkArgs(xa, "ping", true, exception) == false ) return 0;
819 p = xa->connectionP->ping(xa->connectionP, qos, exception);
820 return p;
821 }
822
823 /**
824 * Get a message.
825 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.get.html
826 * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
827 * @return NULL on error, please check exception in such a case
828 */
829 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
830 {
831 MsgUnitArr *msgUnitArr;
832 if (checkArgs(xa, "get", true, exception) == false ) return 0;
833 msgUnitArr = xa->connectionP->get(xa->connectionP, key, qos, exception);
834 return msgUnitArr;
835 }
836
837 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName,
838 bool checkIsConnected, XmlBlasterException *exception)
839 {
840 if (xa == 0) {
841 char *stack = getStackTrace(10);
842 if (exception == 0) {
843 printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
844 __FILE__, __LINE__, methodName, stack);
845 }
846 else {
847 strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
848 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
849 "[%.100s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %.16s() %s",
850 __FILE__, __LINE__, methodName, stack);
851 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
852 }
853 free(stack);
854 return false;
855 }
856
857 if (exception == 0) {
858 char *stack = getStackTrace(10);
859 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
860 __FILE__, __LINE__, methodName, stack);
861 free(stack);
862 return false;
863 }
864
865 if (xa->isShutdown || (checkIsConnected && !xa->isConnected(xa))) {
866 char *stack = getStackTrace(10);
867 strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
868 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
869 "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
870 __FILE__, __LINE__, methodName, stack);
871 free(stack);
872 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
873 return false;
874 }
875
876 initializeXmlBlasterException(exception);
877
878 return true;
879 }
880
881 /**
882 * Run by the new created thread, calls the clients update method.
883 * Leaving this pthread start routine does an implicit pthread_exit().
884 * @param container Holding all necessary informations, we free it when we are done
885 * @return 0 on success, 1 on error. The return value is the exit value returned by pthread_join()
886 */
887 static int runUpdate(UpdateContainer *container)
888 {
889 XmlBlasterAccessUnparsed *xa = container->xa;
890 MsgUnitArr *msgUnitArrP = container->msgUnitArrP;
891 void *userData = container->userData;
892 CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
893 XmlBlasterException *exception = &container->exception;
894 SocketDataHolder *socketDataHolder = &container->socketDataHolder;
895 XMLBLASTER_C_bool retVal;
896
897 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Entering runUpdate()");
898
899 retVal = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
900
901 if (xa->lowLevelAutoAck) { /* returned already */
902 }
903 else {
904 cb->sendResponseOrException(retVal, cb, socketDataHolder, msgUnitArrP, exception);
905 }
906
907 free(container);
908
909 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
910 "runUpdate: Update thread 0x%x is exiting", get_pthread_id(pthread_self()));
911 xa->threadCounter--;
912 return (retVal==true) ? 0 : 1;
913 }
914
915 /**
916 * Here we receive the callback messages from xmlBlaster, create a thread and dispatch
917 * it to the clients update.
918 * @see UpdateFp in CallbackServerUnparsed.h
919 */
920 static void interceptUpdate(MsgUnitArr *msgUnitArrP, void *userData,
921 XmlBlasterException *exception, void /*SocketDataHolder*/ *socketDataHolder)
922 {
923 CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
924 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)cb->updateCbUserData;
925
926 if (xa->clientsUpdateFp == 0) { /* Client has not registered an update() */
927 size_t i;
928 bool testException = false;
929 bool success = true;
930
931 for (i=0; i<msgUnitArrP->len; i++) {
932 const char *key = msgUnitArrP->msgUnitArr[i].key;
933 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
934 "CALLBACK update() default handler: Asynchronous message update arrived:%s id=%s, we ignore it in this default handler\n",
935 key, ((SocketDataHolder*)socketDataHolder)->requestId);
936 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
937 /* Return QoS: Everything is OK */
938 }
939 if (testException) {
940 strncpy0(exception->errorCode, "user.clientCode",
941 XMLBLASTEREXCEPTION_ERRORCODE_LEN);
942 strncpy0(exception->message, "I don't want these messages",
943 XMLBLASTEREXCEPTION_MESSAGE_LEN);
944 success = false;
945 }
946 cb->sendResponseOrException(success, cb, socketDataHolder, msgUnitArrP, exception);
947 return;
948 }
949
950 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "interceptUpdate(): Received message");
951
952 if (xa->callbackMultiThreaded == false) {
953 XMLBLASTER_C_bool ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
954 cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
955 return;
956 }
957
958 {
959 pthread_t tid;
960 int threadRet = 0;
961 UpdateContainer *container = (UpdateContainer*)malloc(sizeof(UpdateContainer));
962 pthread_attr_t attr;
963
964 pthread_attr_init(&attr);
965 /* Cleanup all resources after ending the thread, instead of calling pthread_join() */
966 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
967
968 container->xa = xa;
969 container->msgUnitArrP = msgUnitArrP;
970 container->userData = userData;
971 memcpy(&container->exception, exception, sizeof(XmlBlasterException));
972 memcpy(&container->socketDataHolder, socketDataHolder, sizeof(SocketDataHolder)); /* The blob pointer is freed already by CallbackServerUnparsed */
973
974 if (xa->lowLevelAutoAck) {
975 size_t i;
976 for (i=0; i<msgUnitArrP->len; i++) {
977 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
978 }
979 }
980
981 /*
982 Guaranteed sequence:
983 The server uses max one thread to deliver update() for each client
984 If the update contains an array of messages those are handled as a
985 complete bulk in the correct sequence here.
986 */
987
988 /* this thread will deliver the update message to the client code,
989 Note: we need a thread pool cache for better performance */
990 xa->threadCounter++;
991 threadRet = pthread_create(&tid, &attr,
992 (void * (*)(void *))runUpdate, (void *)container);
993 if (threadRet != 0) {
994 XMLBLASTER_C_bool ret = false;
995 free(container);
996 strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
997 SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
998 "[%.100s:%d] Creating thread failed with error number %d, we deliver the message in the same thread",
999 __FILE__, __LINE__, threadRet);
1000 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
1001 ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
1002 cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
1003 xa->threadCounter--;
1004 pthread_attr_destroy(&attr);
1005 return;
1006 }
1007
1008 /* Is done already with above PTHREAD_CREATE_DETACHED
1009 threadRet = pthread_detach(tid);
1010 if (threadRet != 0) {
1011 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching thread failed with error number %d", __LINE__, threadRet);
1012 }
1013 */
1014
1015 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1016 "interceptUpdate: Received message and delegated it to a separate thread 0x%x to deliver", get_pthread_id(tid));
1017
1018 pthread_attr_destroy(&attr);
1019 }
1020
1021 if (xa->lowLevelAutoAck) {
1022 *exception->errorCode = 0;
1023 cb->sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, exception);
1024 }
1025 }
1026
1027 /**
1028 * Write uncompressed to socket (thread safe)
1029 */
1030 static ssize_t writenPlain(void * userP, const int fd, const char *ptr, const size_t nbytes) {
1031 int rc;
1032 ssize_t ret;
1033
1034 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1035
1036 /* Start mutex */
1037 rc = pthread_mutex_lock(&xa->writenMutex);
1038 if (rc != 0) /* EINVAL */
1039 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1040
1041 /* Send data */
1042 ret = writen(fd, ptr, nbytes);
1043
1044 /* End mutex */
1045 rc = pthread_mutex_unlock(&xa->writenMutex);
1046 if (rc != 0) /* EPERM */
1047 xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1048
1049 return ret;
1050
1051 }
1052
1053 /**
1054 * Compress data and send to socket.
1055 */
1056 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1057 int rc;
1058 ssize_t ret;
1059
1060 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1061 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes);
1062
1063 /* Start mutex */
1064 rc = pthread_mutex_lock(&xa->writenMutex);
1065 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1066
1067 /* Send data */
1068 ret = xmlBlaster_writenCompressed(xa->connectionP->zlibWriteBuf, fd, ptr, nbytes);
1069
1070 /* End mutex */
1071 rc = pthread_mutex_unlock(&xa->writenMutex);
1072 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1073
1074 return ret;
1075 }
1076
1077 /**
1078 * Read uncompressed to socket (thread safe)
1079 */
1080 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1081 int rc;
1082 ssize_t ret;
1083
1084 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1085
1086 rc = pthread_mutex_lock(&xa->readnMutex);
1087 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1088
1089 ret = readn(fd, ptr, nbytes, fpNumRead, userP2);
1090
1091 rc = pthread_mutex_unlock(&xa->readnMutex);
1092 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1093
1094 return ret;
1095 }
1096
1097 /**
1098 * Read data from socket, uncompress it if necessary.
1099 */
1100 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1101 int rc;
1102 ssize_t ret;
1103
1104 XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1105 if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes);
1106
1107 rc = pthread_mutex_lock(&xa->readnMutex);
1108 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1109
1110 ret = xmlBlaster_readnCompressed(xa->connectionP->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
1111
1112 rc = pthread_mutex_unlock(&xa->readnMutex);
1113 if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1114
1115 return ret;
1116 }
1117
1118 #ifdef XmlBlasterAccessUnparsedMain /* compile a standalone test program */
1119
1120 /**
1121 * Here we receive the callback messages from xmlBlaster
1122 * FOR TESTING ONLY
1123 * @see UpdateFp in CallbackServerUnparsed.h
1124 */
1125 static bool myUpdate(MsgUnitArr *msgUnitArrP, void *userData, XmlBlasterException *xmlBlasterException)
1126 {
1127 size_t i;
1128 bool testException = false;
1129 if (userData != 0) ; /* to avoid compiler warning (we don't need it here) */
1130 for (i=0; i<msgUnitArrP->len; i++) {
1131 char *xml = messageUnitToXml(&msgUnitArrP->msgUnitArr[i]);
1132 printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n", xml);
1133 free(xml);
1134 msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
1135 /* Return QoS: Everything is OK */
1136 }
1137 if (testException) {
1138 strncpy0(xmlBlasterException->errorCode, "user.clientCode",
1139 XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1140 strncpy0(xmlBlasterException->message, "I don't want these messages",
1141 XMLBLASTEREXCEPTION_MESSAGE_LEN);
1142 return false;
1143 }
1144 return true;
1145 }
1146
1147 /**
1148 * Invoke: XmlBlasterAccessUnparsedMain -logLevel TRACE -numTests 10
1149 */
1150 int main(int argc, char** argv)
1151 {
1152 int ii;
1153 int numTests = 1;
1154 bool testCallInitialize = false;
1155
1156 for (ii=0; ii < argc-1; ii++)
1157 if (strcmp(argv[ii], "-numTests") == 0) {
1158 if (strToInt(&numTests, argv[++ii]) == false)
1159 printf("[XmlBlasterAccessUnparsed] WARN '-numTests %s' is invalid\n", argv[ii]);
1160 }
1161
1162 for (ii=0; ii<numTests; ii++) {
1163 int iarg;
1164 char *response = (char *)0;
1165 /*
1166 * callbackSessionId:
1167 * Is created by the client and used to validate callback messages in update.
1168 * This is sent on connect in ConnectQos.
1169 * (Is different from the xmlBlaster secret session ID)
1170 */
1171 const char *callbackSessionId = "topSecret";
1172 XmlBlasterException xmlBlasterException;
1173 XmlBlasterAccessUnparsed *xa = 0;
1174
1175 /*
1176 const char *tmp = getStackTrace(20);
1177 printf("[client] stackTrace=%s\n", tmp);
1178 free(tmp);
1179 */
1180
1181 # ifdef PTHREAD_THREADS_MAX
1182 printf("[client] Try option '-help' if you need usage informations, max %d"
1183 " threads per process are supported on this OS\n", PTHREAD_THREADS_MAX);
1184 # else
1185 printf("[client] Try option '-help' if you need usage informations\n");
1186 # endif
1187
1188 for (iarg=0; iarg < argc; iarg++) {
1189 if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
1190 char usage[XMLBLASTER_MAX_USAGE_LEN];
1191 const char *pp =
1192 "\n -logLevel ERROR | WARN | INFO | TRACE | DUMP [WARN]"
1193 "\n -numTests How often to run the same tests [1]"
1194 "\n\nExample:"
1195 "\n XmlBlasterAccessUnparsedMain -logLevel TRACE"
1196 " -dispatch/connection/plugin/socket/hostname server.mars.universe";
1197 printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
1198 getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
1199 exit(1);
1200 }
1201 }
1202
1203 xa = getXmlBlasterAccessUnparsed(argc, argv);
1204
1205 if (testCallInitialize) {
1206 if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
1207 printf("[client] Connection to xmlBlaster failed,"
1208 " please start the server or check your configuration\n");
1209 freeXmlBlasterAccessUnparsed(xa);
1210 exit(1);
1211 }
1212 }
1213
1214 { /* connect */
1215 char connectQos[2048];
1216 char callbackQos[1024];
1217
1218 if (testCallInitialize) {
1219 SNPRINTF(callbackQos, 1024,
1220 "<queue relating='callback' maxEntries='100' maxEntriesCache='100'>"
1221 " <callback type='SOCKET' sessionId='%s'>"
1222 " socket://%.120s:%d"
1223 " </callback>"
1224 "</queue>",
1225 callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
1226 }
1227 else
1228 *callbackQos = '\0';
1229
1230 SNPRINTF(connectQos, 2048,
1231 "<qos>"
1232 " <securityService type='htpasswd' version='1.0'>"
1233 " <![CDATA["
1234 " <user>fritz</user>"
1235 " <passwd>secret</passwd>"
1236 " ]]>"
1237 " </securityService>"
1238 "%.1024s"
1239 "</qos>", callbackQos);
1240
1241 response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
1242 if (*xmlBlasterException.errorCode != 0) {
1243 printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
1244 xmlBlasterException.errorCode, xmlBlasterException.message);
1245 freeXmlBlasterAccessUnparsed(xa);
1246 exit(1);
1247 }
1248 free(response);
1249 printf("[client] Connected to xmlBlaster, do some tests ...\n");
1250 }
1251
1252 response = xa->ping(xa, 0, &xmlBlasterException);
1253 if (response == (char *)0) {
1254 printf("[client] ERROR: Pinging a connected server failed: errorCode=%s, message=%s\n",
1255 xmlBlasterException.errorCode, xmlBlasterException.message);
1256 }
1257 else {
1258 printf("[client] Pinging a connected server, response=%s\n", response);
1259 free(response);
1260 }
1261
1262 { /* subscribe ... */
1263 const char *key = "<key oid='HelloWorld'/>";
1264 const char *qos = "<qos/>";
1265 printf("[client] Subscribe message 'HelloWorld' ...\n");
1266 response = xa->subscribe(xa, key, qos, &xmlBlasterException);
1267 if (*xmlBlasterException.errorCode != 0) {
1268 printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
1269 xmlBlasterException.errorCode, xmlBlasterException.message);
1270 xa->disconnect(xa, 0, &xmlBlasterException);
1271 freeXmlBlasterAccessUnparsed(xa);
1272 exit(1);
1273 }
1274 printf("[client] Subscribe success, returned status is '%s'\n", response);
1275 free(response);
1276 }
1277
1278 { /* publish ... */
1279 MsgUnit msgUnit;
1280 printf("[client] Publishing message 'HelloWorld' ...\n");
1281 msgUnit.key = strcpyAlloc("<key oid='HelloWorld'/>");
1282 msgUnit.content = strcpyAlloc("Some message payload");
1283 msgUnit.contentLen = strlen(msgUnit.content);
1284 msgUnit.qos =strcpyAlloc("<qos><persistent/></qos>");
1285 response = xa->publish(xa, &msgUnit, &xmlBlasterException);
1286 freeMsgUnitData(&msgUnit);
1287 if (*xmlBlasterException.errorCode != 0) {
1288 printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
1289 xmlBlasterException.errorCode, xmlBlasterException.message);
1290 xa->disconnect(xa, 0, &xmlBlasterException);
1291 freeXmlBlasterAccessUnparsed(xa);
1292 exit(1);
1293 }
1294 printf("[client] Publish success, returned status is '%s'\n", response);
1295 free(response);
1296 }
1297
1298 { /* unSubscribe ... */
1299 const char *key = "<key oid='HelloWorld'/>";
1300 const char *qos = "<qos/>";
1301 printf("[client] UnSubscribe message 'HelloWorld' ...\n");
1302 response = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
1303 if (response) {
1304 printf("[client] Unsubscribe success, returned status is '%s'\n", response);
1305 free(response);
1306 }
1307 else {
1308 printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
1309 xmlBlasterException.errorCode, xmlBlasterException.message);
1310 xa->disconnect(xa, 0, &xmlBlasterException);
1311 freeXmlBlasterAccessUnparsed(xa);
1312 exit(1);
1313 }
1314 }
1315
1316 { /* get synchnronous ... */
1317 size_t i;
1318 const char *key = "<key queryType='XPATH'>//key</key>";
1319 const char *qos = "<qos/>";
1320 MsgUnitArr *msgUnitArr;
1321 printf("[client] Get synchronous messages with XPath '//key' ...\n");
1322 msgUnitArr = xa->get(xa, key, qos, &xmlBlasterException);
1323 if (*xmlBlasterException.errorCode != 0) {
1324 printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1325 xmlBlasterException.errorCode, xmlBlasterException.message);
1326 xa->disconnect(xa, 0, &xmlBlasterException);
1327 freeXmlBlasterAccessUnparsed(xa);
1328 exit(1);
1329 }
1330 if (msgUnitArr != (MsgUnitArr *)0) {
1331 for (i=0; i<msgUnitArr->len; i++) {
1332 char *contentStr = strFromBlobAlloc(msgUnitArr->msgUnitArr[i].content,
1333 msgUnitArr->msgUnitArr[i].contentLen);
1334 const char *dots = (msgUnitArr->msgUnitArr[i].contentLen > 96) ?
1335 " ..." : "";
1336 printf("\n[client] Received message#%u/%u:\n"
1337 "-------------------------------------"
1338 "%s\n <content>%.100s%s</content>%s\n"
1339 "-------------------------------------\n",
1340 i+1, msgUnitArr->len,
1341 msgUnitArr->msgUnitArr[i].key,
1342 contentStr, dots,
1343 msgUnitArr->msgUnitArr[i].qos);
1344 free(contentStr);
1345 }
1346 freeMsgUnitArr(msgUnitArr);
1347 }
1348 else {
1349 printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1350 xmlBlasterException.errorCode, xmlBlasterException.message);
1351 xa->disconnect(xa, 0, &xmlBlasterException);
1352 freeXmlBlasterAccessUnparsed(xa);
1353 exit(1);
1354 }
1355 }
1356
1357
1358 { /* erase ... */
1359 const char *key = "<key oid='HelloWorld'/>";
1360 const char *qos = "<qos/>";
1361 printf("[client] Erasing message 'HelloWorld' ...\n");
1362 response = xa->erase(xa, key, qos, &xmlBlasterException);
1363 if (*xmlBlasterException.errorCode != 0) {
1364 printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
1365 xmlBlasterException.errorCode, xmlBlasterException.message);
1366 xa->disconnect(xa, 0, &xmlBlasterException);
1367 freeXmlBlasterAccessUnparsed(xa);
1368 exit(1);
1369 }
1370 printf("[client] Erase success, returned status is '%s'\n", response);
1371 free(response);
1372 }
1373
1374 if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
1375 printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
1376 xmlBlasterException.errorCode, xmlBlasterException.message);
1377 freeXmlBlasterAccessUnparsed(xa);
1378 exit(1);
1379 }
1380
1381 freeXmlBlasterAccessUnparsed(xa);
1382 if (numTests > 1) {
1383 printf("[client] Successfully finished test #%d from %d\n\n", ii, numTests);
1384 }
1385 }
1386 printf("[client] Good bye.\n");
1387 return 0; /*exit(0);*/
1388 }
1389 #endif /* #ifdef XmlBlasterAccessUnparsedMain */
/* syntax highlighted by Code2HTML, v. 0.9.1 */