1 /*----------------------------------------------------------------------------
  2 Name:      xmlBlaster/testsuite/src/c/TestStress.c
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Test C client library
  6 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
  7 Compile:   cd xmlBlaster; build c
  8 Invoke:    Start 'java org.xmlBlaster.Main' and then 'TestStress'
  9 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/c.client.socket.html
 10 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 11 -----------------------------------------------------------------------------*/
 12 #include <stdio.h>
 13 #include <stdlib.h>
 14 #include <string.h>
 15 #include <XmlBlasterAccessUnparsed.h>
 16 #include "test.h"
 17 
 18 static int argc = 0;
 19 static char** argv = 0;
 20 #define  ERRORSTR_LEN 4096
 21 static char errorString[ERRORSTR_LEN+1];
 22 static char updateContent[256];
 23 static void *updateUserData;
 24 static const char *CONTENT = "Some message payload";
 25 static size_t updateCounter = 0;
 26 
 27 /**
 28  * Here we receive the callback messages from xmlBlaster
 29  * mu_assert() does not help here as it is another thread
 30  */
 31 static bool myUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException)
 32 {
 33    size_t i;
 34    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userData;
 35    if (xmlBlasterException != 0) ;  /* Supress compiler warning */
 36    updateUserData = xa;
 37    updateCounter += msgUnitArr->len;
 38    for (i=0; i<msgUnitArr->len; i++) {
 39       MsgUnit *msg = &msgUnitArr->msgUnitArr[i];
 40       if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
 41          xa->log(0, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "CALLBACK update(): Asynchronous message update arrived\n"); 
 42       strncpy0(updateContent, msg->content, msg->contentLen+1); /* Adds '\0' to the end */
 43       msgUnitArr->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
 44    }
 45    return true;
 46 }
 47 
 48 /**
 49  * Invoke: TestStress -logLevel TRACE
 50  */
 51 static const char * test_stress()
 52 {
 53    char *response = (char *)0;
 54    /*
 55       * callbackSessionId:
 56       * Is created by the client and used to validate callback messages in update. 
 57       * This is sent on connect in ConnectQos.
 58       * (Is different from the xmlBlaster secret session ID)
 59       */
 60    const char *callbackSessionId = "topSecret";
 61    XmlBlasterException xmlBlasterException;
 62    XmlBlasterAccessUnparsed *xa = 0;
 63    bool retBool;
 64    int iPub, iWait, numPublish, maxQueueEntries, maxSessions=10;
 65    const char *sessionName = "joe";
 66 
 67    xa = getXmlBlasterAccessUnparsed(argc, (const char* const*)argv);
 68    if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
 69       freeXmlBlasterAccessUnparsed(xa);
 70       mu_fail("[TEST FAIL] Connection to xmlBlaster failed, please start the server or check your configuration");
 71    }
 72 
 73    numPublish      = xa->props->getInt(xa->props, "numPublish", 2500);
 74    maxQueueEntries = xa->props->getInt(xa->props, "queue/callback/maxEntries", numPublish);
 75    sessionName     = xa->props->getString(xa->props, "session.name", sessionName);
 76    maxSessions     = xa->props->getInt(xa->props, "session.maxSessions", maxSessions);
 77 
 78    {  /* connect */
 79       char connectQos[2048];
 80       char callbackQos[1024];
 81       sprintf(callbackQos,
 82                "<queue relating='callback' maxEntries='%d' maxEntriesCache='%d'>"
 83                "  <callback type='SOCKET' sessionId='%s'>"
 84                "    socket://%.120s:%d"
 85                "  </callback>"
 86                "</queue>",
 87                maxQueueEntries, maxQueueEntries, callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
 88       sprintf(connectQos,
 89                "<qos>"
 90                " <securityService type='htpasswd' version='1.0'>"
 91                "  <![CDATA["
 92                "   <user>fritz</user>"
 93                "   <passwd>secret</passwd>"
 94                "  ]]>"
 95                " </securityService>"
 96                " <session name='%.120s' timeout='3600000' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
 97                "%.1024s"
 98                "</qos>", sessionName, maxSessions, callbackQos);
 99 
100       response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
101       if (*xmlBlasterException.errorCode != '\0') {
102          SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception during connect errorCode=%s, message=%s\n",
103                   xmlBlasterException.errorCode, xmlBlasterException.message);
104          freeXmlBlasterAccessUnparsed(xa);
105          mu_assert(errorString, false);
106       }
107       xmlBlasterFree(response);
108       printf("[client] Connected to xmlBlaster, do some tests ...\n");
109    }
110 
111    { /* subscribe ... */
112       const char *key = "<key oid='TestStress'/>";
113       const char *qos = "<qos/>";
114       printf("[client] Subscribe message 'TestStress' ...\n");
115       response = xa->subscribe(xa, key, qos, &xmlBlasterException);
116       if (*xmlBlasterException.errorCode != 0) {
117          SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in subscribe errorCode=%s, message=%s\n",
118                   xmlBlasterException.errorCode, xmlBlasterException.message);
119          freeXmlBlasterAccessUnparsed(xa);
120          mu_assert(errorString, false);
121       }
122       printf("[client] Subscribe success\n");
123       mu_assert("Subscribe response is invalid", strstr(response, "subscribe id=")!=0);
124       mu_assert("Subscribe response is invalid", strstr(response, "WARNING")==0);
125       mu_assert("Subscribe response is invalid", strstr(response, "ERROR")==0);
126       xmlBlasterFree(response);
127    }
128 
129    printf("[client] Publishing %d messages 'TestStress' ...\n", numPublish);
130    for (iPub=0; iPub<numPublish; iPub++) {
131       char tmp[200];
132       MsgUnit msgUnit;
133       memset(&msgUnit, 0, sizeof(MsgUnit));
134       msgUnit.key = strcpyAlloc("<key oid='TestStress'/>");
135       sprintf(tmp, "#%d %s", (iPub+1), CONTENT);
136       msgUnit.content = strcpyAlloc(tmp);
137       msgUnit.contentLen = strlen(msgUnit.content);
138       msgUnit.qos =strcpyAlloc("<qos><persistent>false</persistent></qos>");
139       response = xa->publish(xa, &msgUnit, &xmlBlasterException);
140       freeMsgUnitData(&msgUnit);
141       if (*xmlBlasterException.errorCode != '\0') {
142          SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in publish #%d errorCode=%s, message=%s\n",
143                   iPub, xmlBlasterException.errorCode, xmlBlasterException.message);
144          freeXmlBlasterAccessUnparsed(xa);
145          mu_assert(errorString, false);
146       }
147       if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
148          xa->log(0, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Publish #%d messages success\n", iPub); 
149       mu_assert("Publish response is invalid", strstr(response, "rcvTimestamp nanos=")!=0);
150       xmlBlasterFree(response);
151    }
152 
153    for (iWait=0; iWait<10; iWait++) {
154       printf("[client] Publish of %d messages success, received %d updates\n", numPublish, (int32_t)updateCounter);
155       if ((int)updateCounter >= numPublish)
156          break;
157       sleepMillis(500);
158    }
159 
160    mu_assert("No update arrived", *updateContent != '\0');
161    if ((int)updateCounter < numPublish) {
162       freeXmlBlasterAccessUnparsed(xa);
163       mu_assert("Missing updates", (int)updateCounter == numPublish);
164    }
165    else if ((int)updateCounter > numPublish) {
166       printf("[client] WARN: Publish of %d messages but received %d updates\n", numPublish, (int32_t)updateCounter);
167    }
168    printf("[client] updateContent = %s, CONTENT = %s\n", updateContent, CONTENT);
169    mu_assert("Received wrong message in update()", strstr(updateContent, CONTENT) != 0);
170    *updateContent = '\0';
171 
172    mu_assert("UserData from update() is invalid", updateUserData == xa);
173 
174 
175    {  /* erase ... */
176       QosArr* responseArrP;
177       const char *key = "<key oid='TestStress'/>";
178       const char *qos = "<qos/>";
179       printf("[client] Erasing message 'TestStress' ...\n");
180       responseArrP = xa->erase(xa, key, qos, &xmlBlasterException);
181       if (*xmlBlasterException.errorCode != '\0') {
182          SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in erase() errorCode=%s, message=%s\n",
183                   xmlBlasterException.errorCode, xmlBlasterException.message);
184          freeXmlBlasterAccessUnparsed(xa);
185          mu_assert(errorString, false);
186       }
187       printf("[client] Erase success\n");
188       freeQosArr(responseArrP);
189    }
190 
191    retBool = xa->disconnect(xa, 0, &xmlBlasterException);
192    if (*xmlBlasterException.errorCode != '\0') {
193       SNPRINTF(errorString, ERRORSTR_LEN, "[TEST FAIL] Caught exception in erase() errorCode=%s, message=%s\n",
194                xmlBlasterException.errorCode, xmlBlasterException.message);
195       freeXmlBlasterAccessUnparsed(xa);
196       mu_assert(errorString, false);
197    }
198    mu_assert("disconnect() returned false", retBool == true);
199 
200    if (*updateContent != '\0') { /* The erase event is sent as update as well */
201       *updateContent = '\0';
202    }
203 
204    freeXmlBlasterAccessUnparsed(xa);
205    printf("[client] Good bye.\n");
206    return 0;
207 }
208 
209 
210 static const char *all_tests()
211 {
212    mu_run_test(test_stress);
213    return 0;
214 }
215 
216 int main(int argc_, char **argv_)
217 {
218    const char *result;
219    argc = argc_;
220    argv = argv_;
221 
222    result = all_tests();
223 
224    if (result != 0) {
225       printf("%s\n", result);
226    }
227    else {
228       printf("ALL TESTS PASSED\n");
229    }
230    printf("Tests run: %d\n", tests_run);
231 
232    return result != 0;
233 }


syntax highlighted by Code2HTML, v. 0.9.1