1 // xmlBlaster/demo/javaclients/HelloWorldPublish.java
  2 package javaclients;
  3 
  4 import java.io.UnsupportedEncodingException;
  5 import java.util.Iterator;
  6 import java.util.Map;
  7 import java.util.Random;
  8 import java.util.logging.Level;
  9 import java.util.logging.Logger;
 10 
 11 import org.xmlBlaster.client.I_Callback;
 12 import org.xmlBlaster.client.I_ConnectionStateListener;
 13 import org.xmlBlaster.client.I_XmlBlasterAccess;
 14 import org.xmlBlaster.client.key.EraseKey;
 15 import org.xmlBlaster.client.key.PublishKey;
 16 import org.xmlBlaster.client.key.UpdateKey;
 17 import org.xmlBlaster.client.qos.ConnectQos;
 18 import org.xmlBlaster.client.qos.ConnectReturnQos;
 19 import org.xmlBlaster.client.qos.DisconnectQos;
 20 import org.xmlBlaster.client.qos.EraseQos;
 21 import org.xmlBlaster.client.qos.PublishQos;
 22 import org.xmlBlaster.client.qos.PublishReturnQos;
 23 import org.xmlBlaster.client.qos.UpdateQos;
 24 import org.xmlBlaster.util.FileLocator;
 25 import org.xmlBlaster.util.Global;
 26 import org.xmlBlaster.util.MsgUnit;
 27 import org.xmlBlaster.util.SessionName;
 28 import org.xmlBlaster.util.Timestamp;
 29 import org.xmlBlaster.util.XmlBlasterException;
 30 import org.xmlBlaster.util.def.MethodName;
 31 import org.xmlBlaster.util.def.PriorityEnum;
 32 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 33 import org.xmlBlaster.util.dispatch.I_PostSendListener;
 34 import org.xmlBlaster.util.qos.TopicProperty;
 35 import org.xmlBlaster.util.qos.address.Destination;
 36 import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;
 37 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
 38 
 39 /**
 40  * This client connects to xmlBlaster and publishes a configurable amount of messages.
 41  * <p>
 42  * This is a nice client to experiment and play with xmlBlaster as there are many
 43  * command line options to specify the type and amount of messages published.
 44  * </p>
 45  * <p>
 46  * Try using 'java javaclients.HelloWorldSubscribe' in another window to subscribe to
 47  * our messages.
 48  * Further you can type 'd' in the window running xmlBlaster to get a server dump.
 49  * </p>
 50  *
 51  * Invoke (after starting the xmlBlaster server):
 52  * <pre>
 53  *Publish manually 10 messages:
 54  * java javaclients.HelloWorldPublish -interactive true -numPublish 10 -oid Hello -persistent true -erase true
 55  *
 56  *Publish automatically 10 messages and sleep 1 sec in between:
 57  * java javaclients.HelloWorldPublish -interactive false -sleep 1000 -numPublish 10 -oid Hello -persistent true -erase true
 58  *
 59  *Publish automatically 10 different topics with different DOM entries:
 60  * java javaclients.HelloWorldPublish -interactive false -numPublish 10 -oid Hello-%counter -clientTags "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>"
 61  *
 62  *Login as joe/5 and send one persistent message:
 63  * java javaclients.HelloWorldPublish -session.name joe/5 -passwd secret -persistent true -dump[HelloWorldPublish] true
 64  *
 65  *Send a PtP message:
 66  * java javaclients.HelloWorldPublish -destination jack/17 -forceQueuing true -persistent true -subscribable true
 67  *
 68  *Add some client properties which will be send in the qos to the receivers:
 69  * java javaclients.HelloWorldPublish -clientProperty[transactionId] 0x23345 -clientProperty[myName] jack
 70  *creates a publish Qos containing:
 71  *   &lt;clientProperty name='transactionId'>0x23345&lt;/clientProperty>
 72  *   &lt;clientProperty name='myName'>jack&lt;/clientProperty>
 73  * </pre>
 74  * <p>
 75  * If interactive is false, the sleep gives the number of millis to sleep before publishing the next message.
 76  * </p>
 77  * <p>
 78  * If erase=false the message is not erase at the end, if disconnect=false we don't logout at the end.
 79  * </p>
 80  * <p>
 81  * You can add '%counter' to the clientTags or the content string, each occurrence will be replaced
 82  * by the current message number.
 83  * </p>
 84  * @see javaclients.HelloWorldSubscribe
 85  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
 86  */
 87 public class HelloWorldPublish
 88 {
 89    private final Global glob;
 90    private static Logger log = Logger.getLogger(HelloWorldPublish.class.getName());
 91 
 92    public HelloWorldPublish(Global glob) {
 93       this.glob = glob;
 94 
 95       try {
 96          boolean interactive = glob.getProperty().get("interactive", true);
 97          boolean oneway = glob.getProperty().get("oneway", false);
 98          long sleep = glob.getProperty().get("sleep", 1000L);
 99          int numPublish = glob.getProperty().get("numPublish", 2000);
100          String oid = glob.getProperty().get("oid", "Hello");  // "HelloTopic_#%counter"
101          String domain = glob.getProperty().get("domain", (String)null);
102          String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo/></org.xmlBlaster>");
103          //String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
104          String contentStr = glob.getProperty().get("content", "Hi-%counter");
105          String contentFile = glob.getProperty().get("contentFile", (String)null);
106          PriorityEnum priority = PriorityEnum.toPriorityEnum(glob.getProperty().get("priority", PriorityEnum.NORM_PRIORITY.getInt()));
107          boolean persistent = glob.getProperty().get("persistent", true);
108          long lifeTime = glob.getProperty().get("lifeTime", -1L);
109          boolean forceUpdate = glob.getProperty().get("forceUpdate", true);
110          boolean forceDestroy = glob.getProperty().get("forceDestroy", false);
111          boolean readonly = glob.getProperty().get("readonly", false);
112          long destroyDelay = glob.getProperty().get("destroyDelay", 60000L);
113          boolean createDomEntry = glob.getProperty().get("createDomEntry", true);
114          boolean consumableQueue = glob.getProperty().get("consumableQueue", false);
115          long historyMaxMsg = glob.getProperty().get("queue/history/maxEntries", -1L);
116          boolean forceQueuing = glob.getProperty().get("forceQueuing", true);
117          boolean subscribable = glob.getProperty().get("subscribable", true);
118          String destination = glob.getProperty().get("destination", (String)null);
119          boolean erase = glob.getProperty().get("erase", true);
120          boolean disconnect = glob.getProperty().get("disconnect", true);
121          final boolean eraseTailback = glob.getProperty().get("eraseTailback", false);
122          int contentSize = glob.getProperty().get("contentSize", -1); // 2000000);
123          boolean eraseForceDestroy = glob.getProperty().get("erase.forceDestroy", false);
124          boolean connectPersistent = glob.getProperty().get("connect/qos/persistent", false);
125 
126          Map clientPropertyMap = glob.getProperty().get("clientProperty", (Map)null);
127          Map connectQosClientPropertyMap = glob.getProperty().get("connect/qos/clientProperty", (Map)null);
128 
129          if (historyMaxMsg < 1 && !glob.getProperty().propertyExists("destroyDelay"))
130             destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used
131 
132          log.info("Used settings are:");
133          log.info("   -interactive    " + interactive);
134          log.info("   -sleep          " + Timestamp.millisToNice(sleep));
135          log.info("   -oneway         " + oneway);
136          log.info("   -erase          " + erase);
137          log.info("   -disconnect     " + disconnect);
138          log.info("   -eraseTailback  " + eraseTailback);
139          log.info(" Pub/Sub settings");
140          log.info("   -numPublish     " + numPublish);
141          log.info("   -oid            " + oid);
142          log.info("   -clientTags     " + clientTags);
143          log.info("   -domain         " + ((domain==null)?"":domain));
144          if (contentSize >= 0) {
145             log.info("   -content        [generated]");
146             log.info("   -contentSize    " + contentSize);
147          }
148          //else if (contentFile != null && contentFile.length() > 0) {
149          //   log.info("   -contentFile    " + contentFile);
150          //}
151          else {
152             log.info("   -content        " + contentStr);
153             log.info("   -contentSize    " + contentStr.length());
154             log.info("   -contentFile    " + contentFile);
155          }
156          log.info("   -priority       " + priority.toString());
157          log.info("   -persistent     " + persistent);
158          log.info("   -lifeTime       " + Timestamp.millisToNice(lifeTime));
159          log.info("   -forceUpdate    " + forceUpdate);
160          log.info("   -forceDestroy   " + forceDestroy);
161          if (clientPropertyMap != null) {
162             Iterator it = clientPropertyMap.keySet().iterator();
163             while (it.hasNext()) {
164                String key = (String)it.next();
165                log.info("   -clientProperty["+key+"]   " + clientPropertyMap.get(key).toString());
166             }
167          }
168          else {
169             log.info("   -clientProperty[]   ");
170          }
171          log.info(" Topic settings");
172          log.info("   -readonly       " + readonly);
173          log.info("   -destroyDelay   " + Timestamp.millisToNice(destroyDelay));
174          log.info("   -createDomEntry " + createDomEntry);
175          log.info("   -queue/history/maxEntries " + historyMaxMsg);
176          log.info("   -consumableQueue " + consumableQueue);
177          log.info(" PtP settings");
178          log.info("   -subscribable   " + subscribable);
179          log.info("   -forceQueuing   " + forceQueuing);
180          log.info("   -destination    " + destination);
181          log.info(" Erase settings");
182          log.info("   -erase.forceDestroy " + eraseForceDestroy);
183          log.info("   -erase.domain   " + ((domain==null)?"":domain));
184          log.info(" ConnectQos settings");
185          log.info("   -connect/qos/persistent " + connectPersistent);
186          if (connectQosClientPropertyMap != null) {
187             Iterator it = connectQosClientPropertyMap.keySet().iterator();
188             while (it.hasNext()) {
189                String key = (String)it.next();
190                log.info("   -connect/qos/clientProperty["+key+"]   " + connectQosClientPropertyMap.get(key).toString());
191             }
192          }
193          else {
194             log.info("   -connect/qos/clientProperty[]   ");
195          }
196          log.info("For more info please read:");
197          log.info("   http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html");
198 
199          I_XmlBlasterAccess con = glob.getXmlBlasterAccess();
200 
201          // Handle lost server explicitly
202          con.registerConnectionListener(new I_ConnectionStateListener() {
203 
204                public void reachedAlive(ConnectionStateEnum oldState,
205                                         I_XmlBlasterAccess connection) {
206                   /*
207                   ConnectReturnQos conRetQos = connection.getConnectReturnQos();
208                   log.info("I_ConnectionStateListener: We were lucky, connected to " +
209                      connection.getGlobal().getId() + " as " + conRetQos.getSessionName());
210                      */
211                   if (eraseTailback) {
212                      log.info("Destroying " + connection.getQueue().getNumOfEntries() +
213                                   " client side tailback messages");
214                      connection.getQueue().clear();
215                   }
216                }
217                public void reachedPolling(ConnectionStateEnum oldState,
218                                           I_XmlBlasterAccess connection) {
219                   log.warning("I_ConnectionStateListener: No connection to xmlBlaster server, we are polling ...");
220                }
221                public void reachedDead(ConnectionStateEnum oldState,
222                                        I_XmlBlasterAccess connection) {
223                   log.warning("I_ConnectionStateListener: Connection from " +
224                           connection.getGlobal().getId() + " to xmlBlaster is DEAD.");
225                   //System.exit(1);
226                }
227             });
228 
229          // This listener receives only events from asynchronously send messages from queue.
230          // e.g. after a reconnect when client side queued messages are delivered
231          con.registerPostSendListener(new I_PostSendListener() {
232             /**
233              * @see I_PostSendListener#postSend(MsgQueueEntry[])
234              */
235             public void postSend(MsgQueueEntry[] entries) {
236                try {
237                   for (int i=0; i<entries.length; i++) {
238                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
239                         MsgUnit msg = entries[i].getMsgUnit();
240                         PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
241                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
242                      }
243                      else
244                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
245                   }
246                } catch (Throwable e) {
247                   e.printStackTrace();
248                }
249             }
250 
251             /**
252              * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
253              */
254             public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
255                try {
256                   for (int i=0; i<entries.length; i++) {
257                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
258                         MsgUnit msg = entries[i].getMsgUnit();
259                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
260                      }
261                      else
262                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
263                   }
264                } catch (Throwable e) {
265                   e.printStackTrace();
266                }
267                //return true; // true: We have handled the case (safely stored the message) and it may be removed from connection queue
268                return false; // false: Default error handling: message remains in queue and we go to dead
269             }
270          });
271 
272          // ConnectQos checks -session.name and -passwd from command line
273          log.info("============= CreatingConnectQos");
274          ConnectQos qos = new ConnectQos(glob);
275          if (connectPersistent) {
276             qos.setPersistent(connectPersistent);
277          }
278          if (connectQosClientPropertyMap != null) {
279             Iterator it = connectQosClientPropertyMap.keySet().iterator();
280             while (it.hasNext()) {
281                String key = (String)it.next();
282                qos.addClientProperty(key, connectQosClientPropertyMap.get(key).toString());
283             }
284          }
285          log.info("ConnectQos is " + qos.toXml());
286          ConnectReturnQos crq = con.connect(qos, new I_Callback() {
287          public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
288             try {
289                log.info("Received '" + updateKey.getOid() + "':" + new String(content, "UTF-8"));
290             } catch (UnsupportedEncodingException e) {
291                log.severe("Update failed: " + e.toString());
292             }
293             return "";
294          }
295          });  // Login to xmlBlaster, register for updates
296          log.info("Connect success as " + crq.toXml());
297 
298          org.xmlBlaster.util.StopWatch stopWatch = new org.xmlBlaster.util.StopWatch();
299          for(int i=0; true; i++) {
300             if (numPublish != -1)
301                if (i>=numPublish)
302                   break;
303 
304             String currCounter = ""+(i+1);
305             if (numPublish > 0) { // Add leading zeros to have nice justified numbers in dump
306                String tmp = ""+numPublish;
307                int curLen = currCounter.length();
308                currCounter = "";
309                for (int j=curLen; j<tmp.length(); j++) {
310                   currCounter += "0";
311                }
312                currCounter += (i+1);
313             }
314 
315             String currOid = org.xmlBlaster.util.ReplaceVariable.replaceAll(oid, "%counter", currCounter);
316 
317             if (interactive) {
318                char ret = (char)Global.waitOnKeyboardHit("Hit 'b' to break, hit other key to publish '" + currOid + "' #" + currCounter + "/" + numPublish);
319                if (ret == 'b')
320                   break;
321             }
322             else {
323                if (sleep > 0 && i > 0) {
324                   try { Thread.sleep(sleep); } catch( InterruptedException e) {}
325                }
326                log.info("Publish '" + currOid + "' #" + currCounter + "/" + numPublish);
327             }
328 
329             PublishKey pk = new PublishKey(glob, currOid, "text/xml", "1.0");
330             if (domain != null) pk.setDomain(domain);
331             pk.setClientTags(org.xmlBlaster.util.ReplaceVariable.replaceAll(clientTags, "%counter", currCounter));
332             PublishQos pq = new PublishQos(glob);
333             pq.setPriority(priority);
334             pq.setPersistent(persistent);
335             pq.setLifeTime(lifeTime);
336             pq.setForceUpdate(forceUpdate);
337             pq.setForceDestroy(forceDestroy);
338             pq.setSubscribable(subscribable);
339             if (clientPropertyMap != null) {
340                Iterator it = clientPropertyMap.keySet().iterator();
341                while (it.hasNext()) {
342                   String key = (String)it.next();
343                   pq.addClientProperty(key, clientPropertyMap.get(key).toString());
344                }
345                //Example for a typed property:
346                //pq.getData().addClientProperty("ALONG", (new Long(12)));
347             }
348 
349             if (i == 0) {
350                TopicProperty topicProperty = new TopicProperty(glob);
351                topicProperty.setDestroyDelay(destroyDelay);
352                topicProperty.setCreateDomEntry(createDomEntry);
353                topicProperty.setReadonly(readonly);
354                if (historyMaxMsg >= 0L) {
355                   HistoryQueueProperty prop = new HistoryQueueProperty(this.glob, null);
356                   prop.setMaxEntries(historyMaxMsg);
357                   topicProperty.setHistoryQueueProperty(prop);
358                }
359                if (consumableQueue)
360                   topicProperty.setMsgDistributor("ConsumableQueue,1.0");
361                pq.setTopicProperty(topicProperty);
362                log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
363             }
364 
365             if (destination != null) {
366                log.fine("Using destination: '" + destination + "'");
367                Destination dest = new Destination(glob, new SessionName(glob, destination));
368                dest.forceQueuing(forceQueuing);
369                pq.addDestination(dest);
370             }
371 
372             byte[] content;
373             if (contentSize >= 0) {
374                content = new byte[contentSize];
375                Random random = new Random();
376                for (int j=0; j<content.length; j++) {
377                   content[j] = (byte)(random.nextInt(96)+32);
378                   //content[j] = (byte)('X');
379                   //content[j] = (byte)(j % 255);
380                }
381             }
382             else if (contentFile != null && contentFile.length() > 0) {
383                content = FileLocator.readFile(contentFile);
384             }
385             else {
386                content = org.xmlBlaster.util.ReplaceVariable.replaceAll(contentStr, "%counter", ""+(i+1)).getBytes();
387             }
388 
389             if (log.isLoggable(Level.FINEST)) log.finest("Going to parse publish message: " + pk.toXml() + " : " + content + " : " + pq.toXml());
390             MsgUnit msgUnit = new MsgUnit(pk, content, pq);
391             if (log.isLoggable(Level.FINEST)) log.finest("Going to publish message: " + msgUnit.toXml());
392 
393             if (oneway) {
394                MsgUnit msgUnitArr[] = { msgUnit };
395                con.publishOneway(msgUnitArr);
396                log.info("#" + (i+1) + "/" + numPublish +
397                          ": Published oneway message '" + msgUnit.getKeyOid() + "'");
398             }
399             else {
400                PublishReturnQos prq = con.publish(msgUnit);
401                if (log.isLoggable(Level.FINEST)) log.finest("Returned: " + prq.toXml());
402 
403                log.info("#" + currCounter + "/" + numPublish +
404                          ": Got status='" + prq.getState() +
405                          (prq.getData().hasStateInfo()?"' '" + prq.getStateInfo():"") +
406                          "' rcvTimestamp=" + prq.getRcvTimestamp() +
407                          " for published message '" + prq.getKeyOid() + "'");
408             }
409          }
410          log.info("Elapsed since starting to publish: " + stopWatch.nice(numPublish));
411 
412          if (erase) {
413             char ret = 0;
414             if (interactive) {
415                ret = (char)Global.waitOnKeyboardHit("Hit 'e' to erase topic '"+oid+"', or any other key to keep the topic");
416             }
417 
418             if (ret == 0 || ret == 'e') {
419                EraseKey ek = new EraseKey(glob, oid);
420                if (domain != null) ek.setDomain(domain);
421                EraseQos eq = new EraseQos(glob);
422                eq.setForceDestroy(eraseForceDestroy);
423                if (log.isLoggable(Level.FINEST)) log.finest("Going to erase the topic: " + ek.toXml() + eq.toXml());
424                /*EraseReturnQos[] eraseArr =*/con.erase(ek, eq);
425                log.info("Erase success");
426             }
427          }
428 
429          char ret = 0;
430          if (interactive) {
431             boolean hasQueued = con.getQueue().getNumOfEntries() > 0;
432             while (ret != 'l' && ret != 'd')
433                ret = (char)Global.waitOnKeyboardHit("Hit 'l' to leave server, 'd' to disconnect" + (hasQueued ? "(and destroy client side entries)" : ""));
434          }
435 
436 
437          if (ret == 0 || ret == 'd') {
438             DisconnectQos dq = new DisconnectQos(glob);
439             dq.clearClientQueue(true);
440             con.disconnect(dq);
441             log.info("Disconnected from server, all resources released");
442          }
443          else {
444             con.leaveServer(null);
445             ret = 0;
446             if (interactive) {
447                while (ret != 'q')
448                   ret = (char)Global.waitOnKeyboardHit("Hit 'q' to quit");
449             }
450             log.info("Left server, our server side session remains, bye");
451