1 // xmlBlaster/demo/javaclients/HelloWorldPublish.java
2 package javaclients;
3
4 import java.io.UnsupportedEncodingException;
5 import java.util.Iterator;
6 import java.util.Map;
7 import java.util.Random;
8 import java.util.logging.Level;
9 import java.util.logging.Logger;
10
11 import org.xmlBlaster.client.I_Callback;
12 import org.xmlBlaster.client.I_ConnectionStateListener;
13 import org.xmlBlaster.client.I_XmlBlasterAccess;
14 import org.xmlBlaster.client.key.EraseKey;
15 import org.xmlBlaster.client.key.PublishKey;
16 import org.xmlBlaster.client.key.UpdateKey;
17 import org.xmlBlaster.client.qos.ConnectQos;
18 import org.xmlBlaster.client.qos.ConnectReturnQos;
19 import org.xmlBlaster.client.qos.DisconnectQos;
20 import org.xmlBlaster.client.qos.EraseQos;
21 import org.xmlBlaster.client.qos.PublishQos;
22 import org.xmlBlaster.client.qos.PublishReturnQos;
23 import org.xmlBlaster.client.qos.UpdateQos;
24 import org.xmlBlaster.util.FileLocator;
25 import org.xmlBlaster.util.Global;
26 import org.xmlBlaster.util.MsgUnit;
27 import org.xmlBlaster.util.SessionName;
28 import org.xmlBlaster.util.Timestamp;
29 import org.xmlBlaster.util.XmlBlasterException;
30 import org.xmlBlaster.util.def.MethodName;
31 import org.xmlBlaster.util.def.PriorityEnum;
32 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
33 import org.xmlBlaster.util.dispatch.I_PostSendListener;
34 import org.xmlBlaster.util.qos.TopicProperty;
35 import org.xmlBlaster.util.qos.address.Destination;
36 import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;
37 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
38
39 /**
40 * This client connects to xmlBlaster and publishes a configurable amount of messages.
41 * <p>
42 * This is a nice client to experiment and play with xmlBlaster as there are many
43 * command line options to specify the type and amount of messages published.
44 * </p>
45 * <p>
46 * Try using 'java javaclients.HelloWorldSubscribe' in another window to subscribe to
47 * our messages.
48 * Further you can type 'd' in the window running xmlBlaster to get a server dump.
49 * </p>
50 *
51 * Invoke (after starting the xmlBlaster server):
52 * <pre>
53 *Publish manually 10 messages:
54 * java javaclients.HelloWorldPublish -interactive true -numPublish 10 -oid Hello -persistent true -erase true
55 *
56 *Publish automatically 10 messages and sleep 1 sec in between:
57 * java javaclients.HelloWorldPublish -interactive false -sleep 1000 -numPublish 10 -oid Hello -persistent true -erase true
58 *
59 *Publish automatically 10 different topics with different DOM entries:
60 * java javaclients.HelloWorldPublish -interactive false -numPublish 10 -oid Hello-%counter -clientTags "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>"
61 *
62 *Login as joe/5 and send one persistent message:
63 * java javaclients.HelloWorldPublish -session.name joe/5 -passwd secret -persistent true -dump[HelloWorldPublish] true
64 *
65 *Send a PtP message:
66 * java javaclients.HelloWorldPublish -destination jack/17 -forceQueuing true -persistent true -subscribable true
67 *
68 *Add some client properties which will be sent in the qos to the receivers:
69 * java javaclients.HelloWorldPublish -clientProperty[transactionId] 0x23345 -clientProperty[myName] jack
70 *creates a publish Qos containing:
71 * <clientProperty name='transactionId'>0x23345</clientProperty>
72 * <clientProperty name='myName'>jack</clientProperty>
73 * </pre>
74 * <p>
75 * If interactive is false, the sleep gives the number of millis to sleep before publishing the next message.
76 * </p>
77 * <p>
78 * If erase=false the message is not erased at the end; if disconnect=false we don't log out at the end.
79 * </p>
80 * <p>
81 * You can add '%counter' to the clientTags or the content string, each occurrence will be replaced
82 * by the current message number.
83 * </p>
84 * @see javaclients.HelloWorldSubscribe
85 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
86 */
87 public class HelloWorldPublish
88 {
89 private final Global glob;
90 private static Logger log = Logger.getLogger(HelloWorldPublish.class.getName());
91
92 public HelloWorldPublish(Global glob) {
93 this.glob = glob;
94
95 try {
96 boolean interactive = glob.getProperty().get("interactive", true);
97 boolean oneway = glob.getProperty().get("oneway", false);
98 long sleep = glob.getProperty().get("sleep", 1000L);
99 int numPublish = glob.getProperty().get("numPublish", 2000);
100 String oid = glob.getProperty().get("oid", "Hello"); // "HelloTopic_#%counter"
101 String domain = glob.getProperty().get("domain", (String)null);
102 String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo/></org.xmlBlaster>");
103 //String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
104 String contentStr = glob.getProperty().get("content", "Hi-%counter");
105 String contentFile = glob.getProperty().get("contentFile", (String)null);
106 PriorityEnum priority = PriorityEnum.toPriorityEnum(glob.getProperty().get("priority", PriorityEnum.NORM_PRIORITY.getInt()));
107 boolean persistent = glob.getProperty().get("persistent", true);
108 long lifeTime = glob.getProperty().get("lifeTime", -1L);
109 boolean forceUpdate = glob.getProperty().get("forceUpdate", true);
110 boolean forceDestroy = glob.getProperty().get("forceDestroy", false);
111 boolean readonly = glob.getProperty().get("readonly", false);
112 long destroyDelay = glob.getProperty().get("destroyDelay", 60000L);
113 boolean createDomEntry = glob.getProperty().get("createDomEntry", true);
114 boolean consumableQueue = glob.getProperty().get("consumableQueue", false);
115 long historyMaxMsg = glob.getProperty().get("queue/history/maxEntries", -1L);
116 boolean forceQueuing = glob.getProperty().get("forceQueuing", true);
117 boolean subscribable = glob.getProperty().get("subscribable", true);
118 String destination = glob.getProperty().get("destination", (String)null);
119 boolean erase = glob.getProperty().get("erase", true);
120 boolean disconnect = glob.getProperty().get("disconnect", true);
121 final boolean eraseTailback = glob.getProperty().get("eraseTailback", false);
122 int contentSize = glob.getProperty().get("contentSize", -1); // 2000000);
123 boolean eraseForceDestroy = glob.getProperty().get("erase.forceDestroy", false);
124 boolean connectPersistent = glob.getProperty().get("connect/qos/persistent", false);
125
126 Map clientPropertyMap = glob.getProperty().get("clientProperty", (Map)null);
127 Map connectQosClientPropertyMap = glob.getProperty().get("connect/qos/clientProperty", (Map)null);
128
129 if (historyMaxMsg < 1 && !glob.getProperty().propertyExists("destroyDelay"))
130 destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used
131
132 log.info("Used settings are:");
133 log.info(" -interactive " + interactive);
134 log.info(" -sleep " + Timestamp.millisToNice(sleep));
135 log.info(" -oneway " + oneway);
136 log.info(" -erase " + erase);
137 log.info(" -disconnect " + disconnect);
138 log.info(" -eraseTailback " + eraseTailback);
139 log.info(" Pub/Sub settings");
140 log.info(" -numPublish " + numPublish);
141 log.info(" -oid " + oid);
142 log.info(" -clientTags " + clientTags);
143 log.info(" -domain " + ((domain==null)?"":domain));
144 if (contentSize >= 0) {
145 log.info(" -content [generated]");
146 log.info(" -contentSize " + contentSize);
147 }
148 //else if (contentFile != null && contentFile.length() > 0) {
149 // log.info(" -contentFile " + contentFile);
150 //}
151 else {
152 log.info(" -content " + contentStr);
153 log.info(" -contentSize " + contentStr.length());
154 log.info(" -contentFile " + contentFile);
155 }
156 log.info(" -priority " + priority.toString());
157 log.info(" -persistent " + persistent);
158 log.info(" -lifeTime " + Timestamp.millisToNice(lifeTime));
159 log.info(" -forceUpdate " + forceUpdate);
160 log.info(" -forceDestroy " + forceDestroy);
161 if (clientPropertyMap != null) {
162 Iterator it = clientPropertyMap.keySet().iterator();
163 while (it.hasNext()) {
164 String key = (String)it.next();
165 log.info(" -clientProperty["+key+"] " + clientPropertyMap.get(key).toString());
166 }
167 }
168 else {
169 log.info(" -clientProperty[] ");
170 }
171 log.info(" Topic settings");
172 log.info(" -readonly " + readonly);
173 log.info(" -destroyDelay " + Timestamp.millisToNice(destroyDelay));
174 log.info(" -createDomEntry " + createDomEntry);
175 log.info(" -queue/history/maxEntries " + historyMaxMsg);
176 log.info(" -consumableQueue " + consumableQueue);
177 log.info(" PtP settings");
178 log.info(" -subscribable " + subscribable);
179 log.info(" -forceQueuing " + forceQueuing);
180 log.info(" -destination " + destination);
181 log.info(" Erase settings");
182 log.info(" -erase.forceDestroy " + eraseForceDestroy);
183 log.info(" -erase.domain " + ((domain==null)?"":domain));
184 log.info(" ConnectQos settings");
185 log.info(" -connect/qos/persistent " + connectPersistent);
186 if (connectQosClientPropertyMap != null) {
187 Iterator it = connectQosClientPropertyMap.keySet().iterator();
188 while (it.hasNext()) {
189 String key = (String)it.next();
190 log.info(" -connect/qos/clientProperty["+key+"] " + connectQosClientPropertyMap.get(key).toString());
191 }
192 }
193 else {
194 log.info(" -connect/qos/clientProperty[] ");
195 }
196 log.info("For more info please read:");
197 log.info(" http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html");
198
199 I_XmlBlasterAccess con = glob.getXmlBlasterAccess();
200
201 // Handle lost server explicitly
202 con.registerConnectionListener(new I_ConnectionStateListener() {
203
204 public void reachedAlive(ConnectionStateEnum oldState,
205 I_XmlBlasterAccess connection) {
206 /*
207 ConnectReturnQos conRetQos = connection.getConnectReturnQos();
208 log.info("I_ConnectionStateListener: We were lucky, connected to " +
209 connection.getGlobal().getId() + " as " + conRetQos.getSessionName());
210 */
211 if (eraseTailback) {
212 log.info("Destroying " + connection.getQueue().getNumOfEntries() +
213 " client side tailback messages");
214 connection.getQueue().clear();
215 }
216 }
217 public void reachedPolling(ConnectionStateEnum oldState,
218 I_XmlBlasterAccess connection) {
219 log.warning("I_ConnectionStateListener: No connection to xmlBlaster server, we are polling ...");
220 }
221 public void reachedDead(ConnectionStateEnum oldState,
222 I_XmlBlasterAccess connection) {
223 log.warning("I_ConnectionStateListener: Connection from " +
224 connection.getGlobal().getId() + " to xmlBlaster is DEAD.");
225 //System.exit(1);
226 }
227 });
228
229 // This listener only receives events for messages sent asynchronously from the queue,
230 // e.g. after a reconnect when client side queued messages are delivered
231 con.registerPostSendListener(new I_PostSendListener() {
232 /**
233 * @see I_PostSendListener#postSend(MsgQueueEntry[])
234 */
235 public void postSend(MsgQueueEntry[] entries) {
236 try {
237 for (int i=0; i<entries.length; i++) {
238 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
239 MsgUnit msg = entries[i].getMsgUnit();
240 PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
241 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
242 }
243 else
244 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
245 }
246 } catch (Throwable e) {
247 e.printStackTrace();
248 }
249 }
250
251 /**
252 * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
253 */
254 public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
255 try {
256 for (int i=0; i<entries.length; i++) {
257 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
258 MsgUnit msg = entries[i].getMsgUnit();
259 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
260 }
261 else
262 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
263 }
264 } catch (Throwable e) {
265 e.printStackTrace();
266 }
267 //return true; // true: We have handled the case (safely stored the message) and it may be removed from connection queue
268 return false; // false: Default error handling: message remains in queue and we go to dead
269 }
270 });
271
272 // ConnectQos checks -session.name and -passwd from command line
273 log.info("============= CreatingConnectQos");
274 ConnectQos qos = new ConnectQos(glob);
275 if (connectPersistent) {
276 qos.setPersistent(connectPersistent);
277 }
278 if (connectQosClientPropertyMap != null) {
279 Iterator it = connectQosClientPropertyMap.keySet().iterator();
280 while (it.hasNext()) {
281 String key = (String)it.next();
282 qos.addClientProperty(key, connectQosClientPropertyMap.get(key).toString());
283 }
284 }
285 log.info("ConnectQos is " + qos.toXml());
286 ConnectReturnQos crq = con.connect(qos, new I_Callback() {
287 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
288 try {
289 log.info("Received '" + updateKey.getOid() + "':" + new String(content, "UTF-8"));
290 } catch (UnsupportedEncodingException e) {
291 log.severe("Update failed: " + e.toString());
292 }
293 return "";
294 }
295 }); // Login to xmlBlaster, register for updates
296 log.info("Connect success as " + crq.toXml());
297
298 org.xmlBlaster.util.StopWatch stopWatch = new org.xmlBlaster.util.StopWatch();
299 for(int i=0; true; i++) {
300 if (numPublish != -1)
301 if (i>=numPublish)
302 break;
303
304 String currCounter = ""+(i+1);
305 if (numPublish > 0) { // Add leading zeros to have nice justified numbers in dump
306 String tmp = ""+numPublish;
307 int curLen = currCounter.length();
308 currCounter = "";
309 for (int j=curLen; j<tmp.length(); j++) {
310 currCounter += "0";
311 }
312 currCounter += (i+1);
313 }
314
315 String currOid = org.xmlBlaster.util.ReplaceVariable.replaceAll(oid, "%counter", currCounter);
316
317 if (interactive) {
318 char ret = (char)Global.waitOnKeyboardHit("Hit 'b' to break, hit other key to publish '" + currOid + "' #" + currCounter + "/" + numPublish);
319 if (ret == 'b')
320 break;
321 }
322 else {
323 if (sleep > 0 && i > 0) {
324 try { Thread.sleep(sleep); } catch( InterruptedException e) {}
325 }
326 log.info("Publish '" + currOid + "' #" + currCounter + "/" + numPublish);
327 }
328
329 PublishKey pk = new PublishKey(glob, currOid, "text/xml", "1.0");
330 if (domain != null) pk.setDomain(domain);
331 pk.setClientTags(org.xmlBlaster.util.ReplaceVariable.replaceAll(clientTags, "%counter", currCounter));
332 PublishQos pq = new PublishQos(glob);
333 pq.setPriority(priority);
334 pq.setPersistent(persistent);
335 pq.setLifeTime(lifeTime);
336 pq.setForceUpdate(forceUpdate);
337 pq.setForceDestroy(forceDestroy);
338 pq.setSubscribable(subscribable);
339 if (clientPropertyMap != null) {
340 Iterator it = clientPropertyMap.keySet().iterator();
341 while (it.hasNext()) {
342 String key = (String)it.next();
343 pq.addClientProperty(key, clientPropertyMap.get(key).toString());
344 }
345 //Example for a typed property:
346 //pq.getData().addClientProperty("ALONG", (new Long(12)));
347 }
348
349 if (i == 0) {
350 TopicProperty topicProperty = new TopicProperty(glob);
351 topicProperty.setDestroyDelay(destroyDelay);
352 topicProperty.setCreateDomEntry(createDomEntry);
353 topicProperty.setReadonly(readonly);
354 if (historyMaxMsg >= 0L) {
355 HistoryQueueProperty prop = new HistoryQueueProperty(this.glob, null);
356 prop.setMaxEntries(historyMaxMsg);
357 topicProperty.setHistoryQueueProperty(prop);
358 }
359 if (consumableQueue)
360 topicProperty.setMsgDistributor("ConsumableQueue,1.0");
361 pq.setTopicProperty(topicProperty);
362 log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
363 }
364
365 if (destination != null) {
366 log.fine("Using destination: '" + destination + "'");
367 Destination dest = new Destination(glob, new SessionName(glob, destination));
368 dest.forceQueuing(forceQueuing);
369 pq.addDestination(dest);
370 }
371
372 byte[] content;
373 if (contentSize >= 0) {
374 content = new byte[contentSize];
375 Random random = new Random();
376 for (int j=0; j<content.length; j++) {
377 content[j] = (byte)(random.nextInt(96)+32);
378 //content[j] = (byte)('X');
379 //content[j] = (byte)(j % 255);
380 }
381 }
382 else if (contentFile != null && contentFile.length() > 0) {
383 content = FileLocator.readFile(contentFile);
384 }
385 else {
386 content = org.xmlBlaster.util.ReplaceVariable.replaceAll(contentStr, "%counter", ""+(i+1)).getBytes();
387 }
388
389 if (log.isLoggable(Level.FINEST)) log.finest("Going to parse publish message: " + pk.toXml() + " : " + content + " : " + pq.toXml());
390 MsgUnit msgUnit = new MsgUnit(pk, content, pq);
391 if (log.isLoggable(Level.FINEST)) log.finest("Going to publish message: " + msgUnit.toXml());
392
393 if (oneway) {
394 MsgUnit msgUnitArr[] = { msgUnit };
395 con.publishOneway(msgUnitArr);
396 log.info("#" + (i+1) + "/" + numPublish +
397 ": Published oneway message '" + msgUnit.getKeyOid() + "'");
398 }
399 else {
400 PublishReturnQos prq = con.publish(msgUnit);
401 if (log.isLoggable(Level.FINEST)) log.finest("Returned: " + prq.toXml());
402
403 log.info("#" + currCounter + "/" + numPublish +
404 ": Got status='" + prq.getState() +
405 (prq.getData().hasStateInfo()?"' '" + prq.getStateInfo():"") +
406 "' rcvTimestamp=" + prq.getRcvTimestamp() +
407 " for published message '" + prq.getKeyOid() + "'");
408 }
409 }
410 log.info("Elapsed since starting to publish: " + stopWatch.nice(numPublish));
411
412 if (erase) {
413 char ret = 0;
414 if (interactive) {
415 ret = (char)Global.waitOnKeyboardHit("Hit 'e' to erase topic '"+oid+"', or any other key to keep the topic");
416 }
417
418 if (ret == 0 || ret == 'e') {
419 EraseKey ek = new EraseKey(glob, oid);
420 if (domain != null) ek.setDomain(domain);
421 EraseQos eq = new EraseQos(glob);
422 eq.setForceDestroy(eraseForceDestroy);
423 if (log.isLoggable(Level.FINEST)) log.finest("Going to erase the topic: " + ek.toXml() + eq.toXml());
424 /*EraseReturnQos[] eraseArr =*/con.erase(ek, eq);
425 log.info("Erase success");
426 }
427 }
428
429 char ret = 0;
430 if (interactive) {
431 boolean hasQueued = con.getQueue().getNumOfEntries() > 0;
432 while (ret != 'l' && ret != 'd')
433 ret = (char)Global.waitOnKeyboardHit("Hit 'l' to leave server, 'd' to disconnect" + (hasQueued ? "(and destroy client side entries)" : ""));
434 }
435
436
437 if (ret == 0 || ret == 'd') {
438 DisconnectQos dq = new DisconnectQos(glob);
439 dq.clearClientQueue(true);
440 con.disconnect(dq);
441 log.info("Disconnected from server, all resources released");
442 }
443 else {
444 con.leaveServer(null);
445 ret = 0;
446 if (interactive) {
447 while (ret != 'q')
448 ret = (char)Global.waitOnKeyboardHit("Hit 'q' to quit");
449 }
450 log.info("Left server, our server side session remains, bye");
451