demo/c/socket/Publisher.c

Go to the documentation of this file.
00001 /*----------------------------------------------------------------------------
00002 Name:      xmlBlaster/demo/c/socket/Publisher.c
00003 Project:   xmlBlaster.org
00004 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
00005 Comment:   Demo to publish messages from command line
00006 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
00007 Compile:   cd xmlBlaster; build.sh c
00008            (Win: copy xmlBlaster\src\c\socket\pthreadVC2.dll to your PATH)
00009 Invoke:    Publisher -help
00010 See:    http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
00011 -----------------------------------------------------------------------------*/
00012 #include <stdio.h>
00013 #include <stdlib.h>
00014 #include <string.h>
00015 #include <XmlBlasterAccessUnparsed.h>
00016 
00017 static char* readFile(const char *fn);
00018 
00019 #if defined(WINCE)
00020 int _tmain(int argc, _TCHAR** argv_wcs) { /* wchar_t==_TCHAR */
00021    char **argv = convertWcsArgv(argv_wcs, argc);
00022 #else
00023 
00028 int main(int argc, const char* const* argv) {
00029 #endif
00030    int iarg, iPublish;
00031    const char *callbackSessionId = "topSecret";
00032    XmlBlasterException xmlBlasterException;
00033    XmlBlasterAccessUnparsed *xa = 0;
00034    bool disconnect = true;
00035    bool erase = true;
00036    const char *publishToken = 0;
00037 
00038    printf("[client] XmlBlaster %s C SOCKET client, try option '-help' if you need"
00039           " usage informations\n", getXmlBlasterVersion());
00040 
00041    for (iarg=0; iarg < argc; iarg++) {
00042       if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
00043          char usage[XMLBLASTER_MAX_USAGE_LEN];
00044          const char *pp =
00045          "\n\nExample:"
00046          "\n  Publisher -logLevel TRACE"
00047          " -dispatch/connection/plugin/socket/hostname 192.168.2.9";
00048          printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
00049                   getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
00050          exit(EXIT_FAILURE);
00051       }
00052    }
00053 
00054    xa = getXmlBlasterAccessUnparsed(argc, (const char* const* )argv);
00055    if (xa->initialize(xa, 0, &xmlBlasterException) == false) {
00056       printf("[client] Connection to xmlBlaster failed,"
00057              " please start the server or check your configuration\n");
00058       freeXmlBlasterAccessUnparsed(xa);
00059       exit(EXIT_FAILURE);
00060    }
00061 
00062    disconnect = xa->props->getBool(xa->props, "disconnect", disconnect);
00063    erase = xa->props->getBool(xa->props, "erase", erase);
00064 
00065    {  /* connect */
00066       char *response = (char *)0;
00067       const char * const sessionName = xa->props->getString(xa->props, "session.name", "Publisher");
00068       const char * const passwd = xa->props->getString(xa->props, "passwd", "publisher");
00069       long sessionTimeout = xa->props->getLong(xa->props, "session.timeout", 86400000L);
00070       int maxSessions = xa->props->getInt(xa->props, "session.maxSessions", 10);
00071       const bool persistent = xa->props->getBool(xa->props, "persistentConnection", false);
00072       char connectQos[4096];
00073       char callbackQos[1024];
00074       sprintf(callbackQos,
00075                "<queue relating='callback' maxEntries='10000000' maxEntriesCache='10000000'>"
00076                "  <callback type='SOCKET' sessionId='%.256s'>"
00077                "    socket://%.120s:%d"
00078                "  </callback>"
00079                "</queue>",
00080                callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
00081       sprintf(connectQos,
00082                "<qos>"
00083                " <securityService type='htpasswd' version='1.0'>"
00084                "  <![CDATA["
00085                "   <user>%.80s</user>"
00086                "   <passwd>%.40s</passwd>"
00087                "  ]]>"
00088                " </securityService>"
00089                " <session name='%.80s' timeout='%ld' maxSessions='%d' clearSessions='false' reconnectSameClientOnly='false'/>"
00090                " %.20s"
00091                "%.1024s"
00092                "</qos>", sessionName, passwd, sessionName, sessionTimeout, maxSessions, persistent?"<persistent/>":"", callbackQos);
00093 
00094       response = xa->connect(xa, connectQos, 0, &xmlBlasterException);
00095       if (*xmlBlasterException.errorCode != 0) {
00096          printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
00097                   xmlBlasterException.errorCode, xmlBlasterException.message);
00098          freeXmlBlasterAccessUnparsed(xa);
00099          exit(EXIT_FAILURE);
00100       }
00101       xmlBlasterFree(response);
00102       printf("[client] Connected to xmlBlaster, do some tests ...\n");
00103    }
00104 
00105    { /* publish ... */
00106       char *response = (char *)0;
00107 
00108       char key[4098];
00109       const char *oid = xa->props->getString(xa->props, "oid", "Hello");
00110       const char *domain = xa->props->getString(xa->props, "domain", 0);
00111       bool interactive = xa->props->getBool(xa->props, "interactive", true);
00112 
00113       char qos[4098];
00114       char topicQos[2048];
00115       char destinationQos[2048];
00116       bool oneway = xa->props->getBool(xa->props, "oneway", false);
00117       long sleep = xa->props->getLong(xa->props, "sleep", 1000L);
00118       int numPublish = xa->props->getInt(xa->props, "numPublish", 1);
00119       const char *clientTags = xa->props->getString(xa->props, "clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
00120       const char *content = xa->props->getString(xa->props, "content", "Hi-%counter");
00121       int priority = xa->props->getInt(xa->props, "priority", 5);
00122       bool persistentPublish = xa->props->getBool(xa->props, "persistent", true);
00123       long lifeTime = xa->props->getLong(xa->props, "lifeTime", -1L);
00124       bool verbose = xa->props->getBool(xa->props, "verbose", true);
00125       bool forceUpdate = xa->props->getBool(xa->props, "forceUpdate", true);
00126       bool forceDestroy = xa->props->getBool(xa->props, "forceDestroy", false);
00127       bool readonly = xa->props->getBool(xa->props, "readonly", false);
00128       long destroyDelay = xa->props->getLong(xa->props, "destroyDelay", -1L);
00129       bool createDomEntry = xa->props->getBool(xa->props, "createDomEntry", true);
00130       long historyMaxMsg = xa->props->getLong(xa->props, "queue/history/maxEntries", 10L);
00131       long historyMaxBytes = xa->props->getLong(xa->props, "queue/history/maxBytes", 2147483647L);
00132       bool forceQueuing = xa->props->getBool(xa->props, "forceQueuing", true);
00133       bool subscribable = xa->props->getBool(xa->props, "subscribable", true);
00134       const char *destination = xa->props->getString(xa->props, "destination", 0);
00135       int contentSize = xa->props->getInt(xa->props, "contentSize", -1);
00136       const char *contentFile = xa->props->getString(xa->props, "contentFile", 0);
00137       /*Map clientPropertyMap = xa->props->getInt(xa->props, "clientProperty", (Map)0); */
00138 
00139       publishToken = (domain == 0) ? oid : domain;
00140 
00141       sprintf(key, "<key oid='%.512s' domain='%.100s'>%.2000s</key>",
00142                   oid, ((domain==0)?"":domain), clientTags);
00143 
00144       sprintf(topicQos, 
00145                    " <topic readonly='%.20s' destroyDelay='%ld' createDomEntry='%.20s'>"
00146                    "  <persistence/>"
00147                    "  <queue relating='history' type='CACHE' version='1.0' maxEntries='%ld' maxBytes='%ld'/>"
00148                    " </topic>",
00149                    readonly?"true":"false",
00150                    destroyDelay,
00151                    createDomEntry?"true":"false",
00152                    historyMaxMsg,
00153                    historyMaxBytes
00154                    );
00155       if (destination!=0)
00156          sprintf(destinationQos, " <destination queryType='EXACT' forceQueuing='%.20s'>%.512s</destination>",
00157                  forceQueuing?"true":"false", destination);
00158       else
00159          *destinationQos = 0;
00160 
00161       for (iPublish=0; iPublish<numPublish || numPublish==-1; iPublish++) {
00162          char msg[20];
00163          const char *pp = strstr(key, "%counter");
00164          MsgUnit msgUnit;
00165          memset(&msgUnit, 0, sizeof(MsgUnit));
00166 
00167          if (interactive) {
00168             printf("[client] Hit a key to publish '%s' #%d/%d ('b' to break) >> ", oid, iPublish, numPublish);
00169             fgets(msg, 19, stdin);
00170             if (*msg == 'b') 
00171                break;
00172          }
00173          else {
00174             if (sleep > 0) {
00175                sleepMillis(sleep);
00176             }
00177             if (verbose) {
00178                if (contentFile != 0)
00179                   printf("[client] Publish to topic '%s' file '%s' #%d/%d\n", oid, contentFile, iPublish, numPublish);
00180                else
00181                   printf("[client] Publish to topic '%s' #%d/%d\n", oid, iPublish, numPublish);
00182             }
00183          }
00184 
00185          if (pp) { /* Replace '%counter' token by current index */
00186             char *k = (char *)malloc(strlen(key)+10);
00187             strncpy(k, key, pp-key);
00188             sprintf(k+(pp-key), "%d%s", iPublish, pp+strlen("%counter"));
00189             msgUnit.key = k;
00190          }
00191          else
00192             msgUnit.key = strcpyAlloc(key);
00193          
00194          if (iPublish == 1) *topicQos = 0;
00195          sprintf(qos, "<qos>"
00196                    " <priority>%d</priority>"
00197                    " <subscribable>%.20s</subscribable>"
00198                    " <expiration lifeTime='%ld'/>"
00199                    " <persistent>%.20s</persistent>"
00200                    " <forceUpdate>%.20s</forceUpdate>"
00201                    " <forceDestroy>%.20s</forceDestroy>"
00202                    " %.2048s"
00203                    " <clientProperty name='%.100s'>%.512s</clientProperty>"
00204                    " %.512s"
00205                    "</qos>",
00206                    priority,
00207                    subscribable?"true":"false",
00208                    lifeTime,
00209                    persistentPublish?"true":"false",
00210                    forceUpdate?"true":"false",
00211                    forceDestroy?"true":"false",
00212                    destinationQos,
00213                    "", "", /* ClientProperty */
00214                    topicQos
00215                    );
00216 
00217          /*if (iPublish == 0) printf("[client] publishQos is\n%s\n", qos);*/
00218 
00219          if (contentSize > 0) {
00220             int i;
00221             char *p = (char *)malloc(contentSize);
00222             for (i=0; i<contentSize; i++) {
00223                int ran = rand() % 100;
00224                p[i] = (char)(ran+28);
00225             }
00226             msgUnit.content = p;
00227             msgUnit.contentLen = contentSize;
00228          }
00229          else if (contentFile != 0) {
00230             char* p = readFile(contentFile);
00231             msgUnit.content = p;
00232             msgUnit.contentLen = strlen(msgUnit.content);
00233          }
00234          else {
00235             const char *pc = strstr(content, "%counter");
00236             if (pc) { /* Replace '%counter' token by current index */
00237                char *p = (char *)malloc(strlen(content)+10);
00238                strncpy(p, content, pc-content);
00239                sprintf(p+(pc-content), "%d%s", iPublish, pc+strlen("%counter"));
00240                msgUnit.content = p;
00241                msgUnit.contentLen = strlen(msgUnit.content);
00242             }
00243             else {
00244                msgUnit.content = strcpyAlloc(content);
00245                msgUnit.contentLen = strlen(msgUnit.content);
00246             }
00247          }
00248          msgUnit.qos =strcpyAlloc(qos);
00249          if (oneway) {
00250             MsgUnitArr msgUnitArr;
00251             msgUnitArr.len = 1;
00252             msgUnitArr.msgUnitArr = &msgUnit;
00253             xa->publishOneway(xa, &msgUnitArr, &xmlBlasterException);
00254          }
00255          else {
00256             response = xa->publish(xa, &msgUnit, &xmlBlasterException);
00257          }
00258          freeMsgUnitData(&msgUnit);
00259          if (*xmlBlasterException.errorCode != 0) {
00260             printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
00261                      xmlBlasterException.errorCode, xmlBlasterException.message);
00262             xa->disconnect(xa, 0, &xmlBlasterException);
00263             freeXmlBlasterAccessUnparsed(xa);
00264             exit(EXIT_FAILURE);
00265          }
00266          if (verbose) {
00267            printf("[client] Publish success, returned status is '%s'\n", response);
00268          }
00269          xmlBlasterFree(response);
00270       }
00271    }
00272 
00273    while (true) {
00274       char msg[20];
00275       bool interactive = xa->props->getBool(xa->props, "interactiveQuit", true);
00276       if (!interactive) break;
00277                   
00278       printf("(Enter 'q' to exit) >> ");
00279       fgets(msg, 19, stdin);
00280       if (*msg == 'q') 
00281          break;
00282    }
00283     
00284    if (erase) {  /* erase ... */
00285       QosArr *resp;
00286       char key[256];
00287       const char *qos = "<qos/>";
00288       sprintf(key, "<key oid='%.200s'/>", publishToken); /* TODO: use subscriptionId */
00289       printf("[client] Erase topic '%s' ...\n", publishToken);
00290       resp = xa->erase(xa, key, qos, &xmlBlasterException);
00291       if (resp) {
00292          size_t i;
00293          for (i=0; i<resp->len; i++) {
00294             printf("[client] Erase success, returned status is '%s'\n", resp->qosArr[i]);
00295          }
00296          freeQosArr(resp);
00297       }
00298       else {
00299          printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
00300                   xmlBlasterException.errorCode, xmlBlasterException.message);
00301          xa->disconnect(xa, 0, &xmlBlasterException);
00302          freeXmlBlasterAccessUnparsed(xa);
00303          exit(EXIT_FAILURE);
00304       }
00305    }
00306 
00307    if (disconnect) {
00308       if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
00309          printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
00310                   xmlBlasterException.errorCode, xmlBlasterException.message);
00311          freeXmlBlasterAccessUnparsed(xa);
00312          exit(EXIT_FAILURE);
00313       }
00314    }
00315 
00316    freeXmlBlasterAccessUnparsed(xa);
00317    printf("[client] Good bye.\n");
00318    return 0;
00319 }
00320 
/**
 * Read a complete file into a NUL terminated heap buffer.
 *
 * The file is opened in text mode ("r") and read character by character,
 * the buffer grows in steps of 1024 bytes and is finally shrunk to the
 * needed size. An empty file results in an allocated empty string.
 * The file is closed on every path.
 *
 * @param fn The name of the file to read
 * @return The content allocated with malloc()/realloc(), ownership is passed
 *         to the caller.
 *         NULL if fn is NULL, the file can't be opened, a read error
 *         occurred or no memory is available.
 */
char* readFile(const char *fn) {
   FILE *fp;
   char *retbuf = NULL;
   char *newbuf;
   size_t nchmax = 0;
   size_t nchread = 0;
   int c;
   int failed = 0;

   if (fn == NULL)
      return NULL;

   if ((fp = fopen(fn, "r")) == NULL) {
      printf("Error Opening File %s.\n", fn);
      return NULL;
   }

   while ((c = getc(fp)) != EOF) {
      if (nchread >= nchmax) {
         nchmax += 1024;
         if (nchread >= nchmax) { /* in case nchmax overflowed */
            failed = 1;
            break;
         }
         /* +1 for \0 */
         if (retbuf == NULL)      /* in case pre-ANSI realloc */
            newbuf = (char *)malloc(nchmax + 1);
         else
            newbuf = (char *)realloc(retbuf, nchmax + 1);
         if (newbuf == NULL) {
            failed = 1;
            break;
         }
         retbuf = newbuf;
      }
      retbuf[nchread++] = (char)c;
   }

   /* getc() reports EOF for a read error as well, don't return partial data */
   if (!failed && ferror(fp))
      failed = 1;

   fclose(fp);

   if (failed) {
      free(retbuf);
      return NULL;
   }

   if (retbuf == NULL) {
      /* Empty file: return "" so the caller can distinguish it from a failure */
      retbuf = (char *)malloc(1);
      if (retbuf != NULL)
         retbuf[0] = '\0';
      return retbuf;
   }

   retbuf[nchread] = '\0';
   /* Give back the unused tail, keep the larger buffer if shrinking fails */
   newbuf = (char *)realloc(retbuf, nchread + 1);
   if (newbuf != NULL)
      retbuf = newbuf;

   return retbuf;
}