1 /*------------------------------------------------------------------------------
2 Name: XmlBlasterAccess.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.client;
7
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.logging.Level;
14 import java.util.logging.Logger;
15
16 import org.xmlBlaster.authentication.plugins.I_ClientPlugin;
17 import org.xmlBlaster.client.dispatch.ClientDispatchManager;
18 import org.xmlBlaster.client.key.EraseKey;
19 import org.xmlBlaster.client.key.GetKey;
20 import org.xmlBlaster.client.key.PublishKey;
21 import org.xmlBlaster.client.key.SubscribeKey;
22 import org.xmlBlaster.client.key.UnSubscribeKey;
23 import org.xmlBlaster.client.key.UpdateKey;
24 import org.xmlBlaster.client.protocol.AbstractCallbackExtended;
25 import org.xmlBlaster.client.protocol.I_CallbackServer;
26 import org.xmlBlaster.client.qos.ConnectQos;
27 import org.xmlBlaster.client.qos.ConnectReturnQos;
28 import org.xmlBlaster.client.qos.DisconnectQos;
29 import org.xmlBlaster.client.qos.EraseQos;
30 import org.xmlBlaster.client.qos.EraseReturnQos;
31 import org.xmlBlaster.client.qos.GetQos;
32 import org.xmlBlaster.client.qos.PublishQos;
33 import org.xmlBlaster.client.qos.PublishReturnQos;
34 import org.xmlBlaster.client.qos.SubscribeQos;
35 import org.xmlBlaster.client.qos.SubscribeReturnQos;
36 import org.xmlBlaster.client.qos.UnSubscribeQos;
37 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
38 import org.xmlBlaster.client.qos.UpdateQos;
39 import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;
40 import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;
41 import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;
42 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
43 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
44 import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;
45 import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;
46 import org.xmlBlaster.jms.XBConnectionMetaData;
47 import org.xmlBlaster.util.FileDumper;
48 import org.xmlBlaster.util.Global;
49 import org.xmlBlaster.util.I_ReplaceContent;
50 import org.xmlBlaster.util.I_Timeout;
51 import org.xmlBlaster.util.I_TimeoutManager;
52 import org.xmlBlaster.util.MsgUnit;
53 import org.xmlBlaster.util.SessionName;
54 import org.xmlBlaster.util.Timestamp;
55 import org.xmlBlaster.util.XmlBlasterException;
56 import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;
57 import org.xmlBlaster.util.checkpoint.I_Checkpoint;
58 import org.xmlBlaster.util.cluster.NodeId;
59 import org.xmlBlaster.util.context.ContextNode;
60 import org.xmlBlaster.util.def.Constants;
61 import org.xmlBlaster.util.def.ErrorCode;
62 import org.xmlBlaster.util.def.MethodName;
63 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
64 import org.xmlBlaster.util.dispatch.DispatchConnection;
65 import org.xmlBlaster.util.dispatch.DispatchStatistic;
66 import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;
67 import org.xmlBlaster.util.dispatch.I_DispatchManager;
68 import org.xmlBlaster.util.dispatch.I_PostSendListener;
69 import org.xmlBlaster.util.error.I_MsgErrorHandler;
70 import org.xmlBlaster.util.key.MsgKeyData;
71 import org.xmlBlaster.util.qos.ClientProperty;
72 import org.xmlBlaster.util.qos.DisconnectQosData;
73 import org.xmlBlaster.util.qos.MsgQosData;
74 import org.xmlBlaster.util.qos.TopicProperty;
75 import org.xmlBlaster.util.qos.address.CallbackAddress;
76 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
77 import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
78 import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;
79 import org.xmlBlaster.util.queue.I_Entry;
80 import org.xmlBlaster.util.queue.I_Queue;
81 import org.xmlBlaster.util.queue.StorageId;
82 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
83
84 /**
85 * This is the default implementation of the java client side remote access to xmlBlaster.
86 * <p>
87 * It hides a client side queue, the client side dispatcher framework for polling
88 * or pinging the server and some more features.
89 * </p>
90 * <p>
91 * The interface I_CallbackRaw/I_Callback/I_CallbackExtenden are enforced by AbstractCallbackExtended.
92 * </p>
93 */
94 public /*final*/ class XmlBlasterAccess extends AbstractCallbackExtended
95 implements I_XmlBlasterAccess, I_ConnectionStatusListener, I_PostSendListener, XmlBlasterAccessMBean
96 {
97 private static Logger log = Logger.getLogger(XmlBlasterAccess.class.getName());
98 private String ME = "XmlBlasterAccess";
99 private ContextNode contextNode;
100 /**
101 * The cluster node id (name) to which we want to connect, needed for nicer logging, typically null
102 * Can be set manually from outside before connect
103 */
104 private String serverNodeId = null;
105 private ConnectQos connectQos;
106 /** The return from connect() */
107 private ConnectReturnQos connectReturnQos;
108 private long jmxPublicSessionId;
109 /** Client side queue during connection failure */
110 private I_Queue clientQueue;
111 /** The dispatcher framework **/
112 private I_DispatchManager dispatchManager;
113 /** Statistic about send/received messages, can be null if there is a DispatchManager around */
114 private volatile DispatchStatistic statistic;
115 /** The object handling message delivery problems */
116 private I_MsgErrorHandler msgErrorHandler;
117 /** Client side helper classes to load the authentication xml string */
118 private I_ClientPlugin secPlgn;
119 /** The callback server */
120 private I_CallbackServer cbServer;
121 /** Handles the registered callback interfaces for given subscriptions. */
122 private final UpdateDispatcher updateDispatcher;
123 /** Used to callback the clients default update() method (as given on connect()) */
124 private I_Callback updateListener;
125 /** Is not null if the client wishes to be notified about connection state changes in fail safe operation */
126 private I_ConnectionStateListener connectionListener;
127 private I_PostSendListener postSendListener;
128 /** Allow to cache updated messages for simulated synchronous access with get().
129 * Do behind a get() a subscribe to allow cached synchronous get() access */
130 private SynchronousCache synchronousCache;
131 private boolean disconnectInProgress;
132 private boolean connectInProgress;
133 private String[] checkPointContext;
134
135 /** this I_XmlBlasterAccess is valid until a 'leaveServer' invocation is done.*/
136 private boolean isValid = true;
137
138 private boolean firstWarn = true;
139
140 private Timestamp sessionRefreshTimeoutHandle;
141 /** My JMX registration */
142 private JmxMBeanHandle mbeanHandle;
143 /** First call to connect() in millis */
144 private long startupTime;
145
146 StreamingCallback streamingCb;
147
148 private String storageIdPrefix;
149
150 private FileDumper fileDumper;
151
152 private boolean shutdown = false;
153
154 private Object userObject;
155
156 private XmlBlasterException toDeadXmlBlasterException;
157
158 /**
159 * Create an xmlBlaster accessor.
160 * Please don't create directly but use the factory instead:
161 * <pre>
162 * import org.xmlBlaster.util.Global;
163 * ...
164 * final Global glob = new Global(args);
165 * final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
166 * </pre>
167 * @param glob Your environment handle or null to use the default Global.instance()
168 * You must use a cloned Global for each XmlBlasterAccess created.
169 * engine.Global is not allowed here, only util.Global is supported
170 * @exception IllegalArgumentException If engine.Global is used as parameter
171 */
172 public XmlBlasterAccess(Global glob) {
173 super((glob==null) ? Global.instance() : glob);
174 //if (glob.wantsHelp()) {
175 // usage();
176 //}
177 if (super.glob.getNodeId() != null) {
178 // it is a engine.Global!
179 throw new IllegalArgumentException("XmlBlasterAccess can't be created with a engine.Global, please clone a org.xmlBlaster.util.Global to create me");
180 }
181 this.updateDispatcher = new UpdateDispatcher(super.glob);
182 }
183
184 /**
185 * Create an xmlBlaster accessor.
186 * Please don't create directly but use the factory instead:
187 * <pre>
188 * final Global glob = new Global(args);
189 * final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
190 * </pre>
191 * @param args Your command line arguments
192 */
193 public XmlBlasterAccess(String[] args) {
194 super(new Global(args, true, false));
195 this.updateDispatcher = new UpdateDispatcher(super.glob);
196 }
197
198 /**
199 * @see org.xmlBlaster.client.I_XmlBlasterAccess#registerConnectionListener(I_ConnectionStateListener)
200 */
201 public synchronized void registerConnectionListener(I_ConnectionStateListener connectionListener) {
202 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing registering connectionListener");
203 this.connectionListener = connectionListener;
204 }
205
206 /**
207 * Register a listener to get notifications when a messages is successfully send from
208 * the client side tail back queue.
209 * Max one can be registered, any old one will be overwritten
210 * @param postSendListener The postSendListener to set.
211 * @return the old listener or null if no previous was registered
212 */
213 public final I_PostSendListener registerPostSendListener(I_PostSendListener postSendListener) {
214 I_PostSendListener old = this.postSendListener;
215 this.postSendListener = postSendListener;
216 return old;
217 }
218
219 /**
220 * Called after a messages is send from the client side queue, but not for oneway messages.
221 * Enforced by I_PostSendListener
222 * @param msgQueueEntry, includes the returned QoS (e.g. PublisReturnQos)
223 */
224 public final void postSend(MsgQueueEntry[] entries) {
225 for (int i=0; i<entries.length; i++) {
226 MsgQueueEntry msgQueueEntry = entries[i];
227 if (msgQueueEntry.getMethodName() == MethodName.CONNECT) {
228 this.connectReturnQos = (ConnectReturnQos)msgQueueEntry.getReturnObj();
229 if (this.connectReturnQos != null) {
230 setContextNodeId(this.connectReturnQos.getServerInstanceId());
231 // break; Loop to the latest if any
232 }
233 else {
234 //log.warning("Expected connectReturnQos for " + msgQueueEntry.toXml() + " " + Global.getStackTraceAsString(null));
235 if (log.isLoggable(Level.FINE)) log.fine("Expected connectReturnQos for " + msgQueueEntry.toXml() + " " + Global.getStackTraceAsString(null));
236 }
237 }
238 }
239 I_PostSendListener l = this.postSendListener;
240 if (l != null) {
241 try {
242 l.postSend(entries);
243 }
244 catch (Throwable e) {
245 e.printStackTrace();
246 }
247 }
248 }
249
250 public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException exception) {
251 I_PostSendListener l = this.postSendListener;
252 try {
253 if (l == null) {
254 for (int i=0; i<entries.length; i++) {
255 MsgUnit msgUnit = entries[i].getMsgUnit();
256 if (msgUnit != null) {
257 String fn = this.getFileDumper().dumpMessage(msgUnit.getKeyData(), msgUnit.getContent(),
258 msgUnit.getQosData());
259 log.severe("Async sending of message failed for message " + msgUnit.getKeyOid() + ", is dumped to "
260 + fn + ": " + exception.getMessage());
261 } else {
262 log.severe("Async sending of message failed: " + entries[i].toXml() + ": " + exception.getMessage());
263 }
264 }
265 }
266 else {
267 return l.sendingFailed(entries, exception);
268 }
269 }
270 catch (Throwable e) {
271 e.printStackTrace();
272 for (int i=0; i<entries.length; i++)
273 log.severe("Async sending of message failed for message " + entries[i].toXml() +"\nreason is: " + exception.getMessage());
274 }
275 return false;
276 }
277
278 public FileDumper getFileDumper() throws XmlBlasterException {
279 if (this.fileDumper == null) {
280 synchronized (this) {
281 if (this.fileDumper == null) {
282 this.fileDumper = new FileDumper(this.glob);
283 }
284 }
285 }
286 return this.fileDumper;
287 }
288
289
290 /**
291 */
292 public SynchronousCache createSynchronousCache(int size) {
293 if (this.synchronousCache != null)
294 return this.synchronousCache; // Is initialized already
295 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing synchronous cache: size=" + size);
296 this.synchronousCache = new SynchronousCache(glob, size);
297 log.info(getLogId()+"SynchronousCache has been initialized with size="+size);
298 return this.synchronousCache;
299 }
300
301 public void setClientErrorHandler(I_MsgErrorHandler msgErrorHandler) {
302 this.msgErrorHandler = msgErrorHandler;
303 }
304
305 public String getConnectionQueueId() {
306 if (this.clientQueue != null) {
307 return this.clientQueue.getStorageId().toString();
308 }
309 return "";
310 }
311
312 /**
313 * The unique name of this session instance.
314 * @return Never null, for example "/xmlBlaster/node/heron/client/joe/session/-2"
315 */
316 public final ContextNode getContextNode() {
317 return this.contextNode;
318 }
319
320 public boolean forcePollingForTesting() {
321 if (!isAlive())
322 return false;
323 DispatchConnection dcon = this.dispatchManager.getDispatchConnectionsHandler().getAliveDispatchConnection();
324 if (dcon == null)
325 return false;
326 XmlBlasterException e = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION,
327 "forcePollingForTesting", "Forcing POLLING");
328 try {
329 dcon.handleTransition(true, e);
330 } catch (XmlBlasterException e1) {
331 e1.printStackTrace();
332 return false;
333 }
334 return true;
335 }
336
337
338 public ConnectReturnQos connect(ConnectQos qos, I_StreamingCallback streamingUpdateListener, boolean withQueue) throws XmlBlasterException {
339 if (streamingUpdateListener == null)
340 throw new XmlBlasterException(this.glob, ErrorCode.USER_ILLEGALARGUMENT, "connect", "the streamingUpdateListener is null, you must provide one");
341 this.streamingCb = new StreamingCallback(this.glob, streamingUpdateListener, 0, 0, withQueue);
342 if (withQueue)
343 registerConnectionListener(this.streamingCb);
344 return connect(qos, this.streamingCb);
345 }
346
347 /**
348 * The storageId must remain the same after a client restart
349 *
350 * @param relating
351 * xbType like Constants.RELATING_CLIENT
352 * @return
353 */
354 public StorageId createStorageId(String relating) {
355 StorageId storageId = null;
356 if (getStorageIdStr() != null && getStorageIdStr().length() > 0) {
357 // client code forces a named client side storageId -
358 // dangerous if the name conflicts with server name in same DB
359 storageId = new StorageId(glob, serverNodeId, relating, getStorageIdStr());
360 } else {
361 if (getPublicSessionId() == 0) {
362 // having no public sessionId we need to generate a unique
363 // queue name
364 storageId = new StorageId(glob, serverNodeId, relating, getId() + System.currentTimeMillis()
365 + Global.getCounter());
366 } else {
367 SessionName ses = getSessionName();
368 if (ses != null)
369 storageId = new StorageId(glob, serverNodeId, relating, ses);
370 else
371 storageId = new StorageId(glob, serverNodeId, relating, getId() + System.currentTimeMillis()
372 + Global.getCounter());
373 }
374 }
375 return storageId;
376 }
377
378 /**
379 * @see org.xmlBlaster.client.I_XmlBlasterAccess#connect(ConnectQos, I_Callback)
380 */
381 public ConnectReturnQos connect(ConnectQos qos, I_Callback updateListener) throws XmlBlasterException {
382 if (!this.isValid)
383 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "connect");
384
385 synchronized (this) {
386
387 if (this.startupTime == 0) {
388 this.startupTime = System.currentTimeMillis();
389 }
390
391 if (isConnected() || this.connectInProgress) {
392 String text = "connect() rejected, you are connected already, please check your code";
393 throw new XmlBlasterException(glob, ErrorCode.USER_CONNECT_MULTIPLE, ME, text);
394 }
395
396 this.connectInProgress = true;
397
398 try {
399 this.connectQos = (qos==null) ? new ConnectQos(glob) : qos;
400
401 ClientProperty tmp = this.connectQos.getClientProperty(Constants.UPDATE_BULK_ACK);
402 if (tmp != null) {
403 if (tmp.getBooleanValue()) {
404 log.info("Setting the flag '" + Constants.UPDATE_BULK_ACK + "' to 'true' since specified in ConnectQos");
405 this.updateBulkAck = true;
406 }
407 }
408
409
410 // We need to set a unique ID for this client so that global.getId() is unique
411 // which is used e.g. in the JDBC plugin
412 SessionName sn = getSessionName();
413 if (sn != null) {
414 if (sn.isPubSessionIdUser()) {
415 this.glob.setId(sn.toString());
416 }
417 else {
418 this.glob.setId(sn.toString() + System.currentTimeMillis()); // Not secure if two clients start simultaneously
419 }
420 }
421 else {
422 this.glob.setId(getLoginName() + System.currentTimeMillis()); // Not secure if two clients start simultaneously
423 }
424 this.glob.resetInstanceId();
425 this.connectQos.getData().setInstanceId(this.glob.getInstanceId());
426
427 if (connectQos.getData().getGlobal().isServerSide()) {
428 String text = "Your ConnectQos.getData() contains a ServerScope instead of a Global instance, this is not allowed";
429 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text);
430 }
431
432 this.updateListener = updateListener;
433
434 // TODO: This is done by ConnectQos already, isn't it?
435 initSecuritySettings(this.connectQos.getData().getClientPluginType(),
436 this.connectQos.getData().getClientPluginVersion());
437
438 this.ME = "XmlBlasterAccess-" + getId();
439 setContextNodeId(getServerNodeId());
440
441 try {
442 ClientQueueProperty prop = this.connectQos.getClientQueueProperty();
443 StorageId storageId = createStorageId(Constants.RELATING_CLIENT);
444 this.clientQueue = glob.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), storageId,
445 this.connectQos.getClientQueueProperty());
446 if (this.clientQueue == null) {
447 String text = "The client queue plugin is not found with this configuration, please check your connect QoS: " + prop.toXml();
448 throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME, text);
449 }
450
451 if (this.msgErrorHandler == null) {
452 this.msgErrorHandler = new ClientErrorHandler(glob, this);
453 }
454
455 boolean forceCbAddressCreation = (updateListener != null);
456 this.dispatchManager = new ClientDispatchManager(glob, this.msgErrorHandler,
457 getSecurityPlugin(), this.clientQueue, this,
458 this.connectQos.getAddresses(forceCbAddressCreation), sn);
459 // the above can call toDead() and the client may have called shutdown(): this.connectQos == null again
460 if (this.dispatchManager.isDead())
461 throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "connect call failed, your toDead() code did shutdown?");
462
463 getDispatchStatistic(); // Force creation of dispatchStatistic as this syncs on 'this' and could deadlock if don later from a update()
464
465 this.dispatchManager.getDispatchConnectionsHandler().registerPostSendListener(this);
466
467 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Switching to synchronous delivery mode ...");
468 this.dispatchManager.trySyncMode(true);
469
470 if (this.updateListener != null) { // Start a default callback server using same protocol
471 createDefaultCbServer();
472 }
473
474 if (this.connectQos.doSendConnect()) {
475 // Try to connect to xmlBlaster ...
476 sendConnectQos();
477 }
478 else {
479 log.info(getLogId()+"Initialized client library, but no connect() is send to xmlBlaster, a delegate should do any subscribe if required");
480 }
481 }
482 catch (XmlBlasterException e) {
483 if (isConnected()) disconnect((DisconnectQos)null);
484 throw e;
485 }
486 catch (Throwable e) {
487 if (isConnected()) disconnect((DisconnectQos)null);
488 throw XmlBlasterException.convert(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "Connection failed", e);
489 }
490 }
491 finally {
492 this.connectInProgress = false;
493 }
494 } // synchronized
495
496 if (this.connectQos.getRefreshSession()) {
497 startSessionRefresher();
498 }
499
500 if (isAlive()) {
501 if (this.connectionListener != null) {
502 this.connectionListener.reachedAlive(ConnectionStateEnum.UNDEF, this);
503 }
504 log.info(glob.getReleaseId() + ": Successful " + this.connectQos.getAddress().getType() + " login as " + getId());
505
506 if (this.clientQueue.getNumOfEntries() > 0) {
507 long num = this.clientQueue.getNumOfEntries();
508 log.info(getLogId()+"We have " + num + " client side queued tail back messages");
509 this.dispatchManager.switchToASyncMode();
510 while (this.clientQueue.getNumOfEntries() > 0) {
511 try { Thread.sleep(20L); } catch( InterruptedException i) {}
512 }
513 log.info((num-this.clientQueue.getNumOfEntries()) + " client side queued tail back messages sent");
514 this.dispatchManager.switchToSyncMode();
515 }
516 else {
517 if (this.connectionListener != null)
518 this.connectionListener.reachedAliveSync(ConnectionStateEnum.ALIVE, this);
519 }
520 }
521 else {
522 if (this.connectionListener != null) {
523 this.connectionListener.reachedPolling(ConnectionStateEnum.UNDEF, this);
524 }
525 log.info(glob.getReleaseId() + ": Login request as " + getId() + " is queued");
526 }
527
528 if (this.connectReturnQos != null) {
529 setContextNodeId(this.connectReturnQos.getServerInstanceId());
530 }
531
532 return this.connectReturnQos; // new ConnectReturnQos(glob, "");
533 }
534
535 /**
536 * Sends the current connectQos to xmlBlaster and stores the connectReturnQos.
537 * @throws XmlBlasterException
538 */
539 private void sendConnectQos() throws XmlBlasterException {
540 MsgQueueConnectEntry entry = new MsgQueueConnectEntry(this.glob, this.clientQueue.getStorageId(), this.connectQos.getData());
541 // Try to connect to xmlBlaster ...
542 this.connectReturnQos = (ConnectReturnQos)queueMessage(entry);
543 this.connectReturnQos.getData().setInitialConnectionState(this.dispatchManager.getDispatchConnectionsHandler().getState());
544 }
545
546 public boolean isConnected() {
547 if (this.dispatchManager != null) {
548 return this.connectReturnQos != null && !this.dispatchManager.getDispatchConnectionsHandler().isDead();
549 }
550 return this.connectReturnQos != null;
551 }
552
553 private void startSessionRefresher() {
554 if (this.connectQos == null) return;
555 long sessionTimeout = this.connectQos.getSessionQos().getSessionTimeout();
556 final long MIN = 2000L; // Sessions which live less than 2 seconds are not supported
557 if (sessionTimeout >= MIN) {
558 long gap = (sessionTimeout < 60*1000L) ? sessionTimeout/2 : sessionTimeout-30*1000L;
559 final long refreshTimeout = sessionTimeout - gap;
560 final I_TimeoutManager timeout = this.glob.getPingTimer();
561 this.sessionRefreshTimeoutHandle = timeout.addTimeoutListener(new I_Timeout() {
562 public void timeout(Object userData) {
563 if (isAlive()) {
564 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Refreshing session to not expire");
565 try {
566 refreshSession();
567 }
568 catch (XmlBlasterException e) {
569 log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString());
570 }
571 }
572 else {
573 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Can't refresh session as we have no connection");
574 }
575 try {
576 sessionRefreshTimeoutHandle = timeout.addOrRefreshTimeoutListener(this, refreshTimeout, null, sessionRefreshTimeoutHandle) ;
577 }
578 catch (XmlBlasterException e) {
579 log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString());
580 }
581 }
582 },
583 refreshTimeout, null);
584 }
585 else {
586 log.warning(getLogId()+"Auto-refreshing session is not supported for session timeouts smaller " + MIN + " seconds");
587
588 }
589 }
590
591
592 /**
593 * @see I_XmlBlasterAccess#refreshSession()
594 */
595 public void refreshSession() throws XmlBlasterException {
596 GetKey gk = new GetKey(glob, "__refresh");
597 GetQos gq = new GetQos(glob);
598 get(gk, gq);
599 }
600
601 /**
602 * Extracts address data from ConnectQos (or adds default if missing)
603 * and instantiate a callback server as specified in ConnectQos
604 */
605 private void createDefaultCbServer() throws XmlBlasterException {
606 CbQueueProperty prop = connectQos.getSessionCbQueueProperty(); // Creates a default property for us if none is available
607 CallbackAddress addr = prop.getCurrentCallbackAddress(); // may return null
608 if (addr == null)
609 addr = new CallbackAddress(glob);
610
611 this.cbServer = initCbServer(getLoginName(), addr);
612
613 addr.setType(this.cbServer.getCbProtocol());
614 addr.setRawAddress(this.cbServer.getCbAddress());
615 //addr.setVersion(this.cbServer.getVersion());
616 //addr.setSecretSessionId(cbSessionId);
617 prop.setCallbackAddress(addr);
618
619 log.info(getLogId()+"Callback settings: " + prop.getSettings());
620 }
621
622 /**
623 * @see I_XmlBlasterAccess#initCbServer(String, CallbackAddress)
624 */
625 public I_CallbackServer initCbServer(String loginName, CallbackAddress callbackAddress) throws XmlBlasterException {
626 if (callbackAddress == null)
627 callbackAddress = new CallbackAddress(glob);
628 callbackAddress.setSessionName(this.getSessionName());
629 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Using 'client.cbProtocol=" + callbackAddress.getType() + "' to be used by " + getServerNodeId() + ", trying to create the callback server ...");
630 I_CallbackServer server = glob.getCbServerPluginManager().getPlugin(callbackAddress.getType(), callbackAddress.getVersion());
631 server.initialize(this.glob, loginName, callbackAddress, this);
632 return server;
633 }
634
635 /**
636 * Initializes the little client helper framework for authentication.
637 * <p />
638 * The first goal is a proper loginQoS xml string for authentication.
639 * <p />
640 * The second goal is to intercept the messages for encryption (or whatever the
641 * plugin supports).
642 * <p />
643 * See xmlBlaster.properties, for example:
644 * <pre>
645 * Security.Client.DefaultPlugin=gui,1.0
646 * Security.Client.Plugin[gui][1.0]=org.xmlBlaster.authentication.plugins.gui.ClientSecurityHelper
647 * </pre>
648 */
649 private void initSecuritySettings(String secMechanism, String secVersion) {
650 PluginLoader secPlgnMgr = glob.getClientSecurityPluginLoader();
651 try {
652 this.secPlgn = secPlgnMgr.getClientPlugin(secMechanism, secVersion);
653 if (secMechanism != null) // to avoid double logging for login()
654 log.info(getLogId()+"Loaded security plugin=" + secMechanism + " version=" + secVersion);
655 }
656 catch (XmlBlasterException e) {
657 log.severe(getLogId()+"Security plugin '" + secMechanism + "/" + secVersion +
658 "' initialization failed. Reason: "+e.getMessage());
659 this.secPlgn = null;
660 }
661 }
662
663 public I_ClientPlugin getSecurityPlugin() {
664 return this.secPlgn;
665 }
666
667 /**
668 * @see org.xmlBlaster.client.XmlBlasterAccessMBean#disconnect(String)
669 */
670 public String disconnect(String disconnectQos) {
671 DisconnectQosData dqd = new DisconnectQosData(this.glob, null, disconnectQos);
672 boolean success = disconnect(new DisconnectQos(this.glob, dqd));
673 return "Disconnect called, success=" + success;
674 }
675
676 /**
677 * @see org.xmlBlaster.client.I_XmlBlasterAccess#disconnect(DisconnectQos)
678 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.disconnect.html">interface.disconnect requirement</a>
679 */
680 public synchronized boolean disconnect(DisconnectQos disconnectQos) {
681 if (!this.isValid) return false;
682 // Relaxed check to allow shutdown of database without successful connection
683 if (this.connectQos == null /*!isConnected()*/) {
684 log.warning(getLogId()+"You called disconnect() but you are are not logged in, we ignore it.");
685 if (glob != null)
686 glob.shutdown();
687 //shutdown(disconnectQos);
688 return false;
689 }
690
691 if (disconnectQos == null)
692 disconnectQos = new DisconnectQos(glob);
693
694 if (!disconnectQos.getClearClientQueueProp().isModified()) {
695 boolean clearClientQueue = true;
696 if (this.connectQos != null) {
697 if (this.connectQos.getSessionName().isPubSessionIdUser())
698 clearClientQueue = false; // Keep tail back messages
699 }
700 disconnectQos.clearClientQueue(clearClientQueue);
701 }
702
703 return shutdown(disconnectQos);
704 }
705
706 /**
707 * @see org.xmlBlaster.client.XmlBlasterAccessMBean#leaveServer()
708 */
709 public String leaveServer() {
710 leaveServer(null);
711 return "Clientlibrary is shutdown";
712 }
713
714 /**
715 * @see org.xmlBlaster.client.I_XmlBlasterAccess#leaveServer(Map)
716 */
717 public void leaveServer(Map map) {
718 if (!this.isValid) return;
719 synchronized(this) {
720 this.isValid = false;
721 DisconnectQos disconnectQos = new DisconnectQos(glob);
722 disconnectQos.clearClientQueue(false);
723 disconnectQos.clearSessions(false);
724 disconnectQos.deleteSubjectQueue(false);
725 disconnectQos.setLeaveServer(true);
726 disconnectQos.shutdownCbServer(true);
727 disconnectQos.shutdownDispatcher(true);
728 shutdown(disconnectQos);
729 }
730 }
731
732 private synchronized boolean shutdown(DisconnectQos disconnectQos) {
733 if (this.disconnectInProgress) {
734 log.warning(getLogId()+"Calling disconnect again is ignored, you are in shutdown progress already");
735 return false;
736 }
737
738 this.disconnectInProgress = true;
739
740 this.glob.unregisterMBean(this.mbeanHandle);
741
742 if (disconnectQos == null)
743 disconnectQos = new DisconnectQos(glob);
744
745 if (isConnected()) {
746
747 if (this.clientQueue != null) {
748 long remainingEntries = this.clientQueue.getNumOfEntries();
749 if (remainingEntries > 0) {
750 if (disconnectQos.clearClientQueue())
751 log.warning(getLogId()+"You called disconnect(). Please note that there are " + remainingEntries +
752 " unsent invocations/messages in the queue which are discarded now.");
753 else
754 log.info(getLogId()+"You called disconnect(). Please note that there are " + remainingEntries +
755 " unsent invocations/messages in the queue which are sent on next connect of the same client with the same public session ID.");
756 }
757 }
758
759 if (!disconnectQos.isLeaveServer()) {
760 String[] subscriptionIdArr = this.updateDispatcher.getSubscriptionIds();
761 for (int ii=0; ii<subscriptionIdArr.length; ii++) {
762 String subscriptionId = subscriptionIdArr[ii];
763 UnSubscribeKey key = new UnSubscribeKey(glob, subscriptionId);
764 try {
765 unSubscribe(key, null);
766 }
767 catch(XmlBlasterException e) {
768 if (e.isCommunication()) {
769 break;
770 }
771 log.warning(getLogId()+"Couldn't unsubscribe '" + subscriptionId + "' : " + e.getMessage());
772 }
773 }
774 }
775
776 // Now send the disconnect() to the server ...
777 if (!disconnectQos.isLeaveServer() && this.clientQueue != null) {
778 try {
779 MsgQueueDisconnectEntry entry = new MsgQueueDisconnectEntry(this.glob, this.clientQueue.getStorageId(), disconnectQos);
780 queueMessage(entry); // disconnects are always transient
781 log.info(getLogId()+"Successful disconnect from " + getServerNodeId());
782 } catch(Throwable e) {
783 e.printStackTrace();
784 log.warning(e.toString());
785 }
786 }
787 }
788
789 if (this.synchronousCache != null) {
790 this.synchronousCache.clear();
791 }
792
793 if (this.clientQueue != null && disconnectQos.clearClientQueue()) {
794 this.clientQueue.clear();
795 }
796
797 if (disconnectQos.shutdownDispatcher()) {
798 if (this.dispatchManager != null) {
799 this.dispatchManager.shutdown();
800 //this.dispatchManager = null;
801 }
802 if (this.clientQueue != null) {
803 this.clientQueue.shutdown(); // added to make hsqldb shutdown
804 this.clientQueue = null;
805 }
806 }
807
808 if (disconnectQos.shutdownCbServer() && this.cbServer != null) {
809 try {
810 this.cbServer.shutdown();
811 this.cbServer = null;
812 } catch (Throwable e) {
813 e.printStackTrace();
814 log.warning(e.toString());
815 }
816 }
817
818 this.updateDispatcher.clear();
819
820 if (this.secPlgn != null) {
821 this.secPlgn = null;
822 }
823
824 this.connectQos = null;
825 this.connectReturnQos = null;
826 this.disconnectInProgress = false;
827 this.msgErrorHandler = null;
828 this.updateListener = null;
829
830 this.streamingCb = null;
831
832 super.glob.shutdown();
833
834 this.shutdown = true;
835 return true;
836 }
837
838 /**
839 * @return true if shutdown was called, typically by disconnect()
840 */
841 public boolean isShutdown() {
842 return this.shutdown;
843 }
844
845 /**
846 * Access the callback server.
847 * @return null if no callback server is established
848 */
849 public I_CallbackServer getCbServer() {
850 return this.cbServer;
851 }
852
853 /**
854 * Create a descriptive ME, for logging only
855 * @return e.g. "/node/heron/client/joe/3" or "UNKNOWN_SESSION" if connect() was not successful
856 */
857 public String getId() {
858 SessionName sessionName = getSessionName();
859 return (sessionName == null) ? "UNKNOWN_SESSION" : sessionName.getAbsoluteName();
860 }
861
862 /**
863 * Useful as a logging prefix.
864 * @return For example "client/TheDesperate/-6: "
865 */
866 public String getLogId() {
867 SessionName sessionName = getSessionName();
868 return (sessionName == null) ? "" : sessionName.getRelativeName() + ": ";
869 }
870
871 /**
872 * The public session ID of this login session.
873 */
874 public SessionName getSessionName() {
875 if (this.connectReturnQos != null)
876 return this.connectReturnQos.getSessionName();
877 if (this.connectQos != null) {
878 SessionName sessionName = this.connectQos.getSessionName();
879 if (sessionName != null && sessionName.getNodeIdStr() == null && this.serverNodeId != null) {
880 // In cluster setup the remote cluster node id is forced
881 SessionName sn = new SessionName(glob, new NodeId(this.serverNodeId), sessionName.getLoginName(),
882 sessionName.getPublicSessionId());
883 // log.info("Using sessionName=" + sn.getAbsoluteName());
884 this.connectQos.setSessionName(sn);
885 return sn;
886 }
887 return sessionName;
888 }
889 return null;
890 }
891
892 /**
893 * @see I_XmlBlasterAccess#getStorageIdStr()
894 */
895 public String getStorageIdStr() {
896 return this.storageIdPrefix;
897 }
898
899 /**
900 * @see I_XmlBlasterAccess#setStorageIdStr(String)
901 */
902 public void setStorageIdStr(String prefix) {
903 this.storageIdPrefix = Global.getStrippedString(prefix);
904 }
905
906
907 /**
908 * Allows to set the node name for nicer logging.
909 * Typically used by cluster clients and not by ordinary clients
910 * @param serverNodeId For example "/node/heron/instanceId/1233435" or "/node/heron"
911 */
912 public void setServerNodeId(String nodeId) {
913 if (nodeId == null) return;
914 if (nodeId.startsWith("/node") || nodeId.startsWith("/xmlBlaster/node"))
915 this.serverNodeId = nodeId;
916 else
917 this.serverNodeId = "/node/" + nodeId;
918 }
919
920 /**
921 * The cluster node id (name) to which we want to connect.
922 * <p />
923 * Needed for client queue storage identifier. see: setStorageIdStr()
924 * <p />
925 * for nicer logging when running in a cluster.<br />
926 * Is configurable with "-server.node.id golan" until a successful connect
927 *
928 * @return e.g. "/node/golan" or /xmlBlaster/node/heron"
929 */
930 public String getServerNodeId() {
931 if (this.contextNode != null) return this.contextNode.getParent(ContextNode.CLUSTER_MARKER_TAG).getAbsoluteName();
932 if (this.serverNodeId != null) return this.serverNodeId;
933 return this.glob.getInstanceId(); // Changes for each restart
934 }
935
936 /**
937 * Set my identity.
938 * @param serverNodeId For example "/node/heron/instanceId/1233435" or "/node/heron"
939 */
940 private void setContextNodeId(String nodeId) {
941 // Not for cluster with given serverNodeId: It is invariant
942 if (this.serverNodeId != null)
943 nodeId = this.serverNodeId;
944
945 if (nodeId == null) return;
946 if (nodeId.indexOf("/") == -1) nodeId = "/node/"+nodeId; // add CLUSTER_MARKER_TAG to e.g. "/node/avalon.mycomp.com"
947
948 String oldClusterObjectName = ""; // e.g. "org.xmlBlaster:nodeClass=node,node=clientSUB1"
949 String oldServerNodeInstanceName = ""; // e.g. "clientSUB1"
950 ContextNode clusterContext = null;
951 if (this.contextNode != null) {
952 // same instance as glob.getContextNode():
953 clusterContext = this.contextNode.getParent(ContextNode.CLUSTER_MARKER_TAG);
954 oldServerNodeInstanceName = clusterContext.getInstanceName();
955 oldClusterObjectName = clusterContext.getAbsoluteName(ContextNode.SCHEMA_JMX);
956 }
957
958 // Verify the publicSessionId ...
959 if (this.glob.supportJmx()) {
960 try {
961 if (this.mbeanHandle != null && this.jmxPublicSessionId != getPublicSessionId()) {
962 /*int count = */this.glob.getJmxWrapper().renameMBean(this.mbeanHandle.getObjectInstance().getObjectName().toString(),
963 ContextNode.SESSION_MARKER_TAG, ""+getPublicSessionId());
964 this.mbeanHandle.getContextNode().setInstanceName(""+getPublicSessionId());
965 this.jmxPublicSessionId = getPublicSessionId();
966 }
967 if (this.mbeanHandle == null &&
968 this.contextNode != null &&
969 !this.contextNode.getInstanceName().equals(""+getPublicSessionId())) {
970 this.contextNode.setInstanceName(""+getPublicSessionId());
971 }
972 }
973 catch (XmlBlasterException e) {
974 log.warning(getLogId()+"Ignoring problem during JMX session registration: " + e.toString());
975 }
976 }
977 else {
978 this.jmxPublicSessionId = getPublicSessionId();
979 }
980
981 // parse new cluster node name ...
982 ContextNode tmp = ContextNode.valueOf(nodeId);
983 ContextNode tmpClusterContext = (tmp==null)?null:tmp.getParent(ContextNode.CLUSTER_MARKER_TAG);
984 if (tmpClusterContext == null) {
985 log.severe(getLogId()+"Ignoring unknown serverNodeId '" + nodeId + "'");
986 return;
987 }
988 String newServerNodeInstanceName = tmpClusterContext.getInstanceName(); // e.g. "heron"
989
990 if (oldServerNodeInstanceName.equals(newServerNodeInstanceName)) {
991 return; // nothing to do, same cluster name
992 }
993
994 this.glob.getContextNode().setInstanceName(newServerNodeInstanceName);
995 if (clusterContext == null) {
996 clusterContext = this.glob.getContextNode();
997 String ln = getLoginName();
998 if (ln != null && ln.length() > 0) {
999 String instanceName = this.glob.validateJmxValue(ln);
1000 ContextNode contextNodeSubject = new ContextNode(ContextNode.CONNECTION_MARKER_TAG, instanceName, clusterContext);
1001 this.contextNode = new ContextNode(ContextNode.SESSION_MARKER_TAG, ""+getPublicSessionId(), contextNodeSubject);
1002 }
1003 }
1004 else {
1005 clusterContext.setInstanceName(newServerNodeInstanceName);
1006 }
1007
1008 this.glob.setScopeContextNode(this.contextNode);
1009
1010 if (this.glob.supportJmx()) {
1011 try {
1012 // Query all "org.xmlBlaster:nodeClass=node,node=clientSUB1" + ",*" sub-nodes and replace the name by "heron"
1013 // For example our connectionQueue or our plugins like Pop3Driver
1014 if (oldClusterObjectName.length() > 0) {
1015 int num = this.glob.getJmxWrapper().renameMBean(oldClusterObjectName, ContextNode.CLUSTER_MARKER_TAG, this.contextNode);
1016 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Renamed " + num + " jmx nodes to new '" + nodeId + "'");
1017 }
1018
1019 if (this.mbeanHandle == null && this.contextNode != null) { // "org.xmlBlaster:nodeClass=node,node=heron"
1020 this.mbeanHandle = this.glob.registerMBean(this.contextNode, this);
1021 }
1022 }
1023 catch (XmlBlasterException e) {
1024 log.warning(getLogId()+"Ignoring problem during JMX registration: " + e.toString());
1025 }
1026 }
1027
1028 setCheckpointContext(getLogId());
1029 }
1030
1031 private void setCheckpointContext(String id) {
1032 if (id == null || id.length() < 1) {
1033 this.checkPointContext = null;
1034 return;
1035 }
1036 this.checkPointContext = new String[] { "sessionName", id };
1037 }
1038
1039 /**
1040 * Put the given message entry into the queue
1041 */
1042 private Object queueMessage(MsgQueueEntry entry) throws XmlBlasterException {
1043 try {
1044 final I_Checkpoint cp = glob.getCheckpointPlugin();
1045 if (cp != null) {
1046 cp.passingBy(I_Checkpoint.CP_CONNECTION_PUBLISH_ENTER, entry.getMsgUnit(), null, this.checkPointContext);
1047 }
1048 this.clientQueue.put(entry, I_Queue.USE_PUT_INTERCEPTOR);
1049 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Forwarded one '" + entry.getEmbeddedType() + "' message, current state is " + getState().toString());
1050 return entry.getReturnObj();
1051 }
1052 catch (XmlBlasterException e) {
1053 if (log.isLoggable(Level.FINE)) log.fine(e.getMessage());
1054 throw e;
1055 }
1056 catch (Throwable e) {
1057 if (log.isLoggable(Level.FINE)) log.fine(e.toString());
1058 XmlBlasterException xmlBlasterException = XmlBlasterException.convert(glob,null,null,e);
1059 //msgErrorHandler.handleError(new MsgErrorInfo(glob, entry, null, xmlBlasterException));
1060 throw xmlBlasterException; // internal errors or not in failsafe mode: throw back to client
1061 }
1062 }
1063
1064 /**
1065 * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos)
1066 */
1067 public SubscribeReturnQos subscribe(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1068 return subscribe(new SubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1069 new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1070 }
1071
1072 /**
1073 * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos)
1074 */
1075 public SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos) throws XmlBlasterException {
1076 if (!this.isValid)
1077 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "subscribe");
1078 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1079 if (getSessionName().isPubSessionIdUser() &&
1080 subscribeQos.getData().getMultiSubscribe()==false &&
1081 !subscribeQos.getData().hasSubscriptionId()) {
1082 // For failsave clients we generate on client side the subscriptionId
1083 // In case of offline/clientSideQueued operation we guarantee like this a not changing
1084 // subscriptionId and the client code can reliably use the subscriptionId for further dispatching
1085 // of update() messages.
1086 subscribeQos.getData().generateSubscriptionId(getSessionName(), subscribeKey.getData());
1087 }
1088 MsgQueueSubscribeEntry entry = new MsgQueueSubscribeEntry(glob,
1089 this.clientQueue.getStorageId(), subscribeKey.getData(), subscribeQos.getData());
1090 return (SubscribeReturnQos)queueMessage(entry);
1091 }
1092
1093 /**
1094 * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback)
1095 */
1096 public SubscribeReturnQos subscribe(java.lang.String xmlKey, java.lang.String qos, I_Callback cb) throws XmlBlasterException {
1097 return subscribe(new SubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1098 new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)),
1099 cb );
1100 }
1101
1102 /**
1103 * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback)
1104 */
1105 public SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos, I_Callback cb) throws XmlBlasterException {
1106 if (!this.isValid)
1107 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "subscribe");
1108 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1109 if (this.updateListener == null) {
1110 String text = "No callback listener is registered. " +
1111 " Please use XmlBlasterAccess.connect() with default I_Callback given.";
1112 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text);
1113 }
1114
1115 // sync subscribe & put against update()'s check for entry
1116 // otherwise if the update was faster then the subscribe to return we miss the entry
1117 synchronized (this.updateDispatcher) {
1118 SubscribeReturnQos subscribeReturnQos = subscribe(subscribeKey, subscribeQos);
1119 this.updateDispatcher.addCallback(subscribeReturnQos.getSubscriptionId(), cb, subscribeQos.getPersistent());
1120 if (!subscribeReturnQos.isFakedReturn()) {
1121 this.updateDispatcher.ackSubscription(subscribeReturnQos.getSubscriptionId());
1122 }
1123 return subscribeReturnQos;
1124 }
1125 }
1126
1127 /**
1128 * @see I_XmlBlasterAccess#get(GetKey, GetQos)
1129 */
1130 public MsgUnit[] get(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1131 return get(new GetKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1132 new GetQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1133 }
1134
1135 /**
1136 * @see I_XmlBlasterAccess#getCached(GetKey, GetQos)
1137 */
1138 public MsgUnit[] getCached(GetKey getKey, GetQos getQos) throws XmlBlasterException {
1139 if (!this.isValid)
1140 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "getCached");
1141 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1142 if (this.synchronousCache == null) { //Is synchronousCache installed?
1143 throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME,
1144 "Can't handle getCached(), please install a cache with createSynchronousCache() first");
1145 }
1146
1147 MsgUnit[] msgUnitArr = null;
1148 msgUnitArr = this.synchronousCache.get(getKey, getQos);
1149 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"CacheDump: msgUnitArr=" + msgUnitArr + ": '" + getKey.toXml().trim() + "' \n" + getQos.toXml() + this.synchronousCache.toXml(""));
1150 //not found in this.synchronousCache
1151 if(msgUnitArr == null) {
1152 msgUnitArr = get(getKey, getQos); //get messages from xmlBlaster (synchronous)
1153 SubscribeKey subscribeKey = new SubscribeKey(glob, getKey.getData());
1154 SubscribeQos subscribeQos = new SubscribeQos(glob, getQos.getData());
1155 SubscribeReturnQos subscribeReturnQos = null;
1156 synchronized (this.synchronousCache) {
1157 subscribeReturnQos = subscribe(subscribeKey, subscribeQos); //subscribe to this messages (asynchronous)
1158 this.synchronousCache.newEntry(subscribeReturnQos.getSubscriptionId(), getKey, msgUnitArr); //fill messages to this.synchronousCache
1159 }
1160 log.info(getLogId()+"New entry in this.synchronousCache created (subscriptionId="+subscribeReturnQos.getSubscriptionId()+")");
1161 }
1162 return msgUnitArr;
1163 }
1164
1165 /**
1166 * @see I_XmlBlasterAccess#get(GetKey, GetQos)
1167 */
1168 public MsgUnit[] get(GetKey getKey, GetQos getQos) throws XmlBlasterException {
1169 if (!this.isValid)
1170 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "get");
1171 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1172 MsgQueueGetEntry entry = new MsgQueueGetEntry(glob,
1173 this.clientQueue.getStorageId(), getKey, getQos);
1174 MsgUnit[] arr = (MsgUnit[])queueMessage(entry);
1175 return (arr == null) ? new MsgUnit[0] : arr;
1176 }
1177
1178 /**
1179 * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos)
1180 */
1181 public UnSubscribeReturnQos[] unSubscribe(UnSubscribeKey unSubscribeKey, UnSubscribeQos unSubscribeQos) throws XmlBlasterException {
1182 if (!this.isValid)
1183 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "unSubscribe");
1184 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1185 MsgQueueUnSubscribeEntry entry = new MsgQueueUnSubscribeEntry(glob,
1186 this.clientQueue.getStorageId(), unSubscribeKey, unSubscribeQos);
1187 UnSubscribeReturnQos[] arr = (UnSubscribeReturnQos[])queueMessage(entry);
1188 this.updateDispatcher.removeCallback(unSubscribeKey.getOid());
1189 return (arr == null) ? new UnSubscribeReturnQos[0] : arr;
1190 }
1191
1192 /**
1193 * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos)
1194 */
1195 public UnSubscribeReturnQos[] unSubscribe(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1196 return unSubscribe(new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1197 new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1198 }
1199
1200 /**
1201 * @see I_XmlBlasterAccess#publish(MsgUnit)
1202 */
1203 public PublishReturnQos publish(MsgUnit msgUnit) throws XmlBlasterException {
1204 if (!this.isValid)
1205 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publish");
1206 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1207 MsgQueuePublishEntry entry = new MsgQueuePublishEntry(glob, msgUnit, this.clientQueue.getStorageId());
1208 return (PublishReturnQos)queueMessage(entry);
1209 }
1210
1211 /**
1212 * @see I_XmlBlasterAccess#publishOneway(MsgUnit[])
1213 */
1214 public void publishOneway(org.xmlBlaster.util.MsgUnit [] msgUnitArr) throws XmlBlasterException {
1215 if (!this.isValid)
1216 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishOneway");
1217 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1218 final boolean ONEWAY = true;
1219 for (int ii=0; ii<msgUnitArr.length; ii++) {
1220 MsgQueuePublishEntry entry = new MsgQueuePublishEntry(glob, msgUnitArr[ii],
1221 this.clientQueue.getStorageId(), ONEWAY);
1222 queueMessage(entry);
1223 }
1224 }
1225
1226 // rename to publish()
1227 public PublishReturnQos[] publishArr(org.xmlBlaster.util.MsgUnit[] msgUnitArr) throws XmlBlasterException {
1228 if (!this.isValid)
1229 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishArr");
1230 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1231 if (this.firstWarn) {
1232 log.warning(getLogId()+"Publishing arrays is not atomic implemented - TODO");
1233 this.firstWarn = false;
1234 }
1235 PublishReturnQos[] retQos = new PublishReturnQos[msgUnitArr.length];
1236 for (int ii=0; ii<msgUnitArr.length; ii++) {
1237 MsgQueuePublishEntry entry = new MsgQueuePublishEntry(glob, msgUnitArr[ii],
1238 this.clientQueue.getStorageId());
1239 retQos[ii] = (PublishReturnQos)queueMessage(entry);
1240 }
1241 return retQos;
1242 }
1243
1244 /**
1245 * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos)
1246 */
1247 public EraseReturnQos[] erase(EraseKey eraseKey, EraseQos eraseQos) throws XmlBlasterException {
1248 if (!this.isValid)
1249 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "erase");
1250 if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1251 MsgQueueEraseEntry entry = new MsgQueueEraseEntry(glob,
1252 this.clientQueue.getStorageId(), eraseKey, eraseQos);
1253 EraseReturnQos[] arr = (EraseReturnQos[])queueMessage(entry);
1254 return (arr == null) ? new EraseReturnQos[0] : arr;
1255 }
1256
1257 /**
1258 * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos)
1259 */
1260 public EraseReturnQos[] erase(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1261 return erase(new EraseKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1262 new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1263 }
1264
1265 /**
1266 * For example called by SOCKET layer (SocketCallbackImpl.java) on EOF.
1267 * Does immediate ping to go to polling mode
1268 * @param xmlBlasterException
1269 * @see org.xmlBlaster.client.I_CallbackExtended#lostConnection(XmlBlasterException)
1270 */
1271 public void lostConnection(XmlBlasterException xmlBlasterException) {
1272 if (log.isLoggable(Level.FINE)) log.fine("Communication layer lost connection: " + ((xmlBlasterException==null)?"":xmlBlasterException.toString()));
1273 this.dispatchManager.pingCallbackServer(false, true);
1274 }
1275
1276 /**
1277 * Force a async ping to re-check connection to server. Status change can be
1278 * got asynchronously via registerConnectionListener()
1279 */
1280 public void ping() {
1281 this.dispatchManager.pingCallbackServer(false, false);
1282 }
1283
1284 /**
1285 * This is the callback method invoked from xmlBlaster
1286 * delivering us a new asynchronous message.
1287 * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
1288 */
1289 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
1290 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Entering update(updateKey=" + updateKey.getOid() +
1291 ", subscriptionId=" + updateQos.getSubscriptionId() + ", " + ((this.synchronousCache != null) ? "using synchronousCache" : "no synchronousCache") + ") ...");
1292
1293 if (this.synchronousCache != null) {
1294 boolean retVal;
1295 synchronized (this.synchronousCache) {
1296 retVal = this.synchronousCache.update(updateQos.getSubscriptionId(), updateKey, content, updateQos);
1297 }
1298 if (retVal) {
1299 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Putting update message " + updateQos.getSubscriptionId() + " into cache");
1300 return Constants.RET_OK; // "<qos><state id='OK'/></qos>";
1301 }
1302 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Update message " + updateQos.getSubscriptionId() + " is not for cache");
1303 }
1304
1305 Object obj = null;
1306 // sync against subscribe & put
1307 // otherwise if the update was faster then the subscribe to return we miss the entry
1308 synchronized (this.updateDispatcher) {
1309 obj = this.updateDispatcher.getCallback(updateQos.getSubscriptionId());
1310 }
1311
1312 if (obj != null) { // If a special callback was specified for this subscription:
1313 I_Callback cb = (I_Callback)obj;
1314 return cb.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client
1315 }
1316 else if (this.updateListener != null) {
1317 // If a general callback was specified on login:
1318 return this.updateListener.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client
1319 }
1320 else {
1321 log.severe(getLogId()+"Ignoring unexpected update message as client has not registered a callback: " + updateKey.toXml() + "" + updateQos.toXml());
1322 }
1323
1324 return Constants.RET_OK; // "<qos><state id='OK'/></qos>";
1325 }
1326
1327 /**
1328 * Call by DispatchManager on connection state transition.
1329 * <p />
1330 * Enforced by interface I_ConnectionStatusListener
1331 */
1332 public void toAlive(I_DispatchManager dispatchManager, ConnectionStateEnum oldState) {
1333 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " connectInProgress=" + this.connectInProgress);
1334 if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) {
1335 log.info(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE +
1336 " connectInProgress=" + this.connectInProgress +
1337 " with " + this.clientQueue.getNumOfEntries() + " client side queued messages");
1338 }
1339 if (this.connectInProgress) {
1340 dispatchManager.trySyncMode(true);
1341 if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) {
1342 try {
1343 MsgQueueEntry entry = (MsgQueueEntry)this.clientQueue.peek();
1344 if (entry.getMethodName() == MethodName.CONNECT) {
1345 this.clientQueue.remove();
1346 log.info(getLogId()+"Removed queued connect message, our new connect has precedence");
1347 }
1348 }
1349 catch (XmlBlasterException e) {
1350 log.severe(getLogId()+"Removing connect entry in client tail back queue failed: " + e.getMessage() + "\n" + toXml());
1351 }
1352 }
1353 return;
1354 }
1355
1356 if (this.clientQueue == null || this.clientQueue.getNumOfEntries() == 0) {
1357 dispatchManager.trySyncMode(true);
1358 }
1359
1360 if (this.connectReturnQos == null || !this.connectReturnQos.isReconnected()) {
1361 cleanupForNewServer();
1362 }
1363
1364 if (this.connectionListener != null) {
1365 this.connectionListener.reachedAlive(oldState, this);
1366 }
1367 }
1368
1369 public void toAliveSync(I_DispatchManager dispatchManager, ConnectionStateEnum oldState) {
1370 if (this.connectionListener != null) {
1371 this.connectionListener.reachedAliveSync(oldState, this);
1372 }
1373 }
1374
1375 /**
1376 * If we have reconnected to xmlBlaster and the xmlBlaster server instance
1377 * is another one which does not know our session state and subscribes we need to clear all
1378 * cached subscribes etc.
1379 */
1380 private void cleanupForNewServer() {
1381 if (this.updateDispatcher.size() > 0) {
1382 int num = this.updateDispatcher.clearAckNonPersistentSubscriptions(); // to avoid memory leaks, subscribes pending in the queue are not cleared
1383 if (num > 0) {
1384 log.info(getLogId()+"Removed " + num + " subscribe specific callback registrations");
1385 }
1386 // TODO: On switch to sync delivery and the client has
1387 // cleared subscribes from the queue manually we have still a memory leak here:
1388 // We would need to call clearNAKSubscriptions()
1389 }
1390 if (this.synchronousCache != null) {
1391 this.synchronousCache.clear(); // we need to re-subscribe
1392 }
1393 }
1394
1395 /**
1396 * Call by DispatchManager on connection state transition.
1397 * <p />
1398 * Enforced by interface I_ConnectionStatusListener
1399 */
1400 public void toPolling(I_DispatchManager dispatchManager, ConnectionStateEnum oldState) {
1401 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING + " connectInProgress=" + this.connectInProgress);
1402 if (this.connectInProgress) return;
1403 if (this.connectionListener != null) {
1404 this.connectionListener.reachedPolling(oldState, this);
1405 }
1406 }
1407
1408 /**
1409 * Workaround to transport the reason for the toDead() transition as
1410 * the interface {@link I_ConnectionStateListener#reachedDead(ConnectionStateEnum, I_XmlBlasterAccess) is missing
1411 * to pass the exception to the client.
1412 * <p>
1413 * Currently the client needs a downcast to XmlBlasterAccess (not in I_XmlBlasterAccess)
1414 * @return Can be null
1415 */
1416 public XmlBlasterException getToDeadXmlBlasterException() {
1417 return toDeadXmlBlasterException;
1418 }
1419
1420 /**
1421 * Call by DispatchManager on connection state transition.
1422 * <p>Enforced by interface I_ConnectionStatusListener</p>
1423 */
1424 public void toDead(I_DispatchManager dispatchManager, ConnectionStateEnum oldState, XmlBlasterException xmlBlasterException) {
1425 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD + " connectInProgress=" + this.connectInProgress);
1426 if (this.connectionListener != null) {
1427 this.toDeadXmlBlasterException = xmlBlasterException; // hack, description see #getToDeadXmlBlasterException
1428 this.connectionListener.reachedDead(oldState, this);
1429 }
1430 }
1431
1432 /**
1433 * Access the environment settings of this connection.
1434 * <p>Enforced by interface I_XmlBlasterAccess</p>
1435 * @return The global handle (like a stack with local variables for this connection)
1436 */
1437 public Global getGlobal() {
1438 return this.glob;
1439 }
1440
1441 /**
1442 * <p>Enforced by interface I_ConnectionHandler</p>
1443 * @return The queue used to store tailback messages.
1444 */
1445 public I_Queue getQueue() {
1446 return this.clientQueue;
1447 }
1448
1449 /**
1450 * <p>Enforced by interface I_ConnectionHandler</p>
1451 * @return The current state of the connection
1452 */
1453 public ConnectionStateEnum getState() {
1454 if (!isConnected()) return ConnectionStateEnum.UNDEF;
1455 return this.dispatchManager.getDispatchConnectionsHandler().getState();
1456 }
1457
1458 /**
1459 * Get the connection state.
1460 * String version for JMX access.
1461 * @return "UNDEF", "ALIVE", "POLLING", "DEAD"
1462 */
1463 public String getConnectionState() {
1464 return getState().toString();
1465 }
1466
1467 /**
1468 * <p>Enforced by interface I_ConnectionHandler</p>
1469 * @return true if the connection to xmlBlaster is operational
1470 */
1471 public boolean isAlive() {
1472 if (!isConnected()) return false;
1473 return this.dispatchManager.getDispatchConnectionsHandler().isAlive();
1474 }
1475
1476 /**
1477 * <p>Enforced by interface I_ConnectionHandler</p>
1478 * @return true if we are polling for the server
1479 */
1480 public boolean isPolling() {
1481 if (!isConnected()) return false;
1482 return this.dispatchManager.getDispatchConnectionsHandler().isPolling();
1483 }
1484
1485 /**
1486 * <p>Enforced by interface I_ConnectionHandler</p>
1487 * @return true if we have definitely lost the connection to xmlBlaster and gave up
1488 */
1489 public boolean isDead() {
1490 if (!isConnected()) return false;
1491 return this.dispatchManager.getDispatchConnectionsHandler().isDead();
1492 }
1493
1494 /**
1495 * Access the returned QoS of a connect() call.
1496 * <p>Enforced by interface I_XmlBlasterAccess</p>
1497 * @return Can be null if not connected
1498 */
1499 public ConnectReturnQos getConnectReturnQos() {
1500 return this.connectReturnQos;
1501 }
1502
1503 /**
1504 * Access the current ConnectQos
1505 * <p>Enforced by interface I_XmlBlasterAccess</p>
1506 * @return Can be null if not connected
1507 */
1508 public ConnectQos getConnectQos() {
1509 return this.connectQos;
1510 }
1511
1512 /**
1513 * @return null if no callback is configured
1514 */
1515 public final DispatchStatistic getDispatchStatistic() {
1516 if (this.statistic == null) {
1517 synchronized (this) {
1518 if (this.statistic == null) {
1519 if (this.dispatchManager != null)
1520 this.statistic = this.dispatchManager.getDispatchStatistic();
1521 else
1522 this.statistic = new DispatchStatistic();
1523 }
1524 }
1525 }
1526 return this.statistic;
1527 }
1528
1529 /**
1530 * Access the login name.
1531 * @return your login name or null if you are not logged in
1532 */
1533 public synchronized final String getLoginName() {
1534 SessionName sn = getSessionName();
1535 if (sn == null) return "xmlBlasterClient";
1536 return sn.getLoginName();
1537 /*
1538 //if (this.connectReturnQos != null)
1539 // return this.connectReturnQos.getLoginName();
1540 //try {
1541 if (connectQos != null && connectQos.getSecurityQos() != null) {
1542 String nm = connectQos.getSecurityQos().getUserId();
1543 if (nm != null && nm.length() > 0)
1544 return nm;
1545 }
1546 //}
1547 //catch (XmlBlasterException e) {}
1548 return glob.getId(); // "client?";
1549 */
1550 }
1551
1552 public final boolean isCallbackConfigured() {
1553 return (this.cbServer != null);
1554 }
1555
1556 public final long getUptime() {
1557 return (System.currentTimeMillis() - this.startupTime)/1000L;
1558 }
1559
1560 public final String getLoginDate() {
1561 long ll = this.startupTime;
1562 java.sql.Timestamp tt = new java.sql.Timestamp(ll);
1563 return tt.toString();
1564 }
1565
1566 public synchronized final long getPublicSessionId() {
1567 SessionName sn = getSessionName();
1568 if (sn == null) return 0;
1569 return sn.getPublicSessionId();
1570 }
1571
1572 public final long getNumPublish() {
1573 return getDispatchStatistic().getNumPublish();
1574 }
1575
1576 public final long getNumSubscribe() {
1577 return getDispatchStatistic().getNumSubscribe();
1578 }
1579
1580 public final long getNumUnSubscribe() {
1581 return getDispatchStatistic().getNumUnSubscribe();
1582 }
1583
1584 public final long getNumGet() {
1585 return getDispatchStatistic().getNumGet();
1586 }
1587
1588 public final long getNumErase() {
1589 return getDispatchStatistic().getNumErase();
1590 }
1591
1592 public final long getNumUpdateOneway() {
1593 return getDispatchStatistic().getNumUpdateOneway();
1594 }
1595
1596 public final long getNumUpdate() {
1597 return getDispatchStatistic().getNumUpdate();
1598 }
1599
1600 public synchronized final long getConnectionQueueNumMsgs() {
1601 if (this.clientQueue == null) return 0L;
1602 return this.clientQueue.getNumOfEntries();
1603 }
1604
1605 public synchronized final long getConnectionQueueMaxMsgs() {
1606 if (this.clientQueue == null) return 0L;
1607 return this.clientQueue.getMaxNumOfEntries();
1608 }
1609
1610 public final long getPingRoundTripDelay() {
1611 return getDispatchStatistic().getPingRoundTripDelay();
1612 }
1613
1614 public final long getRoundTripDelay() {
1615 return getDispatchStatistic().getRoundTripDelay();
1616 }
1617
1618 /** JMX **/
1619 public String invokePublish(String key, String content, String qos) throws Exception {
1620 if (key == null || key.length()==0 || key.equalsIgnoreCase("String"))
1621 throw new IllegalArgumentException("Please pass a valid XML key like '<key oid='Hello'/> or the simple oid like 'Hello'");
1622 if (key.indexOf("<") == -1) {
1623 key = "<key oid='" + key + "'/>";
1624 }
1625 qos = checkQueryKeyQos(key, qos);
1626 if (content == null) content = "";
1627 try {
1628 MsgUnit msgUnit = new MsgUnit(key, content, qos);
1629 PublishReturnQos prq = publish(msgUnit);
1630 return prq.toString();
1631 }
1632 catch (XmlBlasterException e) {
1633 throw new Exception(e.toString());
1634 }
1635 }
1636
1637 private String checkQueryKeyQos(String url, String qos) {
1638 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"url=" + url + " qos=" + qos);
1639 if (url == null || url.length()==0 || url.equalsIgnoreCase("String"))
1640 throw new IllegalArgumentException("Please pass a valid URL like 'xpath://key' or a simple oid like 'Hello'");
1641 if (qos == null || qos.length()==0 || qos.equalsIgnoreCase("String")) qos = "<qos/>";
1642 return qos;
1643 }
1644
1645 /** JMX **/
1646 public String[] invokeUnSubscribe(String url, String qos) throws Exception {
1647 qos = checkQueryKeyQos(url, qos);
1648 try {
1649 UnSubscribeKey usk = new UnSubscribeKey(glob, url);
1650 UnSubscribeReturnQos[] usrq = unSubscribe(usk, new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)));
1651 if (usrq == null) return new String[0];
1652 String[] ret = new String[usrq.length];
1653 if (ret.length < 1) {
1654 return new String[] { "unSubscribe '"+url+"' did not match any subscription" };
1655 }
1656 for (int i=0; i<usrq.length; i++) {
1657 ret[i] = usrq[i].toXml();
1658 }
1659 return ret;
1660 }
1661 catch (XmlBlasterException e) {
1662 throw new Exception(e.toString());
1663 }
1664 }
1665
1666 /** JMX **/
1667 public String invokeSubscribe(String url, String qos) throws Exception {
1668 qos = checkQueryKeyQos(url, qos);
1669 try {
1670 SubscribeKey usk = new SubscribeKey(glob, url);
1671 SubscribeReturnQos srq = subscribe(usk, new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)));
1672 if (srq == null) return "";
1673 return srq.toXml();
1674 }
1675 catch (XmlBlasterException e) {
1676 throw new Exception(e.toString());
1677 }
1678 }
1679
1680 /** JMX **/
1681 public String[] invokeGet(String url, String qos) throws Exception {
1682 qos = checkQueryKeyQos(url, qos);
1683 try {
1684 GetKey gk = new GetKey(glob, url);
1685 MsgUnit[] msgs = get(gk, new GetQos(glob, glob.getQueryQosFactory().readObject(qos)));
1686 if (msgs == null) return new String[0];
1687 if (msgs == null || msgs.length < 1) {
1688 return new String[] { "get('"+url+"') did not match any topic" };
1689 }
1690 ArrayList tmpList = new ArrayList();
1691 for (int i=0; i<msgs.length; i++) {
1692 tmpList.add(" "+msgs[i].getKeyData().toXml());
1693 tmpList.add(" "+msgs[i].getContentStr());
1694 tmpList.add(" "+msgs[i].getQosData().toXml());
1695 }
1696 return (String[])tmpList.toArray(new String[tmpList.size()]);
1697 }
1698 catch (XmlBlasterException e) {
1699 throw new Exception(e.toString());
1700 }
1701 }
1702
1703 /** JMX **/
1704 public String[] invokeErase(String url, String qos) throws Exception {
1705 qos = checkQueryKeyQos(url, qos);
1706 try {
1707 EraseKey ek = new EraseKey(glob, url);
1708 EraseReturnQos[] erq = erase(ek, new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)));
1709 if (erq == null) return new String[0];
1710 String[] ret = new String[erq.length];
1711 if (ret.length < 1) {
1712 return new String[] { "erase('"+url+"') did not match any topic, nothing is erased." };
1713 }
1714 for (int i=0; i<erq.length; i++) {
1715 ret[i] = erq[i].toXml();
1716 }
1717 return ret;
1718 }
1719 catch (XmlBlasterException e) {
1720 throw new Exception(e.toString());
1721 }
1722 }
1723
1724 /**
1725 * Sets the DispachManager belonging to this session to active or inactive.
1726 * It is initially active. Setting it to false temporarly inhibits dispatch of
1727 * messages which are in the callback queue. Setting it to true starts the
1728 * dispatch again.
1729 * @param dispatchActive
1730 */
1731 public synchronized void setDispatcherActive(boolean dispatcherActive) {
1732 if (this.dispatchManager != null) {
1733 this.dispatchManager.setDispatcherActive(dispatcherActive);
1734 }
1735 }
1736
1737 public synchronized boolean getDispatcherActive() {
1738 if (this.dispatchManager != null) {
1739 return this.dispatchManager.isDispatcherActive();
1740 }
1741 return false;
1742 }
1743
1744 public void setCallbackDispatcherActive(boolean activate) throws XmlBlasterException {
1745 if (this.streamingCb != null && !isCallbackDispatcherActive() && activate) {
1746 int ret = this.streamingCb.sendInitialQueueEntries();
1747 log.info("locally retrieved '" + ret + "' chunks");
1748 }
1749
1750 String command = getSessionName() + "/?dispatcherActive=" + activate;
1751 sendAdministrativeCommand(command);
1752 this.connectQos.getSessionCbQueueProperty().getCurrentCallbackAddress().setDispatcherActive(activate);
1753 }
1754
1755 public boolean isCallbackDispatcherActive() throws XmlBlasterException {
1756 String command = getSessionName() + "/?dispatcherActive";
1757 boolean ret = "true".equalsIgnoreCase(sendAdministrativeCommand(command));
1758 return ret;
1759 }
1760
1761 public String sendAdministrativeCommand(String command) throws XmlBlasterException {
1762 if (command == null)
1763 throw new IllegalArgumentException("sendAdministrativeCommand() called with null argument");
1764 command = command.trim();
1765 boolean isGet = command.indexOf("get ") == 0 || command.indexOf("GET ") == 0;
1766 boolean isSet = command.indexOf("set ") == 0 || command.indexOf("SET ") == 0;
1767 String cmd = ((isGet || isSet)) ? command.substring(4) : command;
1768
1769 if (isSet || (!isGet && cmd.indexOf("=") != -1)) {
1770 String oid = "__cmd:" + cmd;
1771 PublishKey key = new PublishKey(glob, oid); // oid="__cmd:/client/joe/1/?dispatcherActive=false"
1772 PublishQos qos = new PublishQos(glob);
1773 MsgUnit msgUnit = new MsgUnit(key, "", qos);
1774 try {
1775 PublishReturnQos ret = publish(msgUnit);
1776 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Send '" + cmd + " '");
1777 return ret.getState();
1778 }
1779 catch (XmlBlasterException e) {
1780 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Sending of '" + cmd + " ' failed: " + e.getMessage());
1781 throw e;
1782 }
1783 }
1784 else {
1785 String oid = "__cmd:" + cmd;
1786 GetKey getKey = new GetKey(glob, oid);
1787 GetQos getQos = new GetQos(glob);
1788 try {
1789 MsgUnit[] msgs = get(getKey, getQos);
1790 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Send '" + cmd + " ', got array of size " + msgs.length);
1791 if (msgs.length == 0)
1792 return "";
1793 return msgs[0].getContentStr();
1794 }
1795 catch (XmlBlasterException e) {
1796 if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Sending of '" + cmd + " ' failed: " + e.getMessage());
1797 throw e;
1798 }
1799 }
1800 }
1801
1802 public synchronized String[] peekClientMessages(int numOfEntries) throws Exception {
1803 try {
1804 if (numOfEntries == 0)
1805 return new String[] { "Please pass number of messages to peak" };
1806 if (this.clientQueue == null)
1807 return new String[] { "There is no client queue available" };
1808 if (this.clientQueue.getNumOfEntries() < 1)
1809 return new String[] { "The client queue is empty" };
1810
1811 List<I_Entry> list = this.clientQueue.peek(numOfEntries, -1);
1812
1813 if (list.size() == 0)
1814 return new String[] { "Peeking messages from client queue failed, the reason is not known" };
1815
1816 ArrayList tmpList = new ArrayList();
1817 for (int i=0; i<list.size(); i++) {
1818 MsgQueueEntry entry = (MsgQueueEntry)list.get(i);
1819 if (entry instanceof MsgQueuePublishEntry) {
1820 MsgQueuePublishEntry pe = (MsgQueuePublishEntry)entry;
1821 tmpList.add(" "+pe.getMsgUnit().getKeyData().toXml());
1822 tmpList.add(" "+pe.getMsgUnit().getContentStr());
1823 tmpList.add(" "+pe.getMsgUnit().getQosData().toXml());
1824 }
1825 else if (entry instanceof MsgQueueConnectEntry) {
1826 MsgQueueConnectEntry pe = (MsgQueueConnectEntry)entry;
1827 tmpList.add(" "+pe.getConnectQosData().toXml());
1828 }
1829 else if (entry instanceof MsgQueueDisconnectEntry) {
1830 MsgQueueDisconnectEntry pe = (MsgQueueDisconnectEntry)entry;
1831 tmpList.add(" "+pe.getDisconnectQos().toXml());
1832 }
1833 else if (entry instanceof MsgQueueEraseEntry) {
1834 MsgQueueEraseEntry pe = (MsgQueueEraseEntry)entry;
1835 tmpList.add(" "+pe.getEraseKey().toXml());
1836 tmpList.add(" "+pe.getEraseQos().toXml());
1837 }
1838 else if (entry instanceof MsgQueueGetEntry) {
1839 MsgQueueGetEntry pe = (MsgQueueGetEntry)entry;
1840 tmpList.add(" "+pe.getGetKey().toXml());
1841 tmpList.add(" "+pe.getGetQos().toXml());
1842 }
1843 else if (entry instanceof MsgQueueSubscribeEntry) {
1844 MsgQueueSubscribeEntry pe = (MsgQueueSubscribeEntry)entry;
1845 tmpList.add(" "+pe.getSubscribeKeyData().toXml());
1846 tmpList.add(" "+pe.getSubscribeQosData().toXml());
1847 }
1848 else if (entry instanceof MsgQueueUnSubscribeEntry) {
1849 MsgQueueUnSubscribeEntry pe = (MsgQueueUnSubscribeEntry)entry;
1850 tmpList.add(" "+pe.getUnSubscribeKey().toXml());
1851 tmpList.add(" "+pe.getUnSubscribeQos().toXml());
1852 }
1853 else {
1854 tmpList.add("Unsupported message queue entry '" + entry.getClass().getName() + "'");
1855 }
1856 }
1857
1858 return (String[])tmpList.toArray(new String[tmpList.size()]);
1859 }
1860 catch (XmlBlasterException e) {
1861 throw new Exception(e.toString());
1862 }
1863 }
1864
1865 /**
1866 * Peek messages from client queue and dump them to a file, they are not removed.
1867 * @param numOfEntries The number of messages to peek, taken from the front
1868 * @param path The path to dump the messages to, it is automatically created if missing.
1869 * @return The file names of the dumped messages
1870 */
1871 public synchronized String[] peekClientMessagesToFile(int numOfEntries, String path) throws Exception {
1872 try {
1873 return this.glob.peekQueueMessagesToFile(this.clientQueue, numOfEntries, path, "client");
1874 }
1875 catch (XmlBlasterException e) {
1876 throw new Exception(e.toString());
1877 }
1878 }
1879
1880 /**
1881 * Command line usage.
1882 */
1883 public static String usage(Global glob) {
1884 glob = (glob == null) ? Global.instance() : glob;
1885 StringBuffer sb = new StringBuffer(4096);
1886 sb.append("\n");
1887 sb.append("Choose a connection protocol:\n");
1888 sb.append(" -protocol Specify a protocol to talk with xmlBlaster, 'SOCKET' or 'IOR' or 'RMI' or 'SOAP' or 'XMLRPC'.\n");
1889 sb.append(" This is used for connection to xmlBlaster and for the callback connection.\n");
1890 sb.append(" Current setting is '" + glob.getProperty().get("client.protocol", "IOR") + "'. See below for protocol settings.\n");
1891 sb.append(" -dispatch/connection/protocol <protocol>\n");
1892 sb.append(" Specify the protocol to connect to xmlBlaster only (not for the callback).\n");
1893 sb.append(" -dispatch/callback/protocol <protocol>\n");
1894 sb.append(" Specify the protocol for the callback connection only.\n");
1895 sb.append(" Example: java MyApp -protocol SOCKET\n");
1896 sb.append(" java MyApp -dispatch/connection/protocol RMI -dispatch/connection/plugin/rmi/hostname 192.168.10.34\n");
1897 sb.append(" java MyApp -dispatch/connection/protocol RMI -dispatch/callback/protocol XMLRPC\n");
1898 sb.append("\n");
1899 sb.append("Security features:\n");
1900 sb.append(" -Security.Client.DefaultPlugin \"htpasswd,1.0\"\n");
1901 sb.append(" Force the given authentication schema, here the 'htpasswd' is enforced\n");
1902 sb.append(" Clients can overwrite this with ConnectQos.java\n");
1903 try {
1904 sb.append(new org.xmlBlaster.client.qos.ConnectQos(glob).usage());
1905 } catch (XmlBlasterException e) {}
1906 sb.append(new org.xmlBlaster.util.qos.address.Address(glob).usage());
1907 sb.append(new org.xmlBlaster.util.qos.storage.ClientQueueProperty(glob,null).usage());
1908 sb.append(new org.xmlBlaster.util.qos.address.CallbackAddress(glob).usage());
1909 sb.append(new org.xmlBlaster.util.qos.storage.CbQueueProperty(glob,null,null).usage());
1910 sb.append(new org.xmlBlaster.util.qos.storage.HistoryQueueProperty(glob,null).usage("Control the default size of the history queue for each topic (send with publish calls)"));
1911 sb.append(getPluginUsage("org.xmlBlaster.client.protocol.socket.SocketConnection"));
1912 sb.append(getPluginUsage("org.xmlBlaster.client.protocol.corba.CorbaConnection"));
1913 sb.append(getPluginUsage("org.xmlBlaster.client.protocol.rmi.RmiConnection"));
1914 sb.append(getPluginUsage("org.xmlBlaster.client.protocol.xmlrpc.XmlRpcConnection"));
1915 //sb.append(org.xmlBlaster.util.Global.instance().usage()); // for Logger help
1916 return sb.toString();
1917 }
1918
1919 /**
1920 * Access plugin specific usage()
1921 * @return if plugin is not in CLASSPATH return empty string
1922 */
1923 public static String getPluginUsage(String clazzName) {
1924 try {
1925 Class clazz = java.lang.Class.forName(clazzName);
1926 if (clazz != null) {
1927 Class[] paramCls = new Class[0];
1928 Object[] params = new Object[0];
1929 java.lang.reflect.Method method = clazz.getMethod("usage", paramCls);
1930 String tmp = (String)method.invoke(clazz, params);
1931 return tmp;
1932 }
1933 }
1934 catch (Exception ex) { // java.lang.ClassNotFoundException:
1935 }
1936 return "";
1937 }
1938
1939 /**
1940 * Dump state of this object into a XML ASCII string.
1941 * <br>
1942 * @return internal state of SubjectInfo as a XML ASCII string
1943 */
1944 public final String toXml() {
1945 return toXml((String)null);
1946 }
1947
1948 /**
1949 * Dump state of this object into a XML ASCII string.
1950 * <br>
1951 * @param extraOffset indenting of tags for nice output
1952 * @return internal state of SubjectInfo as a XML ASCII string
1953 */
1954 public final String toXml(String extraOffset) {
1955 StringBuffer sb = new StringBuffer(1024);
1956 if (extraOffset == null) extraOffset = "";
1957 String offset = Constants.OFFSET + extraOffset;
1958
1959 sb.append(offset).append("<XmlBlasterAccess id='").append(this.getId());
1960 if (this.dispatchManager != null && this.dispatchManager.getDispatchConnectionsHandler() != null) {
1961 sb.append("' state='").append(this.dispatchManager.getDispatchConnectionsHandler().getState());
1962 }
1963 sb.append("'>");
1964 sb.append(offset).append(" <connected>").append(isConnected()).append("</connected>");
1965 sb.append(offset).append("</XmlBlasterAccess>");
1966
1967 return sb.toString();
1968 }
1969
1970 /**
1971 * For testing invoke: java org.xmlBlaster.client.XmlBlasterAccess
1972 */
1973 public static void main( String[] args ) {
1974 try {
1975 final Global glob = new Global(args);
1976 final String oid = "HelloWorld";
1977
1978 final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
1979
1980 /*
1981 try {
1982 log.info(ME, "Hit a key to subscribe on topic " + oid);
1983 try { System.in.read(); } catch(java.io.IOException e) {}
1984 SubscribeKey sk = new SubscribeKey(glob, oid);
1985 SubscribeQos sq = new SubscribeQos(glob);
1986 SubscribeReturnQos subRet = xmlBlasterAccess.subscribe(sk, sq);
1987 log.info(ME, "Subscribed for " + sk.toXml() + "\n" + sq.toXml() + " return:\n" + subRet.toXml());
1988 }
1989 catch(XmlBlasterException e) {
1990 log.error(ME, e.getMessage());
1991 }
1992 */
1993
1994 xmlBlasterAccess.registerConnectionListener(new I_ConnectionStateListener() {
1995 public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
1996 log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " with " + connection.getQueue().getNumOfEntries() + " queue entries pending");
1997 }
1998 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
1999 log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
2000 }
2001 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
2002 log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
2003 }
2004 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
2005 log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " in sync");
2006 }
2007 });
2008
2009 ConnectReturnQos connectReturnQos = xmlBlasterAccess.connect(null, new I_Callback() {
2010 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
2011 log.info("UPDATE: Receiving asynchronous callback message " + updateKey.toXml() + "\n" + updateQos.toXml());
2012 return "";
2013 }
2014 }); // Login to xmlBlaster, default handler for updates
2015 if (xmlBlasterAccess.isAlive()) {
2016 log.info("Successfully connected to xmlBlaster");
2017 }
2018 else {
2019 log.info("We continue in fail safe mode: " + connectReturnQos.toXml());
2020 }
2021
2022 {
2023 log.info("Hit a key to subscribe on topic " + oid);
2024 try { System.in.read(); } catch(java.io.IOException e) {}
2025 SubscribeKey sk = new SubscribeKey(glob, oid);
2026 SubscribeQos sq = new SubscribeQos(glob);
2027 SubscribeReturnQos subRet = xmlBlasterAccess.subscribe(sk, sq);
2028 log.info("Subscribed for " + sk.toXml() + "\n" + sq.toXml() + " return:\n" + subRet.toXml());
2029
2030 log.info("Hit a key to publish '" + oid + "'");
2031 try { System.in.read(); } catch(java.io.IOException e) {}
2032 MsgUnit msgUnit = new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>");
2033 PublishReturnQos publishReturnQos = xmlBlasterAccess.publish(msgUnit);
2034 log.info("Successfully published message to xmlBlaster, msg=" + msgUnit.toXml() + "\n returned QoS=" + publishReturnQos.toXml());
2035 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // wait for update
2036
2037 {
2038 log.info("Hit a key to 3 times publishOneway '" + oid + "'");
2039 try { System.in.read(); } catch(java.io.IOException e) {}
2040 MsgUnit[] msgUnitArr = new MsgUnit[] {
2041 new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2042 new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2043 new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>")
2044 };
2045 xmlBlasterAccess.publishOneway(msgUnitArr);
2046 log.info("Successfully published " + msgUnitArr.length + " messages oneway");
2047 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // wait for update
2048 }
2049
2050 {
2051 log.info("Hit a key to 3 times publishArr '" + oid + "'");
2052 try { System.in.read(); } catch(java.io.IOException e) {}
2053 MsgUnit[] msgUnitArr = new MsgUnit[] {
2054 new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2055 new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2056 new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>")
2057 };
2058 PublishReturnQos[] retArr = xmlBlasterAccess.publishArr(msgUnitArr);
2059 log.info("Successfully published " + retArr.length + " acknowledged messages");
2060 try { Thread.sleep(1000L); } catch( InterruptedException i) {} // wait for update
2061 }
2062
2063 {
2064 log.info("Hit a key to get '" + oid + "'");
2065 try { System.in.read(); } catch(java.io.IOException e) {}
2066 GetKey gk = new GetKey(glob, oid);
2067 GetQos gq = new GetQos(glob);
2068 MsgUnit[] msgs = xmlBlasterAccess.get(gk, gq);
2069 log.info("Successfully got message from xmlBlaster, msg=" + msgs[0].toXml());
2070 }
2071
2072 int numGetCached = 4;
2073 xmlBlasterAccess.createSynchronousCache(100);
2074 for (int i=0; i<numGetCached; i++) {
2075 log.info("Hit a key to getCached '" + oid + "' #"+i+"/"+numGetCached);
2076 try { System.in.read(); } catch(java.io.IOException e) {}
2077 GetKey gk = new GetKey(glob, oid);
2078 GetQos gq = new GetQos(glob);
2079 MsgUnit[] msgs = xmlBlasterAccess.getCached(gk, gq);
2080 log.info("Successfully got message from xmlBlaster, msg=" + msgs[0].toXml());
2081 }
2082
2083 log.info("Hit a key to unSubscribe on topic '" + oid + "' and '" + subRet.getSubscriptionId() + "'");
2084 try { System.in.read(); } catch(java.io.IOException e) {}
2085 UnSubscribeKey uk = new UnSubscribeKey(glob, subRet.getSubscriptionId());
2086 UnSubscribeQos uq = new UnSubscribeQos(glob);
2087 UnSubscribeReturnQos[] unSubRet = xmlBlasterAccess.unSubscribe(uk, uq);
2088 log.info("UnSubscribed for " + uk.toXml() + "\n" + uq.toXml() + " return:\n" + unSubRet[0].toXml());
2089
2090 log.info("Hit a key to erase on topic " + oid);
2091 try { System.in.read(); } catch(java.io.IOException e) {}
2092 EraseKey ek = new EraseKey(glob, oid);
2093 EraseQos eq = new EraseQos(glob);
2094 EraseReturnQos[] er = xmlBlasterAccess.erase(ek, eq);
2095 log.info("Erased for " + ek.toXml() + "\n" + eq.toXml() + " return:\n" + er[0].toXml());
2096 }
2097
2098 int numPublish = 10;
2099 for (int ii=0; ii<numPublish; ii++) {
2100 log.info("Hit a key to publish #" + (ii+1) + "/" + numPublish);
2101 try { System.in.read(); } catch(java.io.IOException e) {}
2102
2103 MsgUnit msgUnit = new MsgUnit(glob, "<key oid=''/>", ("Hi #"+(ii+1)).getBytes(), "<qos><persistent>true</persistent></qos>");
2104 PublishReturnQos publishReturnQos = xmlBlasterAccess.publish(msgUnit);
2105 log.info("Successfully published message #" + (ii+1) + " to xmlBlaster, msg=" + msgUnit.toXml() + "\n returned QoS=" + publishReturnQos.toXml());
2106 }
2107
2108 log.info("Hit a key to disconnect ...");
2109 try { System.in.read(); } catch(java.io.IOException e) {}
2110 xmlBlasterAccess.disconnect(null);
2111 }
2112 catch (XmlBlasterException xmlBlasterException) {
2113 System.out.println("WARNING: Test failed: " + xmlBlasterException.getMessage());
2114 }
2115 catch (Throwable e) {
2116 e.printStackTrace();
2117 System.out.println("ERROR: Test failed: " + e.toString());
2118 }
2119 System.exit(0);
2120 }
2121
2122 /**
2123 * The implementation which receives the callback messages.
2124 * @return Returns the updateListener or null if none was registered
2125 */
2126 public I_Callback getUpdateListener() {
2127 return this.updateListener;
2128 }
2129
2130 /**
2131 * Register a listener to receive the callback messages.
2132 * <br />
2133 * Note: Usually you don't need to call this method directly
2134 * as you should pass your callback listener with connect().
2135 * @param updateListener The updateListener to set.
2136 */
2137 public void setUpdateListener(I_Callback updateListener) {
2138 this.updateListener = updateListener;
2139 }
2140
2141 public String getVersion() {
2142 return glob.getVersion();
2143 }
2144 public String getRevisionNumber() {
2145 return glob.getRevisionNumber();
2146 }
2147 public String getBuildTimestamp() {
2148 return glob.getBuildTimestamp();
2149 }
2150 public String getBuildJavaVendor() {
2151 return glob.getBuildJavaVendor();
2152 }
2153 public String getBuildJavaVersion() {
2154 return glob.getBuildJavaVersion();
2155 }
2156
2157 /**
2158 * Create a temporay topic.
2159 * You need to erase it yourself when not needed anymore
2160 * @param topicProperty Can be null (the default is no DOM entry)
2161 * @return The details about the created, temporary topic
2162 * @throws XmlBlasterException
2163 * @todo Automatically delete topic when session dies; don't allow other session to subscribe on it
2164 */
2165 public PublishReturnQos createTemporaryTopic(TopicProperty topicProperty) throws XmlBlasterException {
2166 if (topicProperty == null) {
2167 return createTemporaryTopic(-1, 10);
2168 }
2169 PublishKey pk = new PublishKey(glob, "");
2170 PublishQos pq = new PublishQos(glob);
2171 pq.setTopicProperty(topicProperty);
2172 MsgUnit msgUnit = new MsgUnit(pk, new byte[0], pq);
2173 PublishReturnQos prq = publish(msgUnit);
2174 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Created temporary topic " + prq.getKeyOid());
2175 return prq;
2176 }
2177
2178 public PublishReturnQos createTemporaryTopic(long destroyDelay, int historyMaxMsg) throws XmlBlasterException {
2179 return createTemporaryTopic(null, destroyDelay, historyMaxMsg);
2180 }
2181
2182 /**
2183 *
2184 * @param uniqueTopicId Usually null, can be used to force a topicId.
2185 * e.g. topicIdPrefix="device.joe.request" -> the topic is something like "device.joe.request135823058558"
2186 * @param destroyDelay
2187 * @param historyMaxMsg
2188 * @return
2189 * @throws XmlBlasterException
2190 */
2191 public PublishReturnQos createTemporaryTopic(String uniqueTopicId, long destroyDelay, int historyMaxMsg) throws XmlBlasterException {
2192 if (uniqueTopicId == null) uniqueTopicId = "";
2193 PublishKey pk = new PublishKey(glob, uniqueTopicId);
2194 PublishQos pq = new PublishQos(glob);
2195 TopicProperty topicProperty = new TopicProperty(glob);
2196 topicProperty.setDestroyDelay(destroyDelay);
2197 topicProperty.setCreateDomEntry(false);
2198 topicProperty.setReadonly(false);
2199 pq.setAdministrative(true);
2200 if (historyMaxMsg >= 0L) {
2201 HistoryQueueProperty prop = new HistoryQueueProperty(this.glob, null);
2202 prop.setMaxEntries(historyMaxMsg);
2203 topicProperty.setHistoryQueueProperty(prop);
2204 }
2205 pq.setTopicProperty(topicProperty);
2206 MsgUnit msgUnit = new MsgUnit(pk, new byte[0], pq);
2207 PublishReturnQos prq = publish(msgUnit);
2208 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Created temporary topic " + prq.getKeyOid());
2209 return prq;
2210 }
2211
2212 // TODO: add other properties, add documentation requirement
2213 // Add own class to support multiple request/reply over same temporary topic
2214 /**
2215 * @see org.xmlBlaster.test.client.TestRequestResponse
2216 */
2217 public MsgUnit[] request(MsgUnit msgUnit, long timeout, int maxEntries) throws XmlBlasterException {
2218 if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Entering request with timeout=" + timeout);
2219 if (msgUnit == null)
2220 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Please supply a valid msgUnit to request()");
2221
2222 // Create a temporary reply topic ...
2223 long destroyDelay = timeout+86400000; // on client crash, cleanup after one day; //long destroyDelay = -1;
2224 // optional, "device.joe.response" -> can be useful for performance, NOT thread safe
2225 boolean createResponseTopic = msgUnit.getQosData().getClientProperty("__createResponseTopic", true);
2226 if (createResponseTopic == false) {
2227 msgUnit.getQosData().getClientProperties().remove("__createResponseTopic");
2228 }
2229 String responseTopicId = msgUnit.getQosData().getClientProperty("__responseTopicId", "");
2230 if (responseTopicId.length() > 0) {
2231 msgUnit.getQosData().getClientProperties().remove("__responseTopicId");
2232 }
2233 else {
2234 // "device.joe.response" -> can be useful for authorization, must be distinguishable to other clients
2235 String responseTopicIdPrefix = msgUnit.getQosData().getClientProperty("__responseTopicIdPrefix", "");
2236 if (responseTopicIdPrefix.length() > 0) {
2237 responseTopicId = responseTopicIdPrefix + new Timestamp().getTimestamp(); // now thread safe for request()s in parallel
2238 msgUnit.getQosData().getClientProperties().remove("__responseTopicIdPrefix");
2239 }
2240 }
2241 if (createResponseTopic) {
2242 PublishReturnQos tempTopic = createTemporaryTopic(responseTopicId, destroyDelay, maxEntries);
2243 responseTopicId = tempTopic.getKeyOid();
2244 }
2245
2246 try {
2247 // Send the request ...
2248 // "__jms:JMSReplyTo"
2249 msgUnit.getQosData().addClientProperty(Constants.addJmsPrefix(Constants.JMS_REPLY_TO, log), responseTopicId); // "__jms:JMSReplyTo"
2250 publish(msgUnit);
2251
2252 // Access the reply ...
2253 MsgUnit[] msgs = receive("topic/"+responseTopicId, maxEntries, timeout, true);
2254
2255 return msgs;
2256 }
2257 finally {
2258 if (createResponseTopic && responseTopicId.length() == 0) {
2259 // Clean up temporary topic ...
2260 EraseKey ek = new EraseKey(glob, responseTopicId);
2261 EraseQos eq = new EraseQos(glob);
2262 eq.setForceDestroy(true);
2263 erase(ek, eq);
2264 }
2265 }
2266 }
2267
2268 public MsgUnit[] receive(String oid, int maxEntries, long timeout, boolean consumable) throws XmlBlasterException {
2269 if (oid == null || oid.length() == 0)
2270 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Please supply a valid oid to receive()");
2271
2272 ContextNode node = ContextNode.valueOf(oid);
2273 if (node == null) {
2274 throw new IllegalArgumentException("Can't parse '" + oid + "' to a ContextNode");
2275 }
2276 if (node.isOfClass(ContextNode.SESSION_MARKER_TAG))
2277 node = node.getParent();
2278 if (node == null) {
2279 throw new IllegalArgumentException("Can't parse '" + oid + "' to a ContextNode");
2280 }
2281 if (node.isOfClass(ContextNode.TOPIC_MARKER_TAG))
2282 oid = "__cmd:"+oid+"/?historyQueueEntries"; // "__cmd:topic/hello/?historyQueueEntries"
2283 else if (node.isOfClass(ContextNode.SUBJECT_MARKER_TAG) && node.getChild(ContextNode.SESSION_MARKER_TAG, null) != null)
2284 oid = "__cmd:"+oid+"/?callbackQueueEntries"; // "__cmd:client/joe/session/1/?callbackQueueEntries";
2285 else if (node.isOfClass(ContextNode.SUBJECT_MARKER_TAG))
2286 oid = "__cmd:"+oid+"/?subjectQueueEntries"; // "__cmd:client/joe/?subjectQueueEntries"
2287
2288 GetKey getKey = new GetKey(glob, oid);
2289 String qos = "<qos>" +
2290 "<querySpec type='QueueQuery'>" +
2291 "maxEntries="+maxEntries+"&maxSize=-1&consumable="+consumable+"&waitingDelay="+timeout+
2292 "</querySpec>" +
2293 "</qos>";
2294 GetQos getQos = new GetQos(glob, glob.getQueryQosFactory().readObject(qos));
2295 MsgUnit[] msgs = get(getKey, getQos);
2296 if (log.isLoggable(Level.FINEST)) log.finest(getLogId()+"Got " + msgs.length + " reply :\n" + ((msgs.length>0)?msgs[0].toXml():""));
2297 return msgs;
2298 }
2299
2300
2301 private PublishReturnQos publishSingleChunk(MsgKeyData keyData, MsgQosData chunkQosData, byte[] buf, int length, boolean isLastChunk, long count, Exception ex) throws XmlBlasterException {
2302 MsgKeyData chunkKeyData = keyData;
2303 MsgUnit msg = new MsgUnit(chunkKeyData, buf, chunkQosData);
2304 if (isLastChunk || ex != null)
2305 chunkQosData.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), true);
2306 chunkQosData.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), count);
2307 if (ex != null)
2308 msg.getQosData().addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EX, log), ex.getMessage());
2309 return publish(msg);
2310 }
2311
2312 public PublishReturnQos[] publishStream(InputStream is, MsgKeyData keyData, MsgQosData qosData, int maxBufSize, I_ReplaceContent contentReplacer) throws XmlBlasterException {
2313 String streamId = (getGlobal()).getId() + "-" + (new Timestamp()).getTimestamp();
2314 qosData.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), streamId);
2315 int bufSize = 0;
2316 String tmpKey = Constants.addJmsPrefix(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, log);
2317 if (qosData.getClientProperty(tmpKey) != null)
2318 bufSize = qosData.getClientProperty(tmpKey).getIntValue();
2319 if (bufSize > maxBufSize || bufSize == 0)
2320 bufSize = maxBufSize;
2321 long count = 0L;
2322 PublishReturnQos pubRetQos = null;
2323 byte[] buf = new byte[bufSize];
2324 try {
2325 while (true) {
2326 buf = new byte[bufSize];
2327 int offset = 0;
2328 int remainingLength = bufSize;
2329 int lengthRead = 0;
2330 while ((lengthRead = is.read(buf, offset, remainingLength)) != -1) {
2331 remainingLength -= lengthRead;
2332 offset += lengthRead;
2333 if (remainingLength == 0)
2334 break;
2335 }
2336 int length = offset;
2337 // cut the buffer if shorter than maximum buffer size
2338 if (length < buf.length) {
2339 byte[] tmpBuf = buf;
2340 buf = new byte[length];
2341 for (int i=0; i<buf.length;i++)
2342 buf[i] = tmpBuf[i];
2343 }
2344
2345 // We do not need to clone the key since it will not change, but the qos must be cloned
2346 MsgQosData chunkQosData = (MsgQosData)qosData.clone();
2347
2348 if (contentReplacer != null)
2349 buf = contentReplacer.replace(buf, chunkQosData.getClientProperties());
2350 boolean isLastChunk = buf.length < bufSize;
2351 pubRetQos = publishSingleChunk(keyData, chunkQosData, buf, length, isLastChunk, count, null);
2352 count++;
2353 if (length < bufSize)
2354 return new PublishReturnQos[] { pubRetQos };
2355 }
2356 }
2357 catch (IOException ex) {
2358 if (count > 0)
2359 publishSingleChunk(keyData, qosData, buf, 0, true, count, ex);
2360 throw new XmlBlasterException(getGlobal(), ErrorCode.RESOURCE, "Sending Chunked message", "failed due to an IOException", ex);
2361 }
2362 catch (XmlBlasterException ex) {
2363 if (count > 0)
2364 publishSingleChunk(keyData, qosData, buf, 0, true, count, ex);
2365 throw ex;
2366 }
2367 }
2368
2369 public Object getUserObject() {
2370 return userObject;
2371 }
2372
2373 public void setUserObject(Object userObject) {
2374 this.userObject = userObject;
2375 }
2376 }
syntax highlighted by Code2HTML, v. 0.9.1