1 // xmlBlaster/demo/HelloWorld4.java
2 import java.util.logging.Level;
3 import java.util.logging.Logger;
4
5 import org.xmlBlaster.client.I_Callback;
6 import org.xmlBlaster.client.I_ConnectionStateListener;
7 import org.xmlBlaster.client.I_XmlBlasterAccess;
8 import org.xmlBlaster.client.key.EraseKey;
9 import org.xmlBlaster.client.key.GetKey;
10 import org.xmlBlaster.client.key.PublishKey;
11 import org.xmlBlaster.client.key.SubscribeKey;
12 import org.xmlBlaster.client.key.UpdateKey;
13 import org.xmlBlaster.client.qos.ConnectQos;
14 import org.xmlBlaster.client.qos.ConnectReturnQos;
15 import org.xmlBlaster.client.qos.DisconnectQos;
16 import org.xmlBlaster.client.qos.EraseQos;
17 import org.xmlBlaster.client.qos.GetQos;
18 import org.xmlBlaster.client.qos.GetReturnQos;
19 import org.xmlBlaster.client.qos.PublishQos;
20 import org.xmlBlaster.client.qos.PublishReturnQos;
21 import org.xmlBlaster.client.qos.SubscribeQos;
22 import org.xmlBlaster.client.qos.UpdateQos;
23 import org.xmlBlaster.util.Global;
24 import org.xmlBlaster.util.MsgUnit;
25 import org.xmlBlaster.util.XmlBlasterException;
26 import org.xmlBlaster.util.def.MethodName;
27 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
28 import org.xmlBlaster.util.dispatch.I_PostSendListener;
29 import org.xmlBlaster.util.error.I_MsgErrorHandler;
30 import org.xmlBlaster.util.error.I_MsgErrorInfo;
31 import org.xmlBlaster.util.qos.QosData;
32 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
33
34
35 /**
36 * This client connects to xmlBlaster in failsafe mode and uses specific update handlers.
37 * <p />
38 * In fail save mode the client will poll for the xmlBlaster server and
39 * queue messages until the server is available.
40 * We show all available control of a client in failsafe mode.
41 * <p />
42 * Invoke: java HelloWorld4 -session.name joe/2 -passwd secret
43 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
44 */
45 public class HelloWorld4
46 {
47 private final Global glob;
48 private static Logger log = Logger.getLogger(HelloWorld4.class.getName());
49 private I_XmlBlasterAccess con = null;
50 private ConnectReturnQos conRetQos = null;
51
52 public HelloWorld4(final Global glob) {
53 this.glob = glob;
54
55
56 try {
57 con = glob.getXmlBlasterAccess();
58
59 // Do all client side error handling our self
60 // this error handler is called when we are/were polling for the server:
61 con.setClientErrorHandler(new I_MsgErrorHandler() {
62
63 public void handleError(I_MsgErrorInfo msgErrorInfo) {
64 if (msgErrorInfo == null) return;
65 XmlBlasterException ex = msgErrorInfo.getXmlBlasterException();
66 if (ex.isUser()) {
67 log.severe("Connection failed: " + msgErrorInfo.getXmlBlasterException().getMessage());
68 if (msgErrorInfo.getDispatchManager() != null) {
69 msgErrorInfo.getDispatchManager().toDead(msgErrorInfo.getXmlBlasterException());
70 if (msgErrorInfo.getQueue() != null)
71 msgErrorInfo.getQueue().clear();
72 msgErrorInfo.getDispatchManager().shutdown();
73 return;
74 }
75 }
76 MsgQueueEntry[] entries = msgErrorInfo.getMsgQueueEntries();
77 for (int i=0; i<entries.length; i++)
78 log.severe("Message '" + entries[i].getEmbeddedType() + "' '" +
79 entries[i].getLogId() + "' is lost: " + msgErrorInfo.getXmlBlasterException().getMessage());
80 if (msgErrorInfo.getQueue() != null)
81 msgErrorInfo.getQueue().clear();
82 }
83
84 public void handleErrorSync(I_MsgErrorInfo msgErrorInfo) throws XmlBlasterException {
85 if (msgErrorInfo.getXmlBlasterException().isCommunication()) {
86 handleError(msgErrorInfo);
87 return;
88 }
89 throw msgErrorInfo.getXmlBlasterException(); // Throw back to client
90 }
91
92 public void shutdown() {
93 }
94 }
95 );
96
97 // This listener receives only events from asynchronously send messages from queue.
98 // e.g. after a reconnect when client side queued messages are delivered
99 con.registerPostSendListener(new I_PostSendListener() {
100 /**
101 * @see I_PostSendListener#postSend(MsgQueueEntry[])
102 */
103 public void postSend(MsgQueueEntry[] entries) {
104 try {
105 for (int i=0; i<entries.length; i++) {
106 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
107 MsgUnit msg = entries[i].getMsgUnit();
108 PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
109 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
110 }
111 else
112 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
113 }
114 } catch (Throwable e) {
115 e.printStackTrace();
116 }
117 }
118
119 /**
120 * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
121 */
122 public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
123 try {
124 for (int i=0; i<entries.length; i++) {
125 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
126 MsgUnit msg = entries[i].getMsgUnit();
127 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
128 }
129 else
130 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
131 }
132 } catch (Throwable e) {
133 e.printStackTrace();
134 }
135 return false; // false: message remains in queue and we go to dead
136 }
137 });
138
139
140 // Listen on status changes of our connection to xmlBlaster
141 con.registerConnectionListener(new I_ConnectionStateListener() {
142
143 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
144 log.info("I_ConnectionStateListener.reachedAlive(): We were lucky, connected to " +
145 connection.getConnectReturnQos().getSessionName());
146 if (connection.getQueue().getNumOfEntries() > 0) {
147 log.info("I_ConnectionStateListener.reachedAlive(): Queue contains " +
148 connection.getQueue().getNumOfEntries() + " messages: " +
149 connection.getQueue().toXml(""));
150 try {
151 java.util.ArrayList list = connection.getQueue().peek(-1, -1);
152 for (int i=0; i<list.size(); i++) {
153 log.info(((MsgQueueEntry)list.get(i)).toXml());
154 }
155 /*
156 MsgQueueEntry entry = (MsgQueueEntry)connection.getQueue().peek();
157 log.info("I_ConnectionStateListener.reachedAlive(): Discarding messages from queue");
158 connection.getQueue().clear(); // e.g. discard all msgs (it is our choice)
159 if (MethodName.CONNECT == entry.getMethodName()) {
160 connection.getQueue().put(entry, false);
161 }
162 */
163 }
164 catch (XmlBlasterException e) {
165 }
166 }
167 if (!connection.getConnectReturnQos().isReconnected()) {
168 log.info("I_ConnectionStateListener.reachedAlive(): New server instance found");
169 if (connection.isConnected())
170 initClient(); // initialize subscription etc. again
171 }
172 else {
173 log.info("I_ConnectionStateListener.reachedAlive(): Same server instance found");
174 }
175 }
176
177 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
178 log.warning("I_ConnectionStateListener.reachedPolling(): No connection to " + glob.getId() + ", we are polling ...");
179 if (!connection.isConnected())
180 initClient();
181 }
182
183 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
184 log.severe("I_ConnectionStateListener.reachedDead(): Connection to " + glob.getId() + " is dead, good bye");
185 System.exit(1);
186 }
187 });
188
189
190 ConnectQos qos = new ConnectQos(glob);
191 conRetQos = con.connect(qos, new I_Callback() {
192
193 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
194 if (log.isLoggable(Level.FINEST)) log.finest("UpdateKey.toString()=" + updateKey.toString() +
195 "UpdateQos.toString()=" + updateQos.toString());
196 if (updateKey.isInternal()) {
197 log.severe("Receiving unexpected asynchronous internal message '" + updateKey.getOid() +
198 "' in default handler");
199 return "";
200 }
201 if (updateQos.isErased()) {
202 log.info("Message '" + updateKey.getOid() + "' is erased");
203 return "";
204 }
205 if (updateKey.getOid().equals("Banking"))
206 log.info("Receiving asynchronous message '" + updateKey.getOid() +
207 "' state=" + updateQos.getState() + " in default handler");
208 else
209 log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
210 "' in default handler");
211 return "";
212 }
213
214 }); // Login to xmlBlaster, default handler for updates
215
216
217 if (con.isAlive())
218 log.info("Connected as " + qos.getUserId() + " to xmlBlaster, your public session ID is " + conRetQos.getSessionName());
219 else
220 log.info("Not connected to xmlBlaster, proceeding in fail save mode ...");
221
222 while (true) {
223 // Wait a second for messages to arrive before we logout
224 try { Thread.sleep(1000); } catch( InterruptedException i) {}
225 int key = Global.waitOnKeyboardHit("Hit a key: 'p'=publish, 'g'=get, 'q'=exit");
226 if (key == 'p') {
227 publishMessages();
228 continue;
229 }
230 else if (key == 'g') {
231 GetKey gk = new GetKey(glob, "Banking");
232 GetQos gq = new GetQos(glob);
233 try {
234 MsgUnit[] msgs = con.get(gk, gq);
235 if (msgs.length > 0) {
236 GetReturnQos grq = new GetReturnQos(glob, msgs[0].getQos());
237 log.info("Accessed xmlBlaster message with content '" + new String(msgs[0].getContent()) +
238 "' and status=" + grq.getState());
239 }
240 else {
241 log.info("No message matched get() call on " + gk.getOid());
242 }
243 }
244 catch (XmlBlasterException e) {
245 log.warning("get() failed:" + e.getMessage());
246 }
247 continue;
248 }
249 else if (key == 'q') {
250 break;
251 }
252 }
253 }
254 catch (XmlBlasterException e) {
255 log.severe("Houston, we have a problem: " + e.getMessage());
256 }
257 finally {
258 if (con != null) {
259 if (con.isConnected()) {
260 try {
261 EraseQos eq = new EraseQos(glob);
262
263 EraseKey ek = new EraseKey(glob, "HelloWorld4");
264 con.erase(ek, eq);
265
266 ek = new EraseKey(glob, "Banking");
267 con.erase(ek, eq);
268
269 // Wait on message erase events
270 try { Thread.sleep(1000); } catch( InterruptedException i) {}
271 }
272 catch (XmlBlasterException e) {
273 log.severe("Houston, we have a problem: " + e.getMessage());
274 e.printStackTrace();
275 }
276 }
277 con.disconnect(new DisconnectQos(glob));
278 }
279 }
280 }
281
282 /**
283 * We subscribe to some messages on startup or on reconnect
284 * to a new server instance.
285 */
286 private void initClient() {
287 log.info("Entering initClient() and doing subscribes");
288 try {
289 SubscribeKey sk = new SubscribeKey(glob, "Banking");
290 SubscribeQos sq = new SubscribeQos(glob);
291 sq.setWantInitialUpdate(false);
292 con.subscribe(sk, sq);
293
294
295 sk = new SubscribeKey(glob, "HelloWorld4");
296 sq = new SubscribeQos(glob);
297 sq.setWantInitialUpdate(false);
298 con.subscribe(sk, sq, new I_Callback() {
299 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
300 if (updateKey.getOid().equals("HelloWorld4"))
301 log.info("Receiving asynchronous message '" + updateKey.getOid() +
302 "' state=" + updateQos.getState() + " in HelloWorld4 handler");
303 else
304 log.severe("Receiving unexpected asynchronous message '" + updateKey.getOid() +
305 "' with state '" + updateQos.getState() + "' in HelloWorld4 handler");
306 return "";
307 }
308 }); // subscribe with our specific update handler
309 }
310 catch (XmlBlasterException e) {
311 log.severe("Client initialization failed: " + e.getMessage());
312 }
313 }
314
315 /**
316 * We publish some messages.
317 */
318 private void publishMessages() {
319 try {
320
321 PublishKey pk = new PublishKey(glob, "HelloWorld4", "text/plain", "1.0");
322 PublishQos pq = new PublishQos(glob);
323 MsgUnit msgUnit = new MsgUnit(pk, "Hi", pq);
324 con.publish(msgUnit);
325 log.info("Published message '" + pk.getOid() + "'");
326
327
328 pk = new PublishKey(glob, "Banking", "text/plain", "1.0");
329 pk.setClientTags("<Account><withdraw/></Account>"); // Add banking specific meta data
330 pq = new PublishQos(glob);
331 msgUnit = new MsgUnit(pk, "Ho".getBytes(), pq);
332 con.publish(msgUnit);
333 log.info("Published message '" + pk.getOid() + "'");
334
335 }
336 catch (XmlBlasterException e) {
337 log.severe("Houston, we have a problem: " + e.getMessage());
338 }
339 }
340
341 /**
342 * Try
343 * <pre>
344 * java HelloWorld4 -help
345 * </pre>
346 * for usage help
347 */
348 public static void main(String args[]) {
349 Global glob = new Global();
350
351 if (glob.init(args) != 0) { // Get help with -help
352 System.out.println(glob.usage());
353 System.err.println("Example: java HelloWorld4 -session.name Jeff\n");
354 System.exit(1);
355 }
356
357 new HelloWorld4(glob);
358 }
359 }
syntax highlighted by Code2HTML, v. 0.9.1