1 // xmlBlaster/demo/javaclients/HelloWorldSubscribe.java
  2 package javaclients;
  3 
  4 import java.util.logging.Logger;
  5 import java.util.logging.Level;
  6 
  7 import org.xmlBlaster.util.FileLocator;
  8 import org.xmlBlaster.util.Global;
  9 import org.xmlBlaster.util.XmlBlasterException;
 10 import org.xmlBlaster.util.def.Constants;
 11 import org.xmlBlaster.util.def.ErrorCode;
 12 import org.xmlBlaster.util.qos.HistoryQos;
 13 import org.xmlBlaster.util.qos.ClientProperty;
 14 import org.xmlBlaster.client.qos.ConnectQos;
 15 import org.xmlBlaster.client.qos.ConnectReturnQos;
 16 import org.xmlBlaster.client.qos.DisconnectQos;
 17 import org.xmlBlaster.client.I_Callback;
 18 import org.xmlBlaster.client.key.UpdateKey;
 19 import org.xmlBlaster.client.key.SubscribeKey;
 20 import org.xmlBlaster.client.key.UnSubscribeKey;
 21 import org.xmlBlaster.client.qos.UpdateQos;
 22 import org.xmlBlaster.client.qos.SubscribeQos;
 23 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 24 import org.xmlBlaster.client.qos.UnSubscribeQos;
 25 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
 26 import org.xmlBlaster.util.qos.AccessFilterQos;
 27 import org.xmlBlaster.client.I_XmlBlasterAccess;
 28 import org.xmlBlaster.client.I_ConnectionStateListener;
 29 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 30 import java.util.Map;
 31 import java.util.Iterator;
 32 import java.io.File;
 33 import java.text.DateFormat;
 34 import java.text.SimpleDateFormat;
 35 import java.util.TimeZone;
 36 import java.util.Date;
 37 
 38 
 39 /**
 40  * This client connects to xmlBlaster and subscribes to messages. 
 41  * <p>
 42  * This is a nice client to experiment and play with xmlBlaster as there are many
 43  * command line options to specify how to subscribe and how the received messages are handled.
 44  * </p>
 45  * <p>
 46  * Try using 'java javaclients.HelloWorldPublish' in another window to publish some
 47  * messages.
 48  * Further you can type 'd' in the window running xmlBlaster to get a server dump.
 49  * </p>
 50  *
 51  * Invoke (after starting the xmlBlaster server):
 52  * <pre>
 53  * java javaclients.HelloWorldSubscribe -xpath //key -initialUpdate true -unSubscribe true
 54  *
 55  * java javaclients.HelloWorldSubscribe -interactive false -oid Hello -initialUpdate true -unSubscribe true
 56  *
 57  * java javaclients.HelloWorldSubscribe -session.name joeSubscriber/5 -passwd secret -initialUpdate true -dump[HelloWorldSubscribe] true
 58  *
 59  * java javaclients.HelloWorldSubscribe -xpath //key -filter.type GnuRegexFilter -filter.query "^__sys__jdbc.*"
 60  *
 61  * java javaclients.HelloWorldSubscribe -xpath //key -filter.type XPathFilter -filter.query "//tomato"
 62  *
 63  * java javaclients.HelloWorldSubscribe -xpath //key -filter.type ContentLenFilter -filter.query "10"
 64  * </pre>
 65  * <p>
 66  * If unSubscribe=false the message is not unsubscribed at the end, if disconnect=false we don't logout at the end.
 67  * </p>
 68  * @see javaclients.HelloWorldPublish
 69  * @see javaclients.HelloWorldGet
 70  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
 71  */
 72 public class HelloWorldSubscribe implements I_Callback
 73 {
 74    private final String ME = "HelloWorldSubscribe";
 75    private final Global glob;
 76    private static Logger log = Logger.getLogger(HelloWorldSubscribe.class.getName());
 77    private I_XmlBlasterAccess con;
 78    private SubscribeReturnQos srq;
 79    private String subscribeServerId;
 80    private int updateCounter;
 81    private boolean connectPersistent;
 82    private boolean firstConnect=true;
 83    private boolean interactive;
 84    private boolean interactiveUpdate;
 85    private long updateSleep;
 86    private String updateExceptionErrorCode;
 87    private String updateExceptionMessage;
 88    private String updateExceptionRuntime;
 89    private boolean shutdownCbServer;
 90    private String oid;
 91    private String domain;
 92    private String xpath;
 93    private boolean multiSubscribe;
 94    private boolean persistentSubscribe;
 95    private boolean notifyOnErase;
 96    private boolean local;
 97    private boolean initialUpdate;
 98    private boolean updateOneway;
 99    private boolean wantContent;
100    private boolean dumpToFile;
101    // only IF dumpToFile==true:
102    private String fileExtension;
103    private String filePrefix;
104    private String fileDateFormat;
105    private volatile DateFormat formatter;
106    private String fileLock;
107    private String fileHeader;
108    private int historyNumUpdates;
109    private boolean historyNewestFirst;
110    private String filterType;
111    private String filterVersion;
112    private String filterQuery;
113    private boolean unSubscribe;
114    private int maxContentLength;
115    private boolean connectRefreshSession;
116    private boolean runAsDaemon;
117    private boolean dumpToConsole;
118    private Map clientPropertyMap;
119    private Map connectQosClientPropertyMap;
120 
121    private void readEnv() {
122       this.connectPersistent = glob.getProperty().get("connect/qos/persistent", false);
123       this.interactive = glob.getProperty().get("interactive", true);
124       this.interactiveUpdate = glob.getProperty().get("interactiveUpdate", false);
125       this.updateSleep = glob.getProperty().get("updateSleep", 0L);
126       this.updateExceptionErrorCode = glob.getProperty().get("updateException.errorCode", (String)null);
127       this.updateExceptionMessage = glob.getProperty().get("updateException.message", (String)null);
128       this.updateExceptionRuntime = glob.getProperty().get("updateException.runtime", (String)null);
129       this.shutdownCbServer = glob.getProperty().get("shutdownCbServer", false);
130       this.oid = glob.getProperty().get("oid", "");
131       this.domain = glob.getProperty().get("domain", "");
132       this.xpath = glob.getProperty().get("xpath", "");
133       this.multiSubscribe = glob.getProperty().get("multiSubscribe", true);
134       this.persistentSubscribe = glob.getProperty().get("persistentSubscribe", false);
135       this.notifyOnErase = glob.getProperty().get("notifyOnErase", true);
136       this.local = glob.getProperty().get("local", true);
137       this.initialUpdate = glob.getProperty().get("initialUpdate", true);
138       this.updateOneway = glob.getProperty().get("updateOneway", false);
139       this.wantContent = glob.getProperty().get("wantContent", true);
140       this.dumpToFile = glob.getProperty().get("dumpToFile", false);
141       // only IF dumpToFile==true:
142       this.fileExtension = glob.getProperty().get("fileExtension", ""); // for example ".jpg"
143       this.filePrefix = glob.getProperty().get("filePrefix", "");       // Fixed file name instead of topic as file name
144       this.fileDateFormat = glob.getProperty().get("fileDateFormat", "yyyy-MM-dd'T'HHmmss.S"); // How to format the date of the file name (ISO 8601)
145       this.fileLock = glob.getProperty().get("fileLock", "");           // add extension for lock file during Fixed file name instead of topic as file name, ".lck"
146       this.fileHeader = glob.getProperty().get("fileHeader", "");       // add a header text to the file, e.g. "<?xml version='1.0' encoding='UTF-8' ?>\n"
147       this.historyNumUpdates = glob.getProperty().get("historyNumUpdates", 1);
148       this.historyNewestFirst = glob.getProperty().get("historyNewestFirst", true);
149       this.filterType = glob.getProperty().get("filter.type", "GnuRegexFilter");// XPathFilter | ContentLenFilter
150       this.filterVersion = glob.getProperty().get("filter.version", "1.0");
151       this.filterQuery = glob.getProperty().get("filter.query", "");
152       this.unSubscribe = glob.getProperty().get("unSubscribe", true);
153       this.maxContentLength = glob.getProperty().get("maxContentLength", 250);
154       this.connectRefreshSession = glob.getProperty().get("connect/qos/sessionRefresh", false);
155       this.runAsDaemon = glob.getProperty().get("runAsDaemon", false);
156       this.dumpToConsole = glob.getProperty().get("dumpToConsole", true);
157       this.clientPropertyMap = glob.getProperty().get("clientProperty", (Map)null);
158       this.connectQosClientPropertyMap = glob.getProperty().get("connect/qos/clientProperty", (Map)null);
159    }
160 
161    public HelloWorldSubscribe(Global glob_) {
162       this.glob = glob_;
163 
164       boolean disconnect = glob.getProperty().get("disconnect", true);
165       try {
166          readEnv();
167 
168          if (oid.length() < 1 && xpath.length() < 1) {
169             log.warning("No -oid or -xpath given, we subscribe to oid='Hello'.");
170             oid = "Hello";
171          }
172 
173          if (this.updateSleep > 0L && interactiveUpdate == true) {
174             log.warning("You can't set 'updateSleep' and  'interactiveUpdate' simultaneous, we reset interactiveUpdate to false");
175             this.interactiveUpdate = false;
176          }
177 
178          if (this.updateExceptionErrorCode != null && this.updateExceptionRuntime != null) {
179             log.warning("You can't throw a runtime and an XmlBlasterException simultaneous, please check your settings " +
180                           " -updateException.errorCode and -updateException.runtime");
181             this.updateExceptionRuntime = null;
182          }
183 
184          log.info("Used settings are:");
185          log.info("   -connect/qos/persistent     " + connectPersistent);
186          log.info("   -connect/qos/sessionRefresh " + connectRefreshSession);
187          if (connectQosClientPropertyMap != null) {
188             Iterator it = connectQosClientPropertyMap.keySet().iterator();
189             while (it.hasNext()) {
190                String key = (String)it.next();
191                log.info("   -connect/qos/clientProperty["+key+"]   " + connectQosClientPropertyMap.get(key).toString());
192             }
193          }
194          else {
195             log.info("   -connect/qos/clientProperty[]   ");
196          }
197          log.info("   -interactive       " + interactive);
198          log.info("   -interactiveUpdate " + this.interactiveUpdate);
199          log.info("   -updateSleep       " + this.updateSleep);
200          log.info("   -updateException.errorCode " + this.updateExceptionErrorCode);
201          log.info("   -updateException.message   " + this.updateExceptionMessage);
202          log.info("   -updateException.runtime   " + this.updateExceptionRuntime);
203          log.info("   -shutdownCbServer          " + shutdownCbServer);
204          log.info("   -oid               " + oid);
205          log.info("   -domain            " + domain);
206          log.info("   -xpath             " + xpath);
207          log.info("   -multiSubscribe    " + multiSubscribe);
208          log.info("   -persistentSubscribe " + persistentSubscribe);
209          log.info("   -notifyOnErase     " + notifyOnErase);
210          log.info("   -local             " + local);
211          log.info("   -initialUpdate     " + initialUpdate);
212          log.info("   -updateOneway      " + updateOneway);
213          log.info("   -historyNumUpdates " + historyNumUpdates);
214          log.info("   -historyNewestFirst " + historyNewestFirst);
215          log.info("   -wantContent       " + wantContent);
216          log.info("   -dumpToFile        " + dumpToFile);
217          log.info("   -fileExtension     " + fileExtension);
218          log.info("   -unSubscribe       " + unSubscribe);
219          log.info("   -disconnect        " + disconnect);
220          log.info("   -filter.type       " + filterType);
221          log.info("   -filter.version    " + filterVersion);
222          log.info("   -filter.query      " + filterQuery);
223          if (this.clientPropertyMap != null) {
224             Iterator it = this.clientPropertyMap.keySet().iterator();
225             while (it.hasNext()) {
226                String key = (String)it.next();
227                log.info("   -clientProperty["+key+"]   " + this.clientPropertyMap.get(key).toString());
228             }
229          }
230          else {
231             log.info("   -clientProperty[]   ");
232          }
233          
234          log.info("For more info please read:");
235          log.info("   http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html");
236 
237          con = glob.getXmlBlasterAccess();
238 
239          // Do fail safe handling:
240          con.registerConnectionListener(new I_ConnectionStateListener() {
241             public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
242                if (connection.getConnectReturnQos().isReconnected())
243                   log.info("I_ConnectionStateListener.reachedAlive(): Same server instance found");
244                else
245                   log.info("I_ConnectionStateListener.reachedAlive(): New server instance found, connected to " +
246                         connection.getConnectReturnQos().getSessionName());
247 
248                if (connection.getQueue().getNumOfEntries() > 0) {
249                   log.info("I_ConnectionStateListener.reachedAlive(): Queue contains " +
250                            connection.getQueue().getNumOfEntries() + " messages: " +
251                            connection.getQueue().toXml(""));
252                   // connection.getQueue().clear(); -> Would destroy ConnectQos if new connected
253                }
254 
255                String id = connection.getConnectReturnQos().getSecretSessionId() + connection.getConnectReturnQos().getServerInstanceId();
256 
257                if (!firstConnect && (subscribeServerId == null ||
258                    !subscribeServerId.equals(id) && !persistentSubscribe)) {
259                   subscribe(); // We lost the old subscription, initialize subscription again
260                }
261             }
262             public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
263                log.warning("I_ConnectionStateListener.reachedPolling(): No connection to " + glob.getId() + ", we are polling ...");
264             }
265             public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
266                log.severe("I_ConnectionStateListener.reachedDead(): Connection to " + glob.getId() + " is dead, good bye");
267                System.exit(1);
268             }
269          });
270 
271          // ConnectQos checks -session.name and -passwd from command line
272          ConnectQos qos = new ConnectQos(glob);
273          qos.setPersistent(connectPersistent);
274          qos.setRefreshSession(connectRefreshSession);
275          if (connectQosClientPropertyMap != null) {
276             Iterator it = connectQosClientPropertyMap.keySet().iterator();
277             while (it.hasNext()) {
278                String key = (String)it.next();
279                qos.addClientProperty(key, connectQosClientPropertyMap.get(key).toString());
280             }
281          }
282          log.info("ConnectQos is " + qos.toXml());
283          ConnectReturnQos crq = con.connect(qos, this);  // Login to xmlBlaster, register for updates
284          // crq can be null if '-dispatch/connection/doSendConnect false' is set
285          log.info("Connect success as " + ((crq==null)?" faked connect":crq.toXml()));
286 
287          subscribe(); // first time
288 
289          if (shutdownCbServer) {
290             Global.waitOnKeyboardHit("Hit a key to shutdown callback server");
291             con.getCbServer().shutdown();
292             log.info("Callback server halted, no update should arrive ...");
293             /*
294             for (int ii=0; ii<4; ii++) {
295                Global.waitOnKeyboardHit("Hit a key to publish " + ii + "/4 ...");
296                org.xmlBlaster.util.MsgUnit msgUnit = new org.xmlBlaster.util.MsgUnit("<key oid='FromSubscriber'/>", (new String("BLA")).getBytes(), "<qos/>");
297                con.publish(msgUnit);
298                log.info("Published message");
299             }
300             */
301          }
302          else {
303             log.info("Waiting on update ...");
304          }
305 
306          if (interactiveUpdate) {
307             try { Thread.sleep(1000000000); } catch( InterruptedException i) {}
308          }
309 
310          char ret = 0;
311          if (unSubscribe && srq!=null) {
312             if (interactive) {
313                while (ret != 'q' && ret != 'u')
314                   ret = (char)Global.waitOnKeyboardHit("Hit 'u' to unSubscribe, 'q' to quit");
315             }
316 
317             if (ret == 0 || ret == 'u') {
318                UnSubscribeKey uk = new UnSubscribeKey(glob, srq.getSubscriptionId());
319                if (domain.length() > 0)  // cluster routing information
320                   uk.setDomain(domain);
321                UnSubscribeQos uq = new UnSubscribeQos(glob);
322                log.info("UnSubscribeKey=\n" + uk.toXml());
323                log.info("UnSubscribeQos=\n" + uq.toXml());
324                UnSubscribeReturnQos[] urqArr = con.unSubscribe(uk, uq);
325                log.info("UnSubscribe on " + urqArr.length + " subscriptions done");
326             }
327          }
328 
329          if (runAsDaemon) {
330             while (true) {
331                try {
332                   Thread.sleep(1000000000L);
333                }
334                catch (Exception e) {}
335             }
336          }
337          else {
338             if (ret != 'q')
339                Global.waitOnKeyboardHit("Hit a key to exit");
340          }
341       }
342       catch (XmlBlasterException e) {
343          log.severe(e.getMessage());
344       }
345       catch (Exception e) {
346          e.printStackTrace();
347          log.severe(e.toString());
348       }
349       finally {
350          if (con != null && disconnect) {
351             DisconnectQos dq = new DisconnectQos(glob);
352             con.disconnect(dq);
353             log.info("Disconnected");
354          }
355       }
356    }
357 
358    /**
359     * Does the xmlBlaster subscribe. 
360     */
361    private void subscribe() {
362       try {
363          SubscribeKey sk = null;
364          String qStr = null;
365          if (oid.length() > 0) {
366             sk = new SubscribeKey(glob, oid);
367             qStr = oid;
368          }
369          else if (xpath.length() > 0) {
370             sk = new SubscribeKey(glob, xpath, Constants.XPATH);
371             qStr = xpath;
372          }
373          if (domain.length() > 0) {  // cluster routing information
374             if (sk == null) sk = new SubscribeKey(glob, "", Constants.DOMAIN); // usually never
375             sk.setDomain(domain);
376             qStr = domain;
377          }
378          SubscribeQos sq = new SubscribeQos(glob);
379          sq.setWantInitialUpdate(initialUpdate);
380          sq.setWantUpdateOneway(updateOneway);
381          sq.setMultiSubscribe(multiSubscribe);
382          sq.setPersistent(persistentSubscribe);
383          sq.setWantNotify(notifyOnErase);
384          sq.setWantLocal(local);
385          sq.setWantContent(wantContent);
386          
387          HistoryQos historyQos = new HistoryQos(glob);
388          historyQos.setNumEntries(historyNumUpdates);
389          historyQos.setNewestFirst(historyNewestFirst);
390          sq.setHistoryQos(historyQos);
391 
392          if (filterQuery.length() > 0) {
393             AccessFilterQos filter = new AccessFilterQos(glob, filterType, filterVersion, filterQuery);
394             sq.addAccessFilter(filter);
395          }
396          if (clientPropertyMap != null) {
397             Iterator it = clientPropertyMap.keySet().iterator();
398             while (it.hasNext()) {
399                String key = (String)it.next();
400                sq.addClientProperty(key, clientPropertyMap.get(key).toString());
401             }
402          }
403 
404          log.info("SubscribeKey=\n" + sk.toXml());
405          log.info("SubscribeQos=\n" + sq.toXml());
406 
407          if (firstConnect && interactive) {
408             Global.waitOnKeyboardHit("Hit a key to subscribe '" + qStr + "'");
409          }
410          firstConnect = false;
411 
412          this.srq = con.subscribe(sk, sq);
413 
414          subscribeServerId = con.getConnectReturnQos().getSecretSessionId() + con.getConnectReturnQos().getServerInstanceId();
415 
416          log.info("Subscribed on topic '" + ((oid.length() > 0) ? oid : xpath) +
417                         "', got subscription id='" + this.srq.getSubscriptionId() + "'\n" + this.srq.toXml());
418          if (log.isLoggable(Level.FINEST)) log.finest("Subscribed: " + sk.toXml() + sq.toXml() + srq.toXml());
419       }
420       catch (XmlBlasterException e) {
421          log.severe(e.getMessage());
422       }
423    }
424 
425    /**
426     * Here the messages from xmlBlaster arrive. 
427     */
428    public String update(String cbSessionId, UpdateKey updateKey, byte[] content,
429                         UpdateQos updateQos) throws XmlBlasterException {
430       if (updateQos.isErased() && oid.length() > 0) { // Erased topic with EXACT subscription?
431          if (dumpToConsole) {
432             System.out.println("============= Topic '" + updateKey.getOid() + "' is ERASED =======================");
433             System.out.println(updateKey.toXml());
434          }
435          subscribe();             // topic is erased -> re-subsribe
436          return Constants.RET_OK; // "<qos><state id='OK'/></qos>";
437       }
438       ++updateCounter;
439       if (dumpToConsole) {
440          System.out.println("");
441          System.out.println("============= START #" + updateCounter + " '" + updateKey.getOid() + "' =======================");
442          log.info("Receiving update #" + updateCounter + " of a message ...");
443          System.out.println("<xmlBlaster>");
444          System.out.println(updateKey.toXml());
445          System.out.println("");
446          System.out.println("<content size='"+content.length+"'>");
447          if (maxContentLength < 0 || content.length < maxContentLength) {
448             System.out.println(new String(content));
449          }
450          else {
451             String str = new String(content, 0,maxContentLength-5);
452             System.out.println(str + " ...");
453          }
454          System.out.println("</content>");
455          System.out.println(updateQos.toXml());
456          System.out.println("</xmlBlaster>");
457       }
458 
459       if (dumpToFile) {
460          String pre = (this.filePrefix.length() > 0) ? this.filePrefix : (updateKey.getOid() + "-");
461          String time = formatDate(updateQos.getRcvTimestamp().getMillis()); // 2005-06-15T052536
462          String fileName = pre + time;
463          if (fileExtension != null && fileExtension.length() > 0) {
464             fileName += fileExtension;
465          }
466          String lckFile = "";
467          if (this.fileLock.length() > 0) {
468             lckFile = fileName + this.fileLock;
469          }
470          try {
471             if (lckFile.length() > 0) {
472                FileLocator.writeFile(lckFile, "Writing " +</