1 // xmlBlaster/demo/javaclients/HelloWorldSubscribe.java
2 package javaclients;
3
4 import java.util.logging.Logger;
5 import java.util.logging.Level;
6
7 import org.xmlBlaster.util.FileLocator;
8 import org.xmlBlaster.util.Global;
9 import org.xmlBlaster.util.XmlBlasterException;
10 import org.xmlBlaster.util.def.Constants;
11 import org.xmlBlaster.util.def.ErrorCode;
12 import org.xmlBlaster.util.qos.HistoryQos;
13 import org.xmlBlaster.util.qos.ClientProperty;
14 import org.xmlBlaster.client.qos.ConnectQos;
15 import org.xmlBlaster.client.qos.ConnectReturnQos;
16 import org.xmlBlaster.client.qos.DisconnectQos;
17 import org.xmlBlaster.client.I_Callback;
18 import org.xmlBlaster.client.key.UpdateKey;
19 import org.xmlBlaster.client.key.SubscribeKey;
20 import org.xmlBlaster.client.key.UnSubscribeKey;
21 import org.xmlBlaster.client.qos.UpdateQos;
22 import org.xmlBlaster.client.qos.SubscribeQos;
23 import org.xmlBlaster.client.qos.SubscribeReturnQos;
24 import org.xmlBlaster.client.qos.UnSubscribeQos;
25 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
26 import org.xmlBlaster.util.qos.AccessFilterQos;
27 import org.xmlBlaster.client.I_XmlBlasterAccess;
28 import org.xmlBlaster.client.I_ConnectionStateListener;
29 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
30 import java.util.Map;
31 import java.util.Iterator;
32 import java.io.File;
33 import java.text.DateFormat;
34 import java.text.SimpleDateFormat;
35 import java.util.TimeZone;
36 import java.util.Date;
37
38
39 /**
40 * This client connects to xmlBlaster and subscribes to messages.
41 * <p>
42 * This is a nice client to experiment and play with xmlBlaster as there are many
43 * command line options to specify the type and amount of messages published.
44 * </p>
45 * <p>
46 * Try using 'java javaclients.HelloWorldPublish' in another window to publish some
47 * messages.
48 * Further you can type 'd' in the window running xmlBlaster to get a server dump.
49 * </p>
50 *
51 * Invoke (after starting the xmlBlaster server):
52 * <pre>
53 * java javaclients.HelloWorldSubscribe -xpath //key -initialUpdate true -unSubscribe true
54 *
55 * java javaclients.HelloWorldSubscribe -interactive false -oid Hello -initialUpdate true -unSubscribe true
56 *
57 * java javaclients.HelloWorldSubscribe -session.name joeSubscriber/5 -passwd secret -initialUpdate true -dump[HelloWorldSubscribe] true
58 *
59 * java javaclients.HelloWorldSubscribe -xpath //key -filter.type GnuRegexFilter -filter.query "^__sys__jdbc.*"
60 *
61 * java javaclients.HelloWorldSubscribe -xpath //key -filter.type XPathFilter -filter.query "//tomato"
62 *
63 * java javaclients.HelloWorldSubscribe -xpath //key -filter.type ContentLenFilter -filter.query "10"
64 * </pre>
65 * <p>
66 * If unSubscribe=false the message is not unsubscribed at the end, if disconnect=false we don't logout at the end.
67 * </p>
68 * @see java javaclients.HelloWorldPublish
69 * @see java javaclients.HelloWorldGet
70 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
71 */
72 public class HelloWorldSubscribe implements I_Callback
73 {
74 private final String ME = "HelloWorldSubscribe";
75 private final Global glob;
76 private static Logger log = Logger.getLogger(HelloWorldSubscribe.class.getName());
77 private I_XmlBlasterAccess con;
78 private SubscribeReturnQos srq;
79 private String subscribeServerId;
80 private int updateCounter;
81 private boolean connectPersistent;
82 private boolean firstConnect=true;
83 private boolean interactive;
84 private boolean interactiveUpdate;
85 private long updateSleep;
86 private String updateExceptionErrorCode;
87 private String updateExceptionMessage;
88 private String updateExceptionRuntime;
89 private boolean shutdownCbServer;
90 private String oid;
91 private String domain;
92 private String xpath;
93 private boolean multiSubscribe;
94 private boolean persistentSubscribe;
95 private boolean notifyOnErase;
96 private boolean local;
97 private boolean initialUpdate;
98 private boolean updateOneway;
99 private boolean wantContent;
100 private boolean dumpToFile;
101 // only IF dumpToFile==true:
102 private String fileExtension;
103 private String filePrefix;
104 private String fileDateFormat;
105 private volatile DateFormat formatter;
106 private String fileLock;
107 private String fileHeader;
108 private int historyNumUpdates;
109 private boolean historyNewestFirst;
110 private String filterType;
111 private String filterVersion;
112 private String filterQuery;
113 private boolean unSubscribe;
114 private int maxContentLength;
115 private boolean connectRefreshSession;
116 private boolean runAsDaemon;
117 private boolean dumpToConsole;
118 private Map clientPropertyMap;
119 private Map connectQosClientPropertyMap;
120
121 private void readEnv() {
122 this.connectPersistent = glob.getProperty().get("connect/qos/persistent", false);
123 this.interactive = glob.getProperty().get("interactive", true);
124 this.interactiveUpdate = glob.getProperty().get("interactiveUpdate", false);
125 this.updateSleep = glob.getProperty().get("updateSleep", 0L);
126 this.updateExceptionErrorCode = glob.getProperty().get("updateException.errorCode", (String)null);
127 this.updateExceptionMessage = glob.getProperty().get("updateException.message", (String)null);
128 this.updateExceptionRuntime = glob.getProperty().get("updateException.runtime", (String)null);
129 this.shutdownCbServer = glob.getProperty().get("shutdownCbServer", false);
130 this.oid = glob.getProperty().get("oid", "");
131 this.domain = glob.getProperty().get("domain", "");
132 this.xpath = glob.getProperty().get("xpath", "");
133 this.multiSubscribe = glob.getProperty().get("multiSubscribe", true);
134 this.persistentSubscribe = glob.getProperty().get("persistentSubscribe", false);
135 this.notifyOnErase = glob.getProperty().get("notifyOnErase", true);
136 this.local = glob.getProperty().get("local", true);
137 this.initialUpdate = glob.getProperty().get("initialUpdate", true);
138 this.updateOneway = glob.getProperty().get("updateOneway", false);
139 this.wantContent = glob.getProperty().get("wantContent", true);
140 this.dumpToFile = glob.getProperty().get("dumpToFile", false);
141 // only IF dumpToFile==true:
142 this.fileExtension = glob.getProperty().get("fileExtension", ""); // for example ".jpg"
143 this.filePrefix = glob.getProperty().get("filePrefix", ""); // Fixed file name instead of topic as file name
144 this.fileDateFormat = glob.getProperty().get("fileDateFormat", "yyyy-MM-dd'T'HHmmss.S"); // How to format the date of the file name (ISO 8601)
145 this.fileLock = glob.getProperty().get("fileLock", ""); // add extension for lock file during Fixed file name instead of topic as file name, ".lck"
146 this.fileHeader = glob.getProperty().get("fileHeader", ""); // add a header text to the file, e.g. "<?xml version='1.0' encoding='UTF-8' ?>\n"
147 this.historyNumUpdates = glob.getProperty().get("historyNumUpdates", 1);
148 this.historyNewestFirst = glob.getProperty().get("historyNewestFirst", true);
149 this.filterType = glob.getProperty().get("filter.type", "GnuRegexFilter");// XPathFilter | ContentLenFilter
150 this.filterVersion = glob.getProperty().get("filter.version", "1.0");
151 this.filterQuery = glob.getProperty().get("filter.query", "");
152 this.unSubscribe = glob.getProperty().get("unSubscribe", true);
153 this.maxContentLength = glob.getProperty().get("maxContentLength", 250);
154 this.connectRefreshSession = glob.getProperty().get("connect/qos/sessionRefresh", false);
155 this.runAsDaemon = glob.getProperty().get("runAsDaemon", false);
156 this.dumpToConsole = glob.getProperty().get("dumpToConsole", true);
157 this.clientPropertyMap = glob.getProperty().get("clientProperty", (Map)null);
158 this.connectQosClientPropertyMap = glob.getProperty().get("connect/qos/clientProperty", (Map)null);
159 }
160
161 public HelloWorldSubscribe(Global glob_) {
162 this.glob = glob_;
163
164 boolean disconnect = glob.getProperty().get("disconnect", true);
165 try {
166 readEnv();
167
168 if (oid.length() < 1 && xpath.length() < 1) {
169 log.warning("No -oid or -xpath given, we subscribe to oid='Hello'.");
170 oid = "Hello";
171 }
172
173 if (this.updateSleep > 0L && interactiveUpdate == true) {
174 log.warning("You can't set 'updateSleep' and 'interactiveUpdate' simultaneous, we reset interactiveUpdate to false");
175 this.interactiveUpdate = false;
176 }
177
178 if (this.updateExceptionErrorCode != null && this.updateExceptionRuntime != null) {
179 log.warning("You can't throw a runtime and an XmlBlasterException simultaneous, please check your settings " +
180 " -updateException.errorCode and -updateException.runtime");
181 this.updateExceptionRuntime = null;
182 }
183
184 log.info("Used settings are:");
185 log.info(" -connect/qos/persistent " + connectPersistent);
186 log.info(" -connect/qos/sessionRefresh " + connectRefreshSession);
187 if (connectQosClientPropertyMap != null) {
188 Iterator it = connectQosClientPropertyMap.keySet().iterator();
189 while (it.hasNext()) {
190 String key = (String)it.next();
191 log.info(" -connect/qos/clientProperty["+key+"] " + connectQosClientPropertyMap.get(key).toString());
192 }
193 }
194 else {
195 log.info(" -connect/qos/clientProperty[] ");
196 }
197 log.info(" -interactive " + interactive);
198 log.info(" -interactiveUpdate " + this.interactiveUpdate);
199 log.info(" -updateSleep " + this.updateSleep);
200 log.info(" -updateException.errorCode " + this.updateExceptionErrorCode);
201 log.info(" -updateException.message " + this.updateExceptionMessage);
202 log.info(" -updateException.runtime " + this.updateExceptionRuntime);
203 log.info(" -shutdownCbServer " + shutdownCbServer);
204 log.info(" -oid " + oid);
205 log.info(" -domain " + domain);
206 log.info(" -xpath " + xpath);
207 log.info(" -multiSubscribe " + multiSubscribe);
208 log.info(" -persistentSubscribe " + persistentSubscribe);
209 log.info(" -notifyOnErase " + notifyOnErase);
210 log.info(" -local " + local);
211 log.info(" -initialUpdate " + initialUpdate);
212 log.info(" -updateOneway " + updateOneway);
213 log.info(" -historyNumUpdates " + historyNumUpdates);
214 log.info(" -historyNewestFirst " + historyNewestFirst);
215 log.info(" -wantContent " + wantContent);
216 log.info(" -dumpToFile " + dumpToFile);
217 log.info(" -fileExtension " + fileExtension);
218 log.info(" -unSubscribe " + unSubscribe);
219 log.info(" -disconnect " + disconnect);
220 log.info(" -filter.type " + filterType);
221 log.info(" -filter.version " + filterVersion);
222 log.info(" -filter.query " + filterQuery);
223 if (this.clientPropertyMap != null) {
224 Iterator it = this.clientPropertyMap.keySet().iterator();
225 while (it.hasNext()) {
226 String key = (String)it.next();
227 log.info(" -clientProperty["+key+"] " + this.clientPropertyMap.get(key).toString());
228 }
229 }
230 else {
231 log.info(" -clientProperty[] ");
232 }
233
234 log.info("For more info please read:");
235 log.info(" http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html");
236
237 con = glob.getXmlBlasterAccess();
238
239 // Do fail safe handling:
240 con.registerConnectionListener(new I_ConnectionStateListener() {
241 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
242 if (connection.getConnectReturnQos().isReconnected())
243 log.info("I_ConnectionStateListener.reachedAlive(): Same server instance found");
244 else
245 log.info("I_ConnectionStateListener.reachedAlive(): New server instance found, connected to " +
246 connection.getConnectReturnQos().getSessionName());
247
248 if (connection.getQueue().getNumOfEntries() > 0) {
249 log.info("I_ConnectionStateListener.reachedAlive(): Queue contains " +
250 connection.getQueue().getNumOfEntries() + " messages: " +
251 connection.getQueue().toXml(""));
252 // connection.getQueue().clear(); -> Would destroy ConnectQos if new connected
253 }
254
255 String id = connection.getConnectReturnQos().getSecretSessionId() + connection.getConnectReturnQos().getServerInstanceId();
256
257 if (!firstConnect && (subscribeServerId == null ||
258 !subscribeServerId.equals(id) && !persistentSubscribe)) {
259 subscribe(); // We lost the old subscription, initialize subscription again
260 }
261 }
262 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
263 log.warning("I_ConnectionStateListener.reachedPolling(): No connection to " + glob.getId() + ", we are polling ...");
264 }
265 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
266 log.severe("I_ConnectionStateListener.reachedDead(): Connection to " + glob.getId() + " is dead, good bye");
267 System.exit(1);
268 }
269 });
270
271 // ConnectQos checks -session.name and -passwd from command line
272 ConnectQos qos = new ConnectQos(glob);
273 qos.setPersistent(connectPersistent);
274 qos.setRefreshSession(connectRefreshSession);
275 if (connectQosClientPropertyMap != null) {
276 Iterator it = connectQosClientPropertyMap.keySet().iterator();
277 while (it.hasNext()) {
278 String key = (String)it.next();
279 qos.addClientProperty(key, connectQosClientPropertyMap.get(key).toString());
280 }
281 }
282 log.info("ConnectQos is " + qos.toXml());
283 ConnectReturnQos crq = con.connect(qos, this); // Login to xmlBlaster, register for updates
284 // crq can be null if '-dispatch/connection/doSendConnect false' is set
285 log.info("Connect success as " + ((crq==null)?" faked connect":crq.toXml()));
286
287 subscribe(); // first time
288
289 if (shutdownCbServer) {
290 Global.waitOnKeyboardHit("Hit a key to shutdown callback server");
291 con.getCbServer().shutdown();
292 log.info("Callback server halted, no update should arrive ...");
293 /*
294 for (int ii=0; ii<4; ii++) {
295 Global.waitOnKeyboardHit("Hit a key to publish " + ii + "/4 ...");
296 org.xmlBlaster.util.MsgUnit msgUnit = new org.xmlBlaster.util.MsgUnit("<key oid='FromSubscriber'/>", (new String("BLA")).getBytes(), "<qos/>");
297 con.publish(msgUnit);
298 log.info("Published message");
299 }
300 */
301 }
302 else {
303 log.info("Waiting on update ...");
304 }
305
306 if (interactiveUpdate) {
307 try { Thread.sleep(1000000000); } catch( InterruptedException i) {}
308 }
309
310 char ret = 0;
311 if (unSubscribe && srq!=null) {
312 if (interactive) {
313 while (ret != 'q' && ret != 'u')
314 ret = (char)Global.waitOnKeyboardHit("Hit 'u' to unSubscribe, 'q' to quit");
315 }
316
317 if (ret == 0 || ret == 'u') {
318 UnSubscribeKey uk = new UnSubscribeKey(glob, srq.getSubscriptionId());
319 if (domain.length() > 0) // cluster routing information
320 uk.setDomain(domain);
321 UnSubscribeQos uq = new UnSubscribeQos(glob);
322 log.info("UnSubscribeKey=\n" + uk.toXml());
323 log.info("UnSubscribeQos=\n" + uq.toXml());
324 UnSubscribeReturnQos[] urqArr = con.unSubscribe(uk, uq);
325 log.info("UnSubscribe on " + urqArr.length + " subscriptions done");
326 }
327 }
328
329 if (runAsDaemon) {
330 while (true) {
331 try {
332 Thread.sleep(1000000000L);
333 }
334 catch (Exception e) {}
335 }
336 }
337 else {
338 if (ret != 'q')
339 Global.waitOnKeyboardHit("Hit a key to exit");
340 }
341 }
342 catch (XmlBlasterException e) {
343 log.severe(e.getMessage());
344 }
345 catch (Exception e) {
346 e.printStackTrace();
347 log.severe(e.toString());
348 }
349 finally {
350 if (con != null && disconnect) {
351 DisconnectQos dq = new DisconnectQos(glob);
352 con.disconnect(dq);
353 log.info("Disconnected");
354 }
355 }
356 }
357
358 /**
359 * Does the xmlBlaster subscribe.
360 */
361 private void subscribe() {
362 try {
363 SubscribeKey sk = null;
364 String qStr = null;
365 if (oid.length() > 0) {
366 sk = new SubscribeKey(glob, oid);
367 qStr = oid;
368 }
369 else if (xpath.length() > 0) {
370 sk = new SubscribeKey(glob, xpath, Constants.XPATH);
371 qStr = xpath;
372 }
373 if (domain.length() > 0) { // cluster routing information
374 if (sk == null) sk = new SubscribeKey(glob, "", Constants.DOMAIN); // usually never
375 sk.setDomain(domain);
376 qStr = domain;
377 }
378 SubscribeQos sq = new SubscribeQos(glob);
379 sq.setWantInitialUpdate(initialUpdate);
380 sq.setWantUpdateOneway(updateOneway);
381 sq.setMultiSubscribe(multiSubscribe);
382 sq.setPersistent(persistentSubscribe);
383 sq.setWantNotify(notifyOnErase);
384 sq.setWantLocal(local);
385 sq.setWantContent(wantContent);
386
387 HistoryQos historyQos = new HistoryQos(glob);
388 historyQos.setNumEntries(historyNumUpdates);
389 historyQos.setNewestFirst(historyNewestFirst);
390 sq.setHistoryQos(historyQos);
391
392 if (filterQuery.length() > 0) {
393 AccessFilterQos filter = new AccessFilterQos(glob, filterType, filterVersion, filterQuery);
394 sq.addAccessFilter(filter);
395 }
396 if (clientPropertyMap != null) {
397 Iterator it = clientPropertyMap.keySet().iterator();
398 while (it.hasNext()) {
399 String key = (String)it.next();
400 sq.addClientProperty(key, clientPropertyMap.get(key).toString());
401 }
402 }
403
404 log.info("SubscribeKey=\n" + sk.toXml());
405 log.info("SubscribeQos=\n" + sq.toXml());
406
407 if (firstConnect && interactive) {
408 Global.waitOnKeyboardHit("Hit a key to subscribe '" + qStr + "'");
409 }
410 firstConnect = false;
411
412 this.srq = con.subscribe(sk, sq);
413
414 subscribeServerId = con.getConnectReturnQos().getSecretSessionId() + con.getConnectReturnQos().getServerInstanceId();
415
416 log.info("Subscribed on topic '" + ((oid.length() > 0) ? oid : xpath) +
417 "', got subscription id='" + this.srq.getSubscriptionId() + "'\n" + this.srq.toXml());
418 if (log.isLoggable(Level.FINEST)) log.finest("Subscribed: " + sk.toXml() + sq.toXml() + srq.toXml());
419 }
420 catch (XmlBlasterException e) {
421 log.severe(e.getMessage());
422 }
423 }
424
425 /**
426 * Here the messages from xmlBlaster arrive.
427 */
428 public String update(String cbSessionId, UpdateKey updateKey, byte[] content,
429 UpdateQos updateQos) throws XmlBlasterException {
430 if (updateQos.isErased() && oid.length() > 0) { // Erased topic with EXACT subscription?
431 if (dumpToConsole) {
432 System.out.println("============= Topic '" + updateKey.getOid() + "' is ERASED =======================");
433 System.out.println(updateKey.toXml());
434 }
435 subscribe(); // topic is erased -> re-subsribe
436 return Constants.RET_OK; // "<qos><state id='OK'/></qos>";
437 }
438 ++updateCounter;
439 if (dumpToConsole) {
440 System.out.println("");
441 System.out.println("============= START #" + updateCounter + " '" + updateKey.getOid() + "' =======================");
442 log.info("Receiving update #" + updateCounter + " of a message ...");
443 System.out.println("<xmlBlaster>");
444 System.out.println(updateKey.toXml());
445 System.out.println("");
446 System.out.println("<content size='"+content.length+"'>");
447 if (maxContentLength < 0 || content.length < maxContentLength) {
448 System.out.println(new String(content));
449 }
450 else {
451 String str = new String(content, 0,maxContentLength-5);
452 System.out.println(str + " ...");
453 }
454 System.out.println("</content>");
455 System.out.println(updateQos.toXml());
456 System.out.println("</xmlBlaster>");
457 }
458
459 if (dumpToFile) {
460 String pre = (this.filePrefix.length() > 0) ? this.filePrefix : (updateKey.getOid() + "-");
461 String time = formatDate(updateQos.getRcvTimestamp().getMillis()); // 2005-06-15T052536
462 String fileName = pre + time;
463 if (fileExtension != null && fileExtension.length() > 0) {
464 fileName += fileExtension;
465 }
466 String lckFile = "";
467 if (this.fileLock.length() > 0) {
468 lckFile = fileName + this.fileLock;
469 }
470 try {
471 if (lckFile.length() > 0) {
472 FileLocator.writeFile(lckFile, "Writing " +