1 // xmlBlaster/demo/HelloWorld4.java
  2 import java.util.logging.Level;
  3 import java.util.logging.Logger;
  4 
  5 import org.xmlBlaster.client.I_Callback;
  6 import org.xmlBlaster.client.I_ConnectionStateListener;
  7 import org.xmlBlaster.client.I_XmlBlasterAccess;
  8 import org.xmlBlaster.client.key.EraseKey;
  9 import org.xmlBlaster.client.key.GetKey;
 10 import org.xmlBlaster.client.key.PublishKey;
 11 import org.xmlBlaster.client.key.SubscribeKey;
 12 import org.xmlBlaster.client.key.UpdateKey;
 13 import org.xmlBlaster.client.qos.ConnectQos;
 14 import org.xmlBlaster.client.qos.ConnectReturnQos;
 15 import org.xmlBlaster.client.qos.DisconnectQos;
 16 import org.xmlBlaster.client.qos.EraseQos;
 17 import org.xmlBlaster.client.qos.GetQos;
 18 import org.xmlBlaster.client.qos.GetReturnQos;
 19 import org.xmlBlaster.client.qos.PublishQos;
 20 import org.xmlBlaster.client.qos.PublishReturnQos;
 21 import org.xmlBlaster.client.qos.SubscribeQos;
 22 import org.xmlBlaster.client.qos.UpdateQos;
 23 import org.xmlBlaster.util.Global;
 24 import org.xmlBlaster.util.MsgUnit;
 25 import org.xmlBlaster.util.XmlBlasterException;
 26 import org.xmlBlaster.util.def.MethodName;
 27 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 28 import org.xmlBlaster.util.dispatch.I_PostSendListener;
 29 import org.xmlBlaster.util.error.I_MsgErrorHandler;
 30 import org.xmlBlaster.util.error.I_MsgErrorInfo;
 31 import org.xmlBlaster.util.qos.QosData;
 32 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
 33 
 34 
 35 /**
 36  * This client connects to xmlBlaster in failsafe mode and uses specific update handlers. 
 37  * <p />
 38  * In fail save mode the client will poll for the xmlBlaster server and
 39  * queue messages until the server is available.
 40  * We show all available control of a client in failsafe mode.
 41  * <p />
 42  * Invoke: java HelloWorld4 -session.name joe/2 -passwd secret
 43  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
 44  */
 45 public class HelloWorld4
 46 {
 47    private final Global glob;
 48    private static Logger log = Logger.getLogger(HelloWorld4.class.getName());
 49    private I_XmlBlasterAccess con = null;
 50    private ConnectReturnQos conRetQos = null;
 51 
 52    public HelloWorld4(final Global glob) {
 53       this.glob = glob;
 54 
 55 
 56       try {
 57          con = glob.getXmlBlasterAccess();
 58 
 59          // Do all client side error handling our self
 60          // this error handler is called when we are/were polling for the server:
 61          con.setClientErrorHandler(new I_MsgErrorHandler() {
 62 
 63                public void handleError(I_MsgErrorInfo msgErrorInfo) {
 64                   if (msgErrorInfo == null) return;
 65                   XmlBlasterException ex = msgErrorInfo.getXmlBlasterException();
 66                   if (ex.isUser()) {
 67                      log.severe("Connection failed: " + msgErrorInfo.getXmlBlasterException().getMessage());
 68                      if (msgErrorInfo.getDispatchManager() != null) {
 69                         msgErrorInfo.getDispatchManager().toDead(msgErrorInfo.getXmlBlasterException());
 70                         if (msgErrorInfo.getQueue() != null)
 71                            msgErrorInfo.getQueue().clear();
 72                         msgErrorInfo.getDispatchManager().shutdown();
 73                         return;
 74                      }
 75                   }
 76                   MsgQueueEntry[] entries = msgErrorInfo.getMsgQueueEntries();
 77                   for (int i=0; i<entries.length; i++)
 78                      log.severe("Message '" + entries[i].getEmbeddedType() + "' '" +
 79                                    entries[i].getLogId() + "' is lost: " + msgErrorInfo.getXmlBlasterException().getMessage());
 80                   if (msgErrorInfo.getQueue() != null)
 81                      msgErrorInfo.getQueue().clear();
 82                }
 83 
 84                public void handleErrorSync(I_MsgErrorInfo msgErrorInfo) throws XmlBlasterException {
 85                   if (msgErrorInfo.getXmlBlasterException().isCommunication()) {
 86                      handleError(msgErrorInfo);
 87                      return;
 88                   }
 89                   throw msgErrorInfo.getXmlBlasterException(); // Throw back to client
 90                }
 91 
 92                public void shutdown() {
 93                }
 94             }
 95          );
 96          
 97          // This listener receives only events from asynchronously send messages from queue.
 98          // e.g. after a reconnect when client side queued messages are delivered
 99          con.registerPostSendListener(new I_PostSendListener() {
100             /**
101              * @see I_PostSendListener#postSend(MsgQueueEntry[])
102              */
103             public void postSend(MsgQueueEntry[] entries) {
104                try {
105                   for (int i=0; i<entries.length; i++) {
106                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
107                         MsgUnit msg = entries[i].getMsgUnit();
108                         PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
109                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
110                      }
111                      else
112                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
113                   }
114                } catch (Throwable e) {
115                   e.printStackTrace();
116                }
117             }
118 
119             /**
120              * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
121              */
122             public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
123                try {
124                   for (int i=0; i<entries.length; i++) {
125                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
126                         MsgUnit msg = entries[i].getMsgUnit();
127                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
128                      }
129                      else
130                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
131                   }
132                } catch (Throwable e) {
133                   e.printStackTrace();
134                }
135                return false; // false: message remains in queue and we go to dead
136             }
137          });
138 
139 
140          // Listen on status changes of our connection to xmlBlaster
141          con.registerConnectionListener(new I_ConnectionStateListener() {
142                
143                public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
144                   log.info("I_ConnectionStateListener.reachedAlive(): We were lucky, connected to " +
145                            connection.getConnectReturnQos().getSessionName());
146                   if (connection.getQueue().getNumOfEntries() > 0) {
147                      log.info("I_ConnectionStateListener.reachedAlive(): Queue contains " +
148                               connection.getQueue().getNumOfEntries() + " messages: " +
149                               connection.getQueue().toXml(""));
150                      try {
151                         java.util.ArrayList list = connection.getQueue().peek(-1, -1);
152                         for (int i=0; i<list.size(); i++) {
153                            log.info(((MsgQueueEntry)list.get(i)).toXml());
154                         }
155                         /*
156                         MsgQueueEntry entry = (MsgQueueEntry)connection.getQueue().peek();
157                         log.info("I_ConnectionStateListener.reachedAlive(): Discarding messages from queue");
158                         connection.getQueue().clear(); // e.g. discard all msgs (it is our choice)
159                         if (MethodName.CONNECT == entry.getMethodName()) {
160                            connection.getQueue().put(entry, false);
161                         }
162                         */
163                      }
164                      catch (XmlBlasterException e) {
165                      }
166                   }
167                   if (!connection.getConnectReturnQos().isReconnected()) {
168                      log.info("I_ConnectionStateListener.reachedAlive(): New server instance found");
169                      if (connection.isConnected())
170                         initClient();    // initialize subscription etc. again
171                   }
172                   else {
173                      log.info("I_ConnectionStateListener.reachedAlive(): Same server instance found");
174                   }
175                }
176 
177                public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
178                   log.warning("I_ConnectionStateListener.reachedPolling(): No connection to " + glob.getId() + ", we are polling ...");
179                   if (!connection.isConnected())
180                      initClient();
181                }
182 
183                public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
184                   log.severe("I_ConnectionStateListener.reachedDead(): Connection to " + glob.getId() + " is dead, good bye");
185                   System.exit(1);
186                }
187             });
188 
189 
190          ConnectQos qos = new ConnectQos(glob);
191          conRetQos = con.connect(qos, new I_Callback() {
192 
193             public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
194                if (log.isLoggable(Level.FINEST)) log.finest("UpdateKey.toString()=" + updateKey.toString() +
195                                           "UpdateQos.toString()=" + updateQos.toString());
196                if (updateKey.isInternal()) {
197                   log.severe("Receiving unexpected asynchronous internal message '" + updateKey.getOid() +
198                                 "' in default handler");
199                   return "";
200                }
201                if (updateQos.isErased()) {
202                   log.info("Message '" + updateKey.getOid() + "' is erased");
203                   return "";
204                }
205                if (updateKey.getOid().equals("Banking"))
206                   log.info("Receiving asynchronous message '" + updateKey.getOid() +
207                                "' state=" + updateQos.getState() + " in default handler");
208                else
209                   log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
210                                    "' in default handler");
211                return "";
212             }
213 
214          });  // Login to xmlBlaster, default handler for updates
215 
216 
217          if (con.isAlive())
218             log.info("Connected as " + qos.getUserId() + " to xmlBlaster, your public session ID is " + conRetQos.getSessionName());
219          else
220             log.info("Not connected to xmlBlaster, proceeding in fail save mode ...");
221 
222          while (true) {
223             // Wait a second for messages to arrive before we logout
224             try { Thread.sleep(1000); } catch( InterruptedException i) {}
225             int key = Global.waitOnKeyboardHit("Hit a key: 'p'=publish, 'g'=get, 'q'=exit");
226             if (key == 'p') {
227                publishMessages();
228                continue;
229             }
230             else if (key == 'g') {
231                GetKey gk = new GetKey(glob, "Banking");
232                GetQos gq = new GetQos(glob);
233                try {
234                   MsgUnit[] msgs = con.get(gk, gq);
235                   if (msgs.length > 0) {
236                      GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
237                      log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
238                                "' and status=" + grq.getState());
239                   }
240                   else {
241                      log.info("No message matched get() call on " + gk.getOid());
242                   }
243                }
244                catch (XmlBlasterException e) {
245                   log.warning("get() failed:" + e.getMessage());
246                }
247                continue;
248             }
249             else if (key == 'q') {
250                break;
251             }
252          }
253       }
254       catch (XmlBlasterException e) {
255          log.severe("Houston, we have a problem: " + e.getMessage());
256       }
257       finally {
258          if (con != null) {
259             if (con.isConnected()) {
260                try {
261                   EraseQos eq = new EraseQos(glob);
262 
263                   EraseKey ek = new EraseKey(glob, "HelloWorld4");
264                   con.erase(ek, eq);
265                   
266                   ek = new EraseKey(glob, "Banking");
267                   con.erase(ek, eq);
268 
269                   // Wait on message erase events
270                   try { Thread.sleep(1000); } catch( InterruptedException i) {}
271                }
272                catch (XmlBlasterException e) {
273                   log.severe("Houston, we have a problem: " + e.getMessage());
274                   e.printStackTrace();
275                }
276             }
277             con.disconnect(new DisconnectQos(glob));
278          }
279       }
280    }
281 
282    /**
283     * We subscribe to some messages on startup or on reconnect
284     * to a new server instance. 
285     */
286    private void initClient() {
287       log.info("Entering initClient() and doing subscribes");
288       try {   
289          SubscribeKey sk = new SubscribeKey(glob, "Banking");
290          SubscribeQos sq = new SubscribeQos(glob);
291          sq.setWantInitialUpdate(false);
292          con.subscribe(sk, sq);
293 
294 
295          sk = new SubscribeKey(glob, "HelloWorld4");
296          sq = new SubscribeQos(glob);
297          sq.setWantInitialUpdate(false);
298          con.subscribe(sk, sq, new I_Callback() {
299             public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
300                if (updateKey.getOid().equals("HelloWorld4"))
301                   log.info("Receiving asynchronous message '" + updateKey.getOid() +
302                            "' state=" + updateQos.getState() + " in HelloWorld4 handler");
303                else
304                   log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
305                             "' with state '" + updateQos.getState() + "' in HelloWorld4 handler");
306                return "";
307             }
308          });  // subscribe with our specific update handler
309       }
310       catch (XmlBlasterException e) {
311          log.severe("Client initialization failed: " + e.getMessage());
312       }
313    }
314 
315    /**
316     * We publish some messages. 
317     */
318    private void publishMessages() {
319       try {
320 
321          PublishKey pk = new PublishKey(glob, "HelloWorld4", "text/plain", "1.0");
322          PublishQos pq = new PublishQos(glob);
323          MsgUnit msgUnit = new MsgUnit(pk, "Hi", pq);
324          con.publish(msgUnit);
325          log.info("Published message '" + pk.getOid() + "'");
326 
327 
328          pk = new PublishKey(glob, "Banking", "text/plain", "1.0");
329          pk.setClientTags("<Account><withdraw/></Account>"); // Add banking specific meta data
330          pq = new PublishQos(glob);
331          msgUnit = new MsgUnit(pk, "Ho".getBytes(), pq);
332          con.publish(msgUnit);
333          log.info("Published message '" + pk.getOid() + "'");
334 
335       }
336       catch (XmlBlasterException e) {
337          log.severe("Houston, we have a problem: " + e.getMessage());
338       }
339    }
340 
341    /**
342     * Try
343     * <pre>
344     *   java HelloWorld4 -help
345     * </pre>
346     * for usage help
347     */
348    public static void main(String args[]) {
349       Global glob = new Global();
350       
351       if (glob.init(args) != 0) { // Get help with -help
352          System.out.println(glob.usage());
353          System.err.println("Example: java HelloWorld4 -session.name Jeff\n");
354          System.exit(1);
355       }
356 
357       new HelloWorld4(glob);
358    }
359 }


syntax highlighted by Code2HTML, v. 0.9.1