1 /*------------------------------------------------------------------------------
   2 Name:      XmlBlasterAccess.java
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 ------------------------------------------------------------------------------*/
   6 package org.xmlBlaster.client;
   7 
   8 import java.io.IOException;
   9 import java.io.InputStream;
  10 import java.util.ArrayList;
  11 import java.util.List;
  12 import java.util.Map;
  13 import java.util.logging.Level;
  14 import java.util.logging.Logger;
  15 
  16 import org.xmlBlaster.authentication.plugins.I_ClientPlugin;
  17 import org.xmlBlaster.client.dispatch.ClientDispatchManager;
  18 import org.xmlBlaster.client.key.EraseKey;
  19 import org.xmlBlaster.client.key.GetKey;
  20 import org.xmlBlaster.client.key.PublishKey;
  21 import org.xmlBlaster.client.key.SubscribeKey;
  22 import org.xmlBlaster.client.key.UnSubscribeKey;
  23 import org.xmlBlaster.client.key.UpdateKey;
  24 import org.xmlBlaster.client.protocol.AbstractCallbackExtended;
  25 import org.xmlBlaster.client.protocol.I_CallbackServer;
  26 import org.xmlBlaster.client.qos.ConnectQos;
  27 import org.xmlBlaster.client.qos.ConnectReturnQos;
  28 import org.xmlBlaster.client.qos.DisconnectQos;
  29 import org.xmlBlaster.client.qos.EraseQos;
  30 import org.xmlBlaster.client.qos.EraseReturnQos;
  31 import org.xmlBlaster.client.qos.GetQos;
  32 import org.xmlBlaster.client.qos.PublishQos;
  33 import org.xmlBlaster.client.qos.PublishReturnQos;
  34 import org.xmlBlaster.client.qos.SubscribeQos;
  35 import org.xmlBlaster.client.qos.SubscribeReturnQos;
  36 import org.xmlBlaster.client.qos.UnSubscribeQos;
  37 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
  38 import org.xmlBlaster.client.qos.UpdateQos;
  39 import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;
  40 import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;
  41 import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;
  42 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
  43 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
  44 import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;
  45 import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;
  46 import org.xmlBlaster.jms.XBConnectionMetaData;
  47 import org.xmlBlaster.util.FileDumper;
  48 import org.xmlBlaster.util.Global;
  49 import org.xmlBlaster.util.I_ReplaceContent;
  50 import org.xmlBlaster.util.I_Timeout;
  51 import org.xmlBlaster.util.I_TimeoutManager;
  52 import org.xmlBlaster.util.MsgUnit;
  53 import org.xmlBlaster.util.SessionName;
  54 import org.xmlBlaster.util.Timestamp;
  55 import org.xmlBlaster.util.XmlBlasterException;
  56 import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;
  57 import org.xmlBlaster.util.checkpoint.I_Checkpoint;
  58 import org.xmlBlaster.util.cluster.NodeId;
  59 import org.xmlBlaster.util.context.ContextNode;
  60 import org.xmlBlaster.util.def.Constants;
  61 import org.xmlBlaster.util.def.ErrorCode;
  62 import org.xmlBlaster.util.def.MethodName;
  63 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
  64 import org.xmlBlaster.util.dispatch.DispatchConnection;
  65 import org.xmlBlaster.util.dispatch.DispatchStatistic;
  66 import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;
  67 import org.xmlBlaster.util.dispatch.I_DispatchManager;
  68 import org.xmlBlaster.util.dispatch.I_PostSendListener;
  69 import org.xmlBlaster.util.error.I_MsgErrorHandler;
  70 import org.xmlBlaster.util.key.MsgKeyData;
  71 import org.xmlBlaster.util.qos.ClientProperty;
  72 import org.xmlBlaster.util.qos.DisconnectQosData;
  73 import org.xmlBlaster.util.qos.MsgQosData;
  74 import org.xmlBlaster.util.qos.TopicProperty;
  75 import org.xmlBlaster.util.qos.address.CallbackAddress;
  76 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
  77 import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
  78 import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;
  79 import org.xmlBlaster.util.queue.I_Entry;
  80 import org.xmlBlaster.util.queue.I_Queue;
  81 import org.xmlBlaster.util.queue.StorageId;
  82 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
  83 
  84 /**
  85  * This is the default implementation of the java client side remote access to xmlBlaster.
  86  * <p>
  87  * It hides a client side queue, the client side dispatcher framework for polling
  88  * or pinging the server and some more features.
  89  * </p>
  90  * <p>
 * The interfaces I_CallbackRaw/I_Callback/I_CallbackExtended are enforced by AbstractCallbackExtended.
  92  * </p>
  93  */
  94 public /*final*/ class XmlBlasterAccess extends AbstractCallbackExtended
  95                    implements I_XmlBlasterAccess, I_ConnectionStatusListener, I_PostSendListener, XmlBlasterAccessMBean
  96 {
  97    private static Logger log = Logger.getLogger(XmlBlasterAccess.class.getName());
  98    private String ME = "XmlBlasterAccess";
  99    private ContextNode contextNode;
 100    /**
 101     * The cluster node id (name) to which we want to connect, needed for nicer logging, typically null
 102     * Can be set manually from outside before connect
 103     */
 104    private String serverNodeId = null;
 105    private ConnectQos connectQos;
 106    /** The return from connect() */
 107    private ConnectReturnQos connectReturnQos;
 108    private long jmxPublicSessionId;
 109    /** Client side queue during connection failure */
 110    private I_Queue clientQueue;
 111    /** The dispatcher framework **/
 112    private I_DispatchManager dispatchManager;
 113    /** Statistic about send/received messages, can be null if there is a DispatchManager around */
 114    private volatile DispatchStatistic statistic;
 115    /** The object handling message delivery problems */
 116    private I_MsgErrorHandler msgErrorHandler;
 117    /** Client side helper classes to load the authentication xml string */
 118    private I_ClientPlugin secPlgn;
 119    /** The callback server */
 120    private I_CallbackServer cbServer;
 121    /** Handles the registered callback interfaces for given subscriptions. */
 122    private final UpdateDispatcher updateDispatcher;
 123    /** Used to callback the clients default update() method (as given on connect()) */
 124    private I_Callback updateListener;
 125    /** Is not null if the client wishes to be notified about connection state changes in fail safe operation */
 126    private I_ConnectionStateListener connectionListener;
 127    private I_PostSendListener postSendListener;
 128    /** Allow to cache updated messages for simulated synchronous access with get().
 129     * Do behind a get() a subscribe to allow cached synchronous get() access */
 130    private SynchronousCache synchronousCache;
 131    private boolean disconnectInProgress;
 132    private boolean connectInProgress;
 133    private String[] checkPointContext;
 134 
 135    /** this I_XmlBlasterAccess is valid until a 'leaveServer' invocation is done.*/
 136    private boolean isValid = true;
 137 
 138    private boolean firstWarn = true;
 139 
 140    private Timestamp sessionRefreshTimeoutHandle;
 141    /** My JMX registration */
 142    private JmxMBeanHandle mbeanHandle;
 143    /** First call to connect() in millis */
 144    private long startupTime;
 145 
 146    StreamingCallback streamingCb;
 147 
 148    private String storageIdPrefix;
 149 
 150    private FileDumper fileDumper;
 151 
 152    private boolean shutdown = false;
 153    
 154    private Object userObject;
 155    
 156    private XmlBlasterException toDeadXmlBlasterException;
 157 
 158    /**
 159     * Create an xmlBlaster accessor.
 160     * Please don't create directly but use the factory instead:
 161     * <pre>
 162     *   import org.xmlBlaster.util.Global;
 163     *   ...
 164     *   final Global glob = new Global(args);
 165     *   final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
 166     * </pre>
 167     * @param glob Your environment handle or null to use the default Global.instance()
 168     *        You must use a cloned Global for each XmlBlasterAccess created.
 169     *        engine.Global is not allowed here, only util.Global is supported
 170     * @exception IllegalArgumentException If engine.Global is used as parameter
 171     */
 172    public XmlBlasterAccess(Global glob) {
 173       super((glob==null) ? Global.instance() : glob);
 174       //if (glob.wantsHelp()) {
 175       //   usage();
 176       //}
 177       if (super.glob.getNodeId() != null) {
 178          // it is a engine.Global!
 179          throw new IllegalArgumentException("XmlBlasterAccess can't be created with a engine.Global, please clone a org.xmlBlaster.util.Global to create me");
 180       }
 181       this.updateDispatcher = new UpdateDispatcher(super.glob);
 182    }
 183 
 184    /**
 185     * Create an xmlBlaster accessor.
 186     * Please don't create directly but use the factory instead:
 187     * <pre>
 188     *   final Global glob = new Global(args);
 189     *   final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
 190     * </pre>
 191     * @param args Your command line arguments
 192     */
 193    public XmlBlasterAccess(String[] args) {
 194       super(new Global(args, true, false));
 195       this.updateDispatcher = new UpdateDispatcher(super.glob);
 196    }
 197 
 198    /**
 199     * @see org.xmlBlaster.client.I_XmlBlasterAccess#registerConnectionListener(I_ConnectionStateListener)
 200     */
 201    public synchronized void registerConnectionListener(I_ConnectionStateListener connectionListener) {
 202       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing registering connectionListener");
 203       this.connectionListener = connectionListener;
 204    }
 205 
 206    /**
    * Register a listener to get notifications when a message is successfully sent from
    * the client side tail back queue.
 209     * Max one can be registered, any old one will be overwritten
 210     * @param postSendListener The postSendListener to set.
 211     * @return the old listener or null if no previous was registered
 212     */
 213    public final I_PostSendListener registerPostSendListener(I_PostSendListener postSendListener) {
 214       I_PostSendListener old = this.postSendListener;
 215       this.postSendListener = postSendListener;
 216       return old;
 217    }
 218 
 219    /**
    * Called after messages are sent from the client side queue, but not for oneway messages.
    * Enforced by I_PostSendListener
    * @param entries The sent entries, each includes the returned QoS (e.g. PublishReturnQos)
 223     */
 224    public final void postSend(MsgQueueEntry[] entries) {
 225       for (int i=0; i<entries.length; i++) {
 226          MsgQueueEntry msgQueueEntry = entries[i];
 227          if (msgQueueEntry.getMethodName() == MethodName.CONNECT) {
 228             this.connectReturnQos = (ConnectReturnQos)msgQueueEntry.getReturnObj();
 229             if (this.connectReturnQos != null) {
 230                setContextNodeId(this.connectReturnQos.getServerInstanceId());
 231                // break; Loop to the latest if any
 232             }
 233             else {
 234                //log.warning("Expected connectReturnQos for " + msgQueueEntry.toXml() + " " + Global.getStackTraceAsString(null));
 235                if (log.isLoggable(Level.FINE)) log.fine("Expected connectReturnQos for " + msgQueueEntry.toXml() + " " + Global.getStackTraceAsString(null));
 236             }
 237          }
 238       }
 239       I_PostSendListener l = this.postSendListener;
 240       if (l != null) {
 241          try {
 242             l.postSend(entries);
 243          }
 244          catch (Throwable e) {
 245             e.printStackTrace();
 246          }
 247       }
 248    }
 249    
 250    public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException exception) {
 251       I_PostSendListener l = this.postSendListener;
 252       try {
 253          if (l == null) {
 254             for (int i=0; i<entries.length; i++) {
 255                MsgUnit msgUnit = entries[i].getMsgUnit();
 256                if (msgUnit != null) {
 257                   String fn = this.getFileDumper().dumpMessage(msgUnit.getKeyData(), msgUnit.getContent(),
 258                         msgUnit.getQosData());
 259                   log.severe("Async sending of message failed for message " + msgUnit.getKeyOid() + ", is dumped to "
 260                         + fn + ": " + exception.getMessage());
 261                } else {
 262                   log.severe("Async sending of message failed: " + entries[i].toXml() + ": " + exception.getMessage());
 263                }
 264             }
 265          }
 266          else {
 267             return l.sendingFailed(entries, exception);
 268          }
 269       }
 270       catch (Throwable e) {
 271          e.printStackTrace();
 272          for (int i=0; i<entries.length; i++)
 273             log.severe("Async sending of message failed for message " + entries[i].toXml() +"\nreason is: " + exception.getMessage());
 274       }
 275       return false;
 276    }
 277    
 278    public FileDumper getFileDumper() throws XmlBlasterException {
 279       if (this.fileDumper == null) {
 280          synchronized (this) {
 281             if (this.fileDumper == null) {
 282                this.fileDumper = new FileDumper(this.glob);
 283             }
 284          }
 285       }
 286       return this.fileDumper;
 287    }
 288 
 289 
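   /**
    * Creates the cache for simulated synchronous get() access on the first call,
    * later calls return the already existing cache and ignore the given size.
    * @param size The size handed to the new SynchronousCache
    * @return The cache instance
    */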
 292    public SynchronousCache createSynchronousCache(int size) {
 293       if (this.synchronousCache != null)
 294          return this.synchronousCache; // Is initialized already
 295       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing synchronous cache: size=" + size);
 296       this.synchronousCache = new SynchronousCache(glob, size);
 297       log.info(getLogId()+"SynchronousCache has been initialized with size="+size);
 298       return this.synchronousCache;
 299    }
 300 
 301    public void setClientErrorHandler(I_MsgErrorHandler msgErrorHandler) {
 302       this.msgErrorHandler = msgErrorHandler;
 303    }
 304 
 305    public String getConnectionQueueId() {
 306       if (this.clientQueue != null) {
 307          return this.clientQueue.getStorageId().toString();
 308       }
 309       return "";
 310    }
 311 
 312    /**
 313     * The unique name of this session instance.
 314     * @return Never null, for example "/xmlBlaster/node/heron/client/joe/session/-2"
 315     */
 316    public final ContextNode getContextNode() {
 317       return this.contextNode;
 318    }
 319 
 320    public boolean forcePollingForTesting() {
 321       if (!isAlive())
 322          return false;
 323       DispatchConnection dcon = this.dispatchManager.getDispatchConnectionsHandler().getAliveDispatchConnection();
 324       if (dcon == null)
 325          return false;
 326       XmlBlasterException e = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION,
 327             "forcePollingForTesting", "Forcing POLLING");
 328       try {
 329          dcon.handleTransition(true, e);
 330       } catch (XmlBlasterException e1) {
 331          e1.printStackTrace();
 332          return false;
 333       }
 334       return true;
 335    }
 336 
 337    
 338    public ConnectReturnQos connect(ConnectQos qos, I_StreamingCallback streamingUpdateListener, boolean withQueue) throws XmlBlasterException {
 339       if (streamingUpdateListener == null)
 340          throw new XmlBlasterException(this.glob, ErrorCode.USER_ILLEGALARGUMENT, "connect", "the streamingUpdateListener is null, you must provide one");
 341       this.streamingCb = new StreamingCallback(this.glob, streamingUpdateListener, 0, 0, withQueue);
 342       if (withQueue)
 343          registerConnectionListener(this.streamingCb);
 344       return connect(qos, this.streamingCb);
 345    }
 346    
 347    /**
 348     * The storageId must remain the same after a client restart
 349     * 
 350     * @param relating
 351     *           xbType like Constants.RELATING_CLIENT
 352     * @return
 353     */
 354    public StorageId createStorageId(String relating) {
 355       StorageId storageId = null;
 356       if (getStorageIdStr() != null && getStorageIdStr().length() > 0) {
 357          // client code forces a named client side storageId -
 358          // dangerous if the name conflicts with server name in same DB
 359          storageId = new StorageId(glob, serverNodeId, relating, getStorageIdStr());
 360       } else {
 361          if (getPublicSessionId() == 0) {
 362             // having no public sessionId we need to generate a unique
 363             // queue name
 364             storageId = new StorageId(glob, serverNodeId, relating, getId() + System.currentTimeMillis()
 365                   + Global.getCounter());
 366          } else {
 367             SessionName ses = getSessionName();
 368             if (ses != null)
 369                storageId = new StorageId(glob, serverNodeId, relating, ses);
 370             else
 371                storageId = new StorageId(glob, serverNodeId, relating, getId() + System.currentTimeMillis()
 372                      + Global.getCounter());
 373          }
 374       }
 375       return storageId;
 376    }
 377 
 378    /**
 379     * @see org.xmlBlaster.client.I_XmlBlasterAccess#connect(ConnectQos, I_Callback)
 380     */
 381    public ConnectReturnQos connect(ConnectQos qos, I_Callback updateListener) throws XmlBlasterException {
 382       if (!this.isValid)
 383          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "connect");
 384 
 385       synchronized (this) {
 386 
 387          if (this.startupTime == 0) {
 388             this.startupTime = System.currentTimeMillis();
 389          }
 390 
 391          if (isConnected() || this.connectInProgress) {
 392             String text = "connect() rejected, you are connected already, please check your code";
 393             throw new XmlBlasterException(glob, ErrorCode.USER_CONNECT_MULTIPLE, ME, text);
 394          }
 395 
 396          this.connectInProgress = true;
 397 
 398          try {
 399             this.connectQos = (qos==null) ? new ConnectQos(glob) : qos;
 400 
 401             ClientProperty tmp = this.connectQos.getClientProperty(Constants.UPDATE_BULK_ACK);
 402             if (tmp != null) {
 403                if (tmp.getBooleanValue()) {
 404                   log.info("Setting the flag '" + Constants.UPDATE_BULK_ACK + "' to 'true' since specified in ConnectQos");
 405                   this.updateBulkAck = true;
 406                }
 407             }
 408 
 409 
 410             // We need to set a unique ID for this client so that global.getId() is unique
 411             // which is used e.g. in the JDBC plugin
 412             SessionName sn = getSessionName();
 413             if (sn != null) {
 414                if (sn.isPubSessionIdUser()) {
 415                   this.glob.setId(sn.toString());
 416                }
 417                else {
 418                   this.glob.setId(sn.toString() + System.currentTimeMillis()); // Not secure if two clients start simultaneously
 419                }
 420             }
 421             else {
 422                this.glob.setId(getLoginName() + System.currentTimeMillis()); // Not secure if two clients start simultaneously
 423             }
 424             this.glob.resetInstanceId();
 425             this.connectQos.getData().setInstanceId(this.glob.getInstanceId());
 426 
 427             if (connectQos.getData().getGlobal().isServerSide()) {
 428                String text = "Your ConnectQos.getData() contains a ServerScope instead of a Global instance, this is not allowed";
 429                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text);
 430             }
 431 
 432             this.updateListener = updateListener;
 433 
 434             // TODO: This is done by ConnectQos already, isn't it?
 435             initSecuritySettings(this.connectQos.getData().getClientPluginType(),
 436                                  this.connectQos.getData().getClientPluginVersion());
 437 
 438             this.ME = "XmlBlasterAccess-" + getId();
 439             setContextNodeId(getServerNodeId());
 440 
 441             try {
 442                ClientQueueProperty prop = this.connectQos.getClientQueueProperty();
 443                StorageId storageId = createStorageId(Constants.RELATING_CLIENT);
 444                this.clientQueue = glob.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), storageId,
 445                                                       this.connectQos.getClientQueueProperty());
 446                if (this.clientQueue == null) {
 447                   String text = "The client queue plugin is not found with this configuration, please check your connect QoS: " + prop.toXml();
 448                   throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME, text);
 449                }
 450 
 451                if (this.msgErrorHandler == null) {
 452                   this.msgErrorHandler = new ClientErrorHandler(glob, this);
 453                }
 454 
 455                boolean forceCbAddressCreation = (updateListener != null);
 456                this.dispatchManager = new ClientDispatchManager(glob, this.msgErrorHandler,
 457                                        getSecurityPlugin(), this.clientQueue, this,
 458                                        this.connectQos.getAddresses(forceCbAddressCreation), sn);
 459                // the above can call toDead() and the client may have called shutdown(): this.connectQos == null again
 460                if (this.dispatchManager.isDead())
 461                    throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "connect call failed, your toDead() code did shutdown?");
 462 
 463                getDispatchStatistic(); // Force creation of dispatchStatistic as this syncs on 'this' and could deadlock if don later from a update()
 464 
 465                this.dispatchManager.getDispatchConnectionsHandler().registerPostSendListener(this);
 466 
 467                if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Switching to synchronous delivery mode ...");
 468                this.dispatchManager.trySyncMode(true);
 469 
 470                if (this.updateListener != null) { // Start a default callback server using same protocol
 471                   createDefaultCbServer();
 472                }
 473 
 474                if (this.connectQos.doSendConnect()) {
 475                   // Try to connect to xmlBlaster ...
 476                   sendConnectQos();
 477                }
 478                else {
 479                   log.info(getLogId()+"Initialized client library, but no connect() is send to xmlBlaster, a delegate should do any subscribe if required");
 480                }
 481             }
 482             catch (XmlBlasterException e) {
 483                if (isConnected()) disconnect((DisconnectQos)null);
 484                throw e;
 485             }
 486             catch (Throwable e) {
 487                if (isConnected()) disconnect((DisconnectQos)null);
 488                throw XmlBlasterException.convert(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "Connection failed", e);
 489             }
 490          }
 491          finally {
 492             this.connectInProgress = false;
 493          }
 494       } // synchronized
 495 
 496       if (this.connectQos.getRefreshSession()) {
 497          startSessionRefresher();
 498       }
 499 
 500       if (isAlive()) {
 501          if (this.connectionListener != null) {
 502             this.connectionListener.reachedAlive(ConnectionStateEnum.UNDEF, this);
 503          }
 504          log.info(glob.getReleaseId() + ": Successful " + this.connectQos.getAddress().getType() + " login as " + getId());
 505 
 506          if (this.clientQueue.getNumOfEntries() > 0) {
 507             long num = this.clientQueue.getNumOfEntries();
 508             log.info(getLogId()+"We have " + num + " client side queued tail back messages");
 509             this.dispatchManager.switchToASyncMode();
 510             while (this.clientQueue.getNumOfEntries() > 0) {
 511                try { Thread.sleep(20L); } catch( InterruptedException i) {}
 512             }
 513             log.info((num-this.clientQueue.getNumOfEntries()) + " client side queued tail back messages sent");
 514             this.dispatchManager.switchToSyncMode();
 515          }
 516          else {
 517             if (this.connectionListener != null)
 518                this.connectionListener.reachedAliveSync(ConnectionStateEnum.ALIVE, this);
 519          }
 520       }
 521       else {
 522          if (this.connectionListener != null) {
 523             this.connectionListener.reachedPolling(ConnectionStateEnum.UNDEF, this);
 524          }
 525          log.info(glob.getReleaseId() + ": Login request as " + getId() + " is queued");
 526       }
 527 
 528       if (this.connectReturnQos != null) {
 529          setContextNodeId(this.connectReturnQos.getServerInstanceId());
 530       }
 531 
 532       return this.connectReturnQos; // new ConnectReturnQos(glob, "");
 533    }
 534 
 535    /**
 536     * Sends the current connectQos to xmlBlaster and stores the connectReturnQos.
 537     * @throws XmlBlasterException
 538     */
 539    private void sendConnectQos() throws XmlBlasterException {
 540       MsgQueueConnectEntry entry = new MsgQueueConnectEntry(this.glob, this.clientQueue.getStorageId(), this.connectQos.getData());
 541       // Try to connect to xmlBlaster ...
 542       this.connectReturnQos = (ConnectReturnQos)queueMessage(entry);
 543       this.connectReturnQos.getData().setInitialConnectionState(this.dispatchManager.getDispatchConnectionsHandler().getState());
 544    }
 545 
 546    public boolean isConnected() {
 547       if (this.dispatchManager != null) {
 548          return this.connectReturnQos != null && !this.dispatchManager.getDispatchConnectionsHandler().isDead();
 549       }
 550       return this.connectReturnQos != null;
 551    }
 552 
 553    private void startSessionRefresher() {
 554       if (this.connectQos == null) return;
 555       long sessionTimeout = this.connectQos.getSessionQos().getSessionTimeout();
 556       final long MIN = 2000L; // Sessions which live less than 2 seconds are not supported
 557       if (sessionTimeout >= MIN) {
 558          long gap = (sessionTimeout < 60*1000L) ? sessionTimeout/2 : sessionTimeout-30*1000L;
 559          final long refreshTimeout = sessionTimeout - gap;
 560          final I_TimeoutManager timeout = this.glob.getPingTimer();
 561          this.sessionRefreshTimeoutHandle = timeout.addTimeoutListener(new I_Timeout() {
 562                public void timeout(Object userData) {
 563                   if (isAlive()) {
 564                      if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Refreshing session to not expire");
 565                      try {
 566                         refreshSession();
 567                      }
 568                      catch (XmlBlasterException e) {
 569                         log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString());
 570                      }
 571                   }
 572                   else {
 573                      if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Can't refresh session as we have no connection");
 574                   }
 575                   try {
 576                      sessionRefreshTimeoutHandle = timeout.addOrRefreshTimeoutListener(this, refreshTimeout, null, sessionRefreshTimeoutHandle) ;
 577                   }
 578                   catch (XmlBlasterException e) {
 579                      log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString());
 580                   }
 581                }
 582             },
 583             refreshTimeout, null);
 584       }
 585       else {
 586          log.warning(getLogId()+"Auto-refreshing session is not supported for session timeouts smaller " + MIN + " seconds");
 587 
 588       }
 589    }
 590 
 591 
 592    /**
 593     * @see I_XmlBlasterAccess#refreshSession()
 594     */
 595    public void refreshSession() throws XmlBlasterException {
 596       GetKey gk = new GetKey(glob, "__refresh");
 597       GetQos gq = new GetQos(glob);
 598       get(gk, gq);
 599    }
 600 
 601    /**
 602     * Extracts address data from ConnectQos (or adds default if missing)
 603     * and instantiate a callback server as specified in ConnectQos
 604     */
 605    private void createDefaultCbServer() throws XmlBlasterException {
 606       CbQueueProperty prop = connectQos.getSessionCbQueueProperty(); // Creates a default property for us if none is available
 607       CallbackAddress addr = prop.getCurrentCallbackAddress(); // may return null
 608       if (addr == null)
 609          addr = new CallbackAddress(glob);
 610 
 611       this.cbServer = initCbServer(getLoginName(), addr);
 612 
 613       addr.setType(this.cbServer.getCbProtocol());
 614       addr.setRawAddress(this.cbServer.getCbAddress());
 615       //addr.setVersion(this.cbServer.getVersion());
 616       //addr.setSecretSessionId(cbSessionId);
 617       prop.setCallbackAddress(addr);
 618 
 619       log.info(getLogId()+"Callback settings: " + prop.getSettings());
 620    }
 621 
 622    /**
 623     * @see I_XmlBlasterAccess#initCbServer(String, CallbackAddress)
 624     */
 625    public I_CallbackServer initCbServer(String loginName, CallbackAddress callbackAddress) throws XmlBlasterException {
 626       if (callbackAddress == null)
 627          callbackAddress = new CallbackAddress(glob);
 628       callbackAddress.setSessionName(this.getSessionName());
 629       if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Using 'client.cbProtocol=" + callbackAddress.getType() + "' to be used by " + getServerNodeId() + ", trying to create the callback server ...");
 630       I_CallbackServer server = glob.getCbServerPluginManager().getPlugin(callbackAddress.getType(), callbackAddress.getVersion());
 631       server.initialize(this.glob, loginName, callbackAddress, this);
 632       return server;
 633    }
 634 
 635    /**
 636     * Initializes the little client helper framework for authentication.
 637     * <p />
 638     * The first goal is a proper loginQoS xml string for authentication.
 639     * <p />
 640     * The second goal is to intercept the messages for encryption (or whatever the
 641     * plugin supports).
 642     * <p />
 643     * See xmlBlaster.properties, for example:
 644     * <pre>
 645     *   Security.Client.DefaultPlugin=gui,1.0
 646     *   Security.Client.Plugin[gui][1.0]=org.xmlBlaster.authentication.plugins.gui.ClientSecurityHelper
 647     * </pre>
 648     */
 649    private void initSecuritySettings(String secMechanism, String secVersion) {
 650       PluginLoader secPlgnMgr = glob.getClientSecurityPluginLoader();
 651       try {
 652          this.secPlgn = secPlgnMgr.getClientPlugin(secMechanism, secVersion);
 653          if (secMechanism != null)  // to avoid double logging for login()
 654             log.info(getLogId()+"Loaded security plugin=" + secMechanism + " version=" + secVersion);
 655       }
 656       catch (XmlBlasterException e) {
 657          log.severe(getLogId()+"Security plugin '" + secMechanism + "/" + secVersion +
 658                        "' initialization failed. Reason: "+e.getMessage());
 659          this.secPlgn = null;
 660       }
 661    }
 662 
 663    public I_ClientPlugin getSecurityPlugin() {
 664       return this.secPlgn;
 665    }
 666 
 667    /**
 668     * @see org.xmlBlaster.client.XmlBlasterAccessMBean#disconnect(String)
 669     */
 670    public String disconnect(String disconnectQos) {
 671       DisconnectQosData dqd = new DisconnectQosData(this.glob, null, disconnectQos);
 672       boolean success = disconnect(new DisconnectQos(this.glob, dqd));
 673       return "Disconnect called, success=" + success;
 674    }
 675 
 676    /**
 677     * @see org.xmlBlaster.client.I_XmlBlasterAccess#disconnect(DisconnectQos)
 678     * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.disconnect.html">interface.disconnect requirement</a>
 679     */
 680    public synchronized boolean disconnect(DisconnectQos disconnectQos) {
 681       if (!this.isValid) return false;
 682       // Relaxed check to allow shutdown of database without successful connection
 683       if (this.connectQos == null /*!isConnected()*/) {
 684          log.warning(getLogId()+"You called disconnect() but you are are not logged in, we ignore it.");
 685          if (glob != null)
 686           glob.shutdown();
 687          //shutdown(disconnectQos);
 688          return false;
 689       }
 690 
 691       if (disconnectQos == null)
 692          disconnectQos = new DisconnectQos(glob);
 693 
 694       if (!disconnectQos.getClearClientQueueProp().isModified()) {
 695          boolean clearClientQueue = true;
 696          if (this.connectQos != null) {
 697             if (this.connectQos.getSessionName().isPubSessionIdUser())
 698                clearClientQueue = false;  // Keep tail back messages
 699          }
 700          disconnectQos.clearClientQueue(clearClientQueue);
 701       }
 702 
 703       return shutdown(disconnectQos);
 704    }
 705 
 706    /**
 707     * @see org.xmlBlaster.client.XmlBlasterAccessMBean#leaveServer()
 708     */
 709    public String leaveServer() {
 710       leaveServer(null);
 711       return "Clientlibrary is shutdown";
 712    }
 713 
 714    /**
 715     * @see org.xmlBlaster.client.I_XmlBlasterAccess#leaveServer(Map)
 716     */
 717    public void leaveServer(Map map) {
 718       if (!this.isValid) return;
 719       synchronized(this) {
 720          this.isValid = false;
 721          DisconnectQos disconnectQos = new DisconnectQos(glob);
 722          disconnectQos.clearClientQueue(false);
 723          disconnectQos.clearSessions(false);
 724          disconnectQos.deleteSubjectQueue(false);
 725          disconnectQos.setLeaveServer(true);
 726          disconnectQos.shutdownCbServer(true);
 727          disconnectQos.shutdownDispatcher(true);
 728          shutdown(disconnectQos);
 729       }
 730    }
 731 
   /**
    * Common teardown used by disconnect() and leaveServer().
    * <p />
    * Depending on the given QoS it unsubscribes the known subscriptions, queues a
    * disconnect entry for the server, clears/shuts down the client queue, the
    * dispatcher and the callback server and finally resets the connection related
    * members and shuts down the Global instance.
    *
    * @param disconnectQos The settings steering the teardown, null uses a default DisconnectQos
    * @return false if a disconnect is already in progress (nothing is done), else true
    */
   private synchronized boolean shutdown(DisconnectQos disconnectQos) {
      // Guard against re-entrance while a teardown is running
      if (this.disconnectInProgress) {
         log.warning(getLogId()+"Calling disconnect again is ignored, you are in shutdown progress already");
         return false;
      }

      // NOTE(review): the flag is reset only at the end of the normal path (no finally),
      // a RuntimeException thrown below would leave it set
      this.disconnectInProgress = true;

      this.glob.unregisterMBean(this.mbeanHandle);

      if (disconnectQos == null)
         disconnectQos = new DisconnectQos(glob);

      // The following server interaction is only done when we are connected
      if (isConnected()) {

         // Inform the user about entries still waiting in the client queue
         if (this.clientQueue != null) {
            long remainingEntries = this.clientQueue.getNumOfEntries();
            if (remainingEntries > 0) {
               if (disconnectQos.clearClientQueue())
                  log.warning(getLogId()+"You called disconnect(). Please note that there are " + remainingEntries +
                               " unsent invocations/messages in the queue which are discarded now.");
               else
                  log.info(getLogId()+"You called disconnect(). Please note that there are " + remainingEntries +
                               " unsent invocations/messages in the queue which are sent on next connect of the same client with the same public session ID.");
            }
         }

         // Unsubscribe everything the update dispatcher knows about (not when leaving the server)
         if (!disconnectQos.isLeaveServer()) {
            String[] subscriptionIdArr = this.updateDispatcher.getSubscriptionIds();
            for (int ii=0; ii<subscriptionIdArr.length; ii++) {
               String subscriptionId = subscriptionIdArr[ii];
               UnSubscribeKey key = new UnSubscribeKey(glob, subscriptionId);
               try {
                  unSubscribe(key, null);
               }
               catch(XmlBlasterException e) {
                  // On a communication problem the remaining unsubscribes are skipped
                  if (e.isCommunication()) {
                     break;
                  }
                  log.warning(getLogId()+"Couldn't unsubscribe '" + subscriptionId + "' : " + e.getMessage());
               }
            }
         }

         // Now send the disconnect() to the server ...
         if (!disconnectQos.isLeaveServer() && this.clientQueue != null) {
            try {
               MsgQueueDisconnectEntry entry = new MsgQueueDisconnectEntry(this.glob, this.clientQueue.getStorageId(), disconnectQos);
               queueMessage(entry);  // disconnects are always transient
               log.info(getLogId()+"Successful disconnect from " + getServerNodeId());
            } catch(Throwable e) {
               // Best effort: a failing disconnect must not stop the local cleanup
               e.printStackTrace();
               log.warning(e.toString());
            }
         }
      }

      // From here on the local resources are released, independent of the connection state

      if (this.synchronousCache != null) {
         this.synchronousCache.clear();
      }

      if (this.clientQueue != null && disconnectQos.clearClientQueue()) {
         this.clientQueue.clear();
      }

      if (disconnectQos.shutdownDispatcher()) {
         if (this.dispatchManager != null) {
            this.dispatchManager.shutdown();
            //this.dispatchManager = null;
         }
         if (this.clientQueue != null) {
            this.clientQueue.shutdown(); // added to make hsqldb shutdown
            this.clientQueue = null;
         }
      }

      if (disconnectQos.shutdownCbServer() && this.cbServer != null) {
         try {
            this.cbServer.shutdown();
            this.cbServer = null;
         } catch (Throwable e) {
            // If shutdown() throws, cbServer keeps its reference (the assignment above is skipped)
            e.printStackTrace();
            log.warning(e.toString());
         }
      }

      this.updateDispatcher.clear();

      if (this.secPlgn != null) {
         this.secPlgn = null;
      }

      // Forget the connection state and the registered listeners
      this.connectQos = null;
      this.connectReturnQos = null;
      this.disconnectInProgress = false;
      this.msgErrorHandler = null;
      this.updateListener = null;

      this.streamingCb = null;

      super.glob.shutdown();

      // Reported by isShutdown()
      this.shutdown = true;
      return true;
   }
 837 
 838    /**
 839     * @return true if shutdown was called, typically by disconnect()
 840     */
 841    public boolean isShutdown() {
 842       return this.shutdown;
 843    }
 844 
 845    /**
 846     * Access the callback server.
 847     * @return null if no callback server is established
 848     */
 849    public I_CallbackServer getCbServer() {
 850       return this.cbServer;
 851    }
 852 
 853    /**
 854     * Create a descriptive ME, for logging only
 855     * @return e.g. "/node/heron/client/joe/3" or "UNKNOWN_SESSION" if connect() was not successful
 856     */
 857    public String getId() {
 858       SessionName sessionName = getSessionName();
 859       return (sessionName == null) ? "UNKNOWN_SESSION" : sessionName.getAbsoluteName();
 860    }
 861 
 862    /**
 863     * Useful as a logging prefix.
 864     * @return For example "client/TheDesperate/-6: "
 865     */
 866    public String getLogId() {
 867       SessionName sessionName = getSessionName();
 868       return (sessionName == null) ? "" : sessionName.getRelativeName() + ": ";
 869    }
 870 
 871    /**
 872     * The public session ID of this login session.
 873     */
 874    public SessionName getSessionName() {
 875       if (this.connectReturnQos != null)
 876          return this.connectReturnQos.getSessionName();
 877       if (this.connectQos != null) {
 878          SessionName sessionName = this.connectQos.getSessionName();
 879          if (sessionName != null && sessionName.getNodeIdStr() == null && this.serverNodeId != null) {
 880             // In cluster setup the remote cluster node id is forced
 881             SessionName sn = new SessionName(glob, new NodeId(this.serverNodeId), sessionName.getLoginName(),
 882                   sessionName.getPublicSessionId());
 883             // log.info("Using sessionName=" + sn.getAbsoluteName());
 884             this.connectQos.setSessionName(sn);
 885             return sn;
 886          }
 887          return sessionName;
 888       }
 889       return null;
 890    }
 891 
 892    /**
 893     * @see I_XmlBlasterAccess#getStorageIdStr()
 894     */
 895    public String getStorageIdStr() {
 896       return this.storageIdPrefix;
 897    }
 898 
 899    /**
 900     * @see I_XmlBlasterAccess#setStorageIdStr(String)
 901     */
 902    public void setStorageIdStr(String prefix) {
 903       this.storageIdPrefix = Global.getStrippedString(prefix);
 904    }
 905 
 906 
 907    /**
 908     * Allows to set the node name for nicer logging.
 909     * Typically used by cluster clients and not by ordinary clients
 910     * @param serverNodeId For example "/node/heron/instanceId/1233435" or "/node/heron"
 911     */
 912    public void setServerNodeId(String nodeId) {
 913       if (nodeId == null) return;
 914       if (nodeId.startsWith("/node") || nodeId.startsWith("/xmlBlaster/node"))
 915          this.serverNodeId = nodeId;
 916       else
 917          this.serverNodeId = "/node/" + nodeId;
 918    }
 919 
 920    /**
 921     * The cluster node id (name) to which we want to connect.
 922     * <p />
 923     * Needed for client queue storage identifier. see: setStorageIdStr()
 924     * <p />
 925     * for nicer logging when running in a cluster.<br />
 926     * Is configurable with "-server.node.id golan" until a successful connect
 927     * 
 928     * @return e.g. "/node/golan" or /xmlBlaster/node/heron"
 929     */
 930    public String getServerNodeId() {
 931       if (this.contextNode != null) return this.contextNode.getParent(ContextNode.CLUSTER_MARKER_TAG).getAbsoluteName();
 932       if (this.serverNodeId != null) return this.serverNodeId;
 933       return this.glob.getInstanceId(); // Changes for each restart
 934    }
 935 
   /**
    * Set my identity.
    * <p />
    * Adjusts the cluster node name of our context node to the given node id and,
    * if JMX is supported, renames/registers the corresponding MBeans.
    * A configured serverNodeId has precedence over the passed argument.
    * Nothing is changed if the cluster instance name stays the same or if the
    * node id can't be parsed.
    * @param nodeId For example "/node/heron/instanceId/1233435" or "/node/heron"
    */
   private void setContextNodeId(String nodeId) {
      // Not for cluster with given serverNodeId: It is invariant
      if (this.serverNodeId != null)
         nodeId = this.serverNodeId;

      if (nodeId == null) return;
      if (nodeId.indexOf("/") == -1) nodeId = "/node/"+nodeId; // add CLUSTER_MARKER_TAG to e.g. "/node/avalon.mycomp.com"

      // Remember the current cluster naming to detect a change and to rename JMX nodes below
      String oldClusterObjectName = "";      // e.g. "org.xmlBlaster:nodeClass=node,node=clientSUB1"
      String oldServerNodeInstanceName = ""; // e.g. "clientSUB1"
      ContextNode clusterContext = null;
      if (this.contextNode != null) {
         // same instance as glob.getContextNode():
         clusterContext = this.contextNode.getParent(ContextNode.CLUSTER_MARKER_TAG);
         oldServerNodeInstanceName = clusterContext.getInstanceName();
         oldClusterObjectName = clusterContext.getAbsoluteName(ContextNode.SCHEMA_JMX);
      }

      // Verify the publicSessionId ...
      if (this.glob.supportJmx()) {
         try {
            // A registered MBean whose public session id changed is renamed
            if (this.mbeanHandle != null && this.jmxPublicSessionId != getPublicSessionId()) {
            /*int count = */this.glob.getJmxWrapper().renameMBean(this.mbeanHandle.getObjectInstance().getObjectName().toString(),
                           ContextNode.SESSION_MARKER_TAG, ""+getPublicSessionId());
               this.mbeanHandle.getContextNode().setInstanceName(""+getPublicSessionId());
               this.jmxPublicSessionId = getPublicSessionId();
            }
            // Not registered yet: only keep the instance name of the context node up to date
            if (this.mbeanHandle == null &&
                this.contextNode != null &&
                !this.contextNode.getInstanceName().equals(""+getPublicSessionId())) {
               this.contextNode.setInstanceName(""+getPublicSessionId());
            }
         }
         catch (XmlBlasterException e) {
            log.warning(getLogId()+"Ignoring problem during JMX session registration: " + e.toString());
         }
      }
      else {
          this.jmxPublicSessionId = getPublicSessionId();
      }

      // parse new cluster node name ...
      ContextNode tmp = ContextNode.valueOf(nodeId);
      ContextNode tmpClusterContext = (tmp==null)?null:tmp.getParent(ContextNode.CLUSTER_MARKER_TAG);
      if (tmpClusterContext == null) {
         log.severe(getLogId()+"Ignoring unknown serverNodeId '" + nodeId + "'");
         return;
      }
      String newServerNodeInstanceName = tmpClusterContext.getInstanceName(); // e.g. "heron"

      if (oldServerNodeInstanceName.equals(newServerNodeInstanceName)) {
         return; // nothing to do, same cluster name
      }

      this.glob.getContextNode().setInstanceName(newServerNodeInstanceName);
      if (clusterContext == null) {
         // First call: build our context node (connection -> session) below the Global's context node,
         // this is only possible if a login name is known
         clusterContext = this.glob.getContextNode();
         String ln = getLoginName();
         if (ln != null && ln.length() > 0) {
            String instanceName = this.glob.validateJmxValue(ln);
            ContextNode contextNodeSubject = new ContextNode(ContextNode.CONNECTION_MARKER_TAG, instanceName, clusterContext);
            this.contextNode = new ContextNode(ContextNode.SESSION_MARKER_TAG, ""+getPublicSessionId(), contextNodeSubject);
         }
      }
      else {
         clusterContext.setInstanceName(newServerNodeInstanceName);
      }

      this.glob.setScopeContextNode(this.contextNode);

      if (this.glob.supportJmx()) {
         try {
            // Query all "org.xmlBlaster:nodeClass=node,node=clientSUB1" + ",*" sub-nodes and replace the name by "heron"
            // For example our connectionQueue or our plugins like Pop3Driver
            if (oldClusterObjectName.length() > 0) {
               int num = this.glob.getJmxWrapper().renameMBean(oldClusterObjectName, ContextNode.CLUSTER_MARKER_TAG, this.contextNode);
               if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Renamed " + num + " jmx nodes to new '" + nodeId + "'");
            }

            if (this.mbeanHandle == null && this.contextNode != null) {   // "org.xmlBlaster:nodeClass=node,node=heron"
               this.mbeanHandle = this.glob.registerMBean(this.contextNode, this);
            }
         }
         catch (XmlBlasterException e) {
             log.warning(getLogId()+"Ignoring problem during JMX registration: " + e.toString());
         }
      }

      // The log id may have changed with the new naming, refresh the checkpoint context
      setCheckpointContext(getLogId());
   }
1030 
1031    private void setCheckpointContext(String id) {
1032       if (id == null || id.length() < 1) {
1033          this.checkPointContext = null;
1034          return;
1035       }
1036       this.checkPointContext = new String[] { "sessionName", id };
1037    }
1038 
1039    /**
1040     * Put the given message entry into the queue
1041     */
1042    private Object queueMessage(MsgQueueEntry entry) throws XmlBlasterException {
1043       try {
1044          final I_Checkpoint cp = glob.getCheckpointPlugin();
1045          if (cp != null) {
1046             cp.passingBy(I_Checkpoint.CP_CONNECTION_PUBLISH_ENTER, entry.getMsgUnit(), null, this.checkPointContext);
1047          }
1048          this.clientQueue.put(entry, I_Queue.USE_PUT_INTERCEPTOR);
1049          if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Forwarded one '" + entry.getEmbeddedType() + "' message, current state is " + getState().toString());
1050          return entry.getReturnObj();
1051       }
1052       catch (XmlBlasterException e) {
1053          if (log.isLoggable(Level.FINE)) log.fine(e.getMessage());
1054          throw e;
1055       }
1056       catch (Throwable e) {
1057          if (log.isLoggable(Level.FINE)) log.fine(e.toString());
1058          XmlBlasterException xmlBlasterException = XmlBlasterException.convert(glob,null,null,e);
1059          //msgErrorHandler.handleError(new MsgErrorInfo(glob, entry, null, xmlBlasterException));
1060          throw xmlBlasterException; // internal errors or not in failsafe mode: throw back to client
1061       }
1062    }
1063 
1064    /**
1065     * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos)
1066     */
1067    public SubscribeReturnQos subscribe(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1068       return subscribe(new SubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1069                        new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1070    }
1071 
1072    /**
1073     * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos)
1074     */
1075    public SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos) throws XmlBlasterException {
1076       if (!this.isValid)
1077          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "subscribe");
1078       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1079       if (getSessionName().isPubSessionIdUser() &&
1080           subscribeQos.getData().getMultiSubscribe()==false &&
1081           !subscribeQos.getData().hasSubscriptionId()) {
1082           // For failsave clients we generate on client side the subscriptionId
1083           // In case of offline/clientSideQueued operation we guarantee like this a not changing
1084           // subscriptionId and the client code can reliably use the subscriptionId for further dispatching
1085           // of update() messages.
1086           subscribeQos.getData().generateSubscriptionId(getSessionName(), subscribeKey.getData());
1087       }
1088       MsgQueueSubscribeEntry entry  = new MsgQueueSubscribeEntry(glob,
1089                                       this.clientQueue.getStorageId(), subscribeKey.getData(), subscribeQos.getData());
1090       return (SubscribeReturnQos)queueMessage(entry);
1091    }
1092 
1093    /**
1094     * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback)
1095     */
1096    public SubscribeReturnQos subscribe(java.lang.String xmlKey, java.lang.String qos, I_Callback cb) throws XmlBlasterException {
1097       return subscribe(new SubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1098                        new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)),
1099                        cb );
1100    }
1101 
1102    /**
1103     * @see I_XmlBlasterAccess#subscribe(SubscribeKey, SubscribeQos, I_Callback)
1104     */
1105    public SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos, I_Callback cb) throws XmlBlasterException {
1106       if (!this.isValid)
1107          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "subscribe");
1108       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1109       if (this.updateListener == null) {
1110          String text = "No callback listener is registered. " +
1111                        " Please use XmlBlasterAccess.connect() with default I_Callback given.";
1112          throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text);
1113       }
1114 
1115       // sync subscribe & put against update()'s check for entry
1116       // otherwise if the update was faster then the subscribe to return we miss the entry
1117       synchronized (this.updateDispatcher) {
1118          SubscribeReturnQos subscribeReturnQos = subscribe(subscribeKey, subscribeQos);
1119          this.updateDispatcher.addCallback(subscribeReturnQos.getSubscriptionId(), cb, subscribeQos.getPersistent());
1120          if (!subscribeReturnQos.isFakedReturn()) {
1121             this.updateDispatcher.ackSubscription(subscribeReturnQos.getSubscriptionId());
1122          }
1123          return subscribeReturnQos;
1124       }
1125    }
1126 
1127    /**
1128     * @see I_XmlBlasterAccess#get(GetKey, GetQos)
1129     */
1130    public MsgUnit[] get(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1131       return get(new GetKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1132                  new GetQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1133    }
1134 
1135    /**
1136     * @see I_XmlBlasterAccess#getCached(GetKey, GetQos)
1137     */
1138    public MsgUnit[] getCached(GetKey getKey, GetQos getQos) throws XmlBlasterException {
1139       if (!this.isValid)
1140          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "getCached");
1141       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1142       if (this.synchronousCache == null) {  //Is synchronousCache installed?
1143          throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME,
1144               "Can't handle getCached(), please install a cache with createSynchronousCache() first");
1145       }
1146 
1147       MsgUnit[] msgUnitArr = null;
1148       msgUnitArr = this.synchronousCache.get(getKey, getQos);
1149       if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"CacheDump: msgUnitArr=" + msgUnitArr + ": '" + getKey.toXml().trim() + "' \n" + getQos.toXml() + this.synchronousCache.toXml(""));
1150       //not found in this.synchronousCache
1151       if(msgUnitArr == null) {
1152          msgUnitArr = get(getKey, getQos);  //get messages from xmlBlaster (synchronous)
1153          SubscribeKey subscribeKey = new SubscribeKey(glob, getKey.getData());
1154          SubscribeQos subscribeQos = new SubscribeQos(glob, getQos.getData());
1155          SubscribeReturnQos subscribeReturnQos = null;
1156          synchronized (this.synchronousCache) {
1157             subscribeReturnQos = subscribe(subscribeKey, subscribeQos); //subscribe to this messages (asynchronous)
1158             this.synchronousCache.newEntry(subscribeReturnQos.getSubscriptionId(), getKey, msgUnitArr);     //fill messages to this.synchronousCache
1159          }
1160          log.info(getLogId()+"New entry in this.synchronousCache created (subscriptionId="+subscribeReturnQos.getSubscriptionId()+")");
1161       }
1162       return msgUnitArr;
1163    }
1164 
1165    /**
1166     * @see I_XmlBlasterAccess#get(GetKey, GetQos)
1167     */
1168    public MsgUnit[] get(GetKey getKey, GetQos getQos) throws XmlBlasterException {
1169       if (!this.isValid)
1170          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "get");
1171       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1172       MsgQueueGetEntry entry  = new MsgQueueGetEntry(glob,
1173                                       this.clientQueue.getStorageId(), getKey, getQos);
1174       MsgUnit[] arr = (MsgUnit[])queueMessage(entry);
1175       return (arr == null) ? new MsgUnit[0] : arr;
1176    }
1177 
1178    /**
1179     * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos)
1180     */
1181    public UnSubscribeReturnQos[] unSubscribe(UnSubscribeKey unSubscribeKey, UnSubscribeQos unSubscribeQos) throws XmlBlasterException {
1182       if (!this.isValid)
1183          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "unSubscribe");
1184       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1185       MsgQueueUnSubscribeEntry entry  = new MsgQueueUnSubscribeEntry(glob,
1186                                       this.clientQueue.getStorageId(), unSubscribeKey, unSubscribeQos);
1187       UnSubscribeReturnQos[] arr = (UnSubscribeReturnQos[])queueMessage(entry);
1188       this.updateDispatcher.removeCallback(unSubscribeKey.getOid());
1189       return (arr == null) ? new UnSubscribeReturnQos[0] : arr;
1190    }
1191 
1192    /**
1193     * @see I_XmlBlasterAccess#unSubscribe(UnSubscribeKey, UnSubscribeQos)
1194     */
1195    public UnSubscribeReturnQos[] unSubscribe(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1196       return unSubscribe(new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1197                        new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1198    }
1199 
1200    /**
1201     * @see I_XmlBlasterAccess#publish(MsgUnit)
1202     */
1203    public PublishReturnQos publish(MsgUnit msgUnit) throws XmlBlasterException {
1204       if (!this.isValid)
1205          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publish");
1206       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1207       MsgQueuePublishEntry entry  = new MsgQueuePublishEntry(glob, msgUnit, this.clientQueue.getStorageId());
1208       return (PublishReturnQos)queueMessage(entry);
1209    }
1210 
1211    /**
1212     * @see I_XmlBlasterAccess#publishOneway(MsgUnit[])
1213     */
1214    public void publishOneway(org.xmlBlaster.util.MsgUnit [] msgUnitArr) throws XmlBlasterException {
1215       if (!this.isValid)
1216          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishOneway");
1217       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1218       final boolean ONEWAY = true;
1219       for (int ii=0; ii<msgUnitArr.length; ii++) {
1220          MsgQueuePublishEntry entry  = new MsgQueuePublishEntry(glob, msgUnitArr[ii],
1221                                           this.clientQueue.getStorageId(), ONEWAY);
1222          queueMessage(entry);
1223       }
1224    }
1225 
1226    // rename to publish()
1227    public PublishReturnQos[] publishArr(org.xmlBlaster.util.MsgUnit[] msgUnitArr) throws XmlBlasterException {
1228       if (!this.isValid)
1229          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "publishArr");
1230       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1231       if (this.firstWarn) {
1232          log.warning(getLogId()+"Publishing arrays is not atomic implemented - TODO");
1233          this.firstWarn = false;
1234       }
1235       PublishReturnQos[] retQos = new PublishReturnQos[msgUnitArr.length];
1236       for (int ii=0; ii<msgUnitArr.length; ii++) {
1237          MsgQueuePublishEntry entry  = new MsgQueuePublishEntry(glob, msgUnitArr[ii],
1238                                           this.clientQueue.getStorageId());
1239          retQos[ii] = (PublishReturnQos)queueMessage(entry);
1240       }
1241       return retQos;
1242    }
1243 
1244    /**
1245     * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos)
1246     */
1247    public EraseReturnQos[] erase(EraseKey eraseKey, EraseQos eraseQos) throws XmlBlasterException {
1248       if (!this.isValid)
1249          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "erase");
1250       if (!isConnected()) throw new XmlBlasterException(glob, ErrorCode.USER_NOT_CONNECTED, ME);
1251       MsgQueueEraseEntry entry  = new MsgQueueEraseEntry(glob,
1252                                       this.clientQueue.getStorageId(), eraseKey, eraseQos);
1253       EraseReturnQos[] arr = (EraseReturnQos[])queueMessage(entry);
1254       return (arr == null) ? new EraseReturnQos[0] : arr;
1255    }
1256 
1257    /**
1258     * @see I_XmlBlasterAccess#erase(EraseKey, EraseQos)
1259     */
1260    public EraseReturnQos[] erase(java.lang.String xmlKey, java.lang.String qos) throws XmlBlasterException {
1261       return erase(new EraseKey(glob, glob.getQueryKeyFactory().readObject(xmlKey)),
1262                        new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) );
1263    }
1264    
1265    /**
1266     * For example called by SOCKET layer (SocketCallbackImpl.java) on EOF. 
1267     * Does immediate ping to go to polling mode
1268     * @param xmlBlasterException
1269     * @see org.xmlBlaster.client.I_CallbackExtended#lostConnection(XmlBlasterException)
1270     */
1271    public void lostConnection(XmlBlasterException xmlBlasterException) {
1272       if (log.isLoggable(Level.FINE)) log.fine("Communication layer lost connection: " + ((xmlBlasterException==null)?"":xmlBlasterException.toString()));
1273       this.dispatchManager.pingCallbackServer(false, true);
1274    }
1275 
1276    /**
1277     * Force a async ping to re-check connection to server. Status change can be
1278     * got asynchronously via registerConnectionListener()
1279     */
1280    public void ping() {
1281       this.dispatchManager.pingCallbackServer(false, false);
1282    }
1283 
1284    /**
1285     * This is the callback method invoked from xmlBlaster
1286     * delivering us a new asynchronous message.
1287     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey, byte[], UpdateQos)
1288     */
1289    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
1290       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Entering update(updateKey=" + updateKey.getOid() +
1291                     ", subscriptionId=" + updateQos.getSubscriptionId() + ", " + ((this.synchronousCache != null) ? "using synchronousCache" : "no synchronousCache") + ") ...");
1292 
1293       if (this.synchronousCache != null) {
1294          boolean retVal;
1295          synchronized (this.synchronousCache) {
1296             retVal = this.synchronousCache.update(updateQos.getSubscriptionId(), updateKey, content, updateQos);
1297          }
1298          if (retVal) {
1299             if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Putting update message " + updateQos.getSubscriptionId() + " into cache");
1300             return Constants.RET_OK; // "<qos><state id='OK'/></qos>";
1301          }
1302          if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Update message " + updateQos.getSubscriptionId() + " is not for cache");
1303       }
1304 
1305       Object obj = null;
1306       // sync against subscribe & put
1307       // otherwise if the update was faster then the subscribe to return we miss the entry
1308       synchronized (this.updateDispatcher) {
1309          obj = this.updateDispatcher.getCallback(updateQos.getSubscriptionId());
1310       }
1311 
1312       if (obj != null) {  // If a special callback was specified for this subscription:
1313          I_Callback cb = (I_Callback)obj;
1314          return cb.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client
1315       }
1316       else if (this.updateListener != null) {
1317          // If a general callback was specified on login:
1318          return this.updateListener.update(cbSessionId, updateKey, content, updateQos); // deliver the update to our client
1319       }
1320       else {
1321          log.severe(getLogId()+"Ignoring unexpected update message as client has not registered a callback: " + updateKey.toXml() + "" + updateQos.toXml());
1322       }
1323 
1324       return Constants.RET_OK; // "<qos><state id='OK'/></qos>";
1325    }
1326 
1327    /**
1328     * Call by DispatchManager on connection state transition.
1329     * <p />
1330     * Enforced by interface I_ConnectionStatusListener
1331     */
1332    public void toAlive(I_DispatchManager dispatchManager, ConnectionStateEnum oldState) {
1333       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " connectInProgress=" + this.connectInProgress);
1334       if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) {
1335          log.info(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE +
1336                       " connectInProgress=" + this.connectInProgress +
1337                       " with " + this.clientQueue.getNumOfEntries() + " client side queued messages");
1338       }
1339       if (this.connectInProgress) {
1340          dispatchManager.trySyncMode(true);
1341          if (this.clientQueue != null && this.clientQueue.getNumOfEntries() > 0) {
1342             try {
1343                MsgQueueEntry entry = (MsgQueueEntry)this.clientQueue.peek();
1344                if (entry.getMethodName() == MethodName.CONNECT) {
1345                   this.clientQueue.remove();
1346                   log.info(getLogId()+"Removed queued connect message, our new connect has precedence");
1347                }
1348             }
1349             catch (XmlBlasterException e) {
1350                log.severe(getLogId()+"Removing connect entry in client tail back queue failed: " + e.getMessage() + "\n" + toXml());
1351             }
1352          }
1353          return;
1354       }
1355 
1356       if (this.clientQueue == null || this.clientQueue.getNumOfEntries() == 0) {
1357          dispatchManager.trySyncMode(true);
1358       }
1359 
1360       if (this.connectReturnQos == null || !this.connectReturnQos.isReconnected()) {
1361          cleanupForNewServer();
1362       }
1363 
1364       if (this.connectionListener != null) {
1365          this.connectionListener.reachedAlive(oldState, this);
1366       }
1367    }
1368 
1369    public void toAliveSync(I_DispatchManager dispatchManager, ConnectionStateEnum oldState) {
1370       if (this.connectionListener != null) {
1371          this.connectionListener.reachedAliveSync(oldState, this);
1372       }
1373    }
1374    
1375    /**
1376     * If we have reconnected to xmlBlaster and the xmlBlaster server instance
1377     * is another one which does not know our session state and subscribes we need to clear all
1378     * cached subscribes etc.
1379     */
1380    private void cleanupForNewServer() {
1381       if (this.updateDispatcher.size() > 0) {
1382          int num = this.updateDispatcher.clearAckNonPersistentSubscriptions(); // to avoid memory leaks, subscribes pending in the queue are not cleared
1383          if (num > 0) {
1384             log.info(getLogId()+"Removed " + num + " subscribe specific callback registrations");
1385          }
1386          // TODO: On switch to sync delivery and the client has
1387          // cleared subscribes from the queue manually we have still a memory leak here:
1388          // We would need to call clearNAKSubscriptions()
1389       }
1390       if (this.synchronousCache != null) {
1391          this.synchronousCache.clear(); // we need to re-subscribe
1392       }
1393    }
1394 
1395    /**
1396     * Call by DispatchManager on connection state transition.
1397     * <p />
1398     * Enforced by interface I_ConnectionStatusListener
1399     */
1400    public void toPolling(I_DispatchManager dispatchManager, ConnectionStateEnum oldState) {
1401       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING + " connectInProgress=" + this.connectInProgress);
1402       if (this.connectInProgress) return;
1403       if (this.connectionListener != null) {
1404          this.connectionListener.reachedPolling(oldState, this);
1405       }
1406    }
1407 
1408    /**
1409     * Workaround to transport the reason for the toDead() transition as
1410     * the interface {@link I_ConnectionStateListener#reachedDead(ConnectionStateEnum, I_XmlBlasterAccess) is missing
1411     * to pass the exception to the client.
1412     * <p>
1413     * Currently the client needs a downcast to XmlBlasterAccess (not in I_XmlBlasterAccess)
1414     * @return Can be null
1415     */
1416    public XmlBlasterException getToDeadXmlBlasterException() {
1417       return toDeadXmlBlasterException;
1418    }
1419 
1420    /**
1421     * Call by DispatchManager on connection state transition.
1422     * <p>Enforced by interface I_ConnectionStatusListener</p>
1423     */
1424    public void toDead(I_DispatchManager dispatchManager, ConnectionStateEnum oldState, XmlBlasterException xmlBlasterException) {
1425       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD + " connectInProgress=" + this.connectInProgress);
1426       if (this.connectionListener != null) {
1427        this.toDeadXmlBlasterException = xmlBlasterException; // hack, description see #getToDeadXmlBlasterException
1428          this.connectionListener.reachedDead(oldState, this);
1429       }
1430    }
1431 
1432    /**
1433     * Access the environment settings of this connection.
1434     * <p>Enforced by interface I_XmlBlasterAccess</p>
1435     * @return The global handle (like a stack with local variables for this connection)
1436     */
1437    public Global getGlobal() {
1438       return this.glob;
1439    }
1440 
1441    /**
1442     * <p>Enforced by interface I_ConnectionHandler</p>
1443     * @return The queue used to store tailback messages.
1444     */
1445    public I_Queue getQueue() {
1446       return this.clientQueue;
1447    }
1448 
1449    /**
1450     * <p>Enforced by interface I_ConnectionHandler</p>
1451     * @return The current state of the connection
1452     */
1453    public ConnectionStateEnum getState() {
1454       if (!isConnected()) return ConnectionStateEnum.UNDEF;
1455       return this.dispatchManager.getDispatchConnectionsHandler().getState();
1456    }
1457 
1458    /**
1459     * Get the connection state.
1460     * String version for JMX access.
1461     * @return "UNDEF", "ALIVE", "POLLING", "DEAD"
1462     */
1463    public String getConnectionState() {
1464       return getState().toString();
1465    }
1466 
1467    /**
1468     * <p>Enforced by interface I_ConnectionHandler</p>
1469     * @return true if the connection to xmlBlaster is operational
1470     */
1471    public boolean isAlive() {
1472       if (!isConnected()) return false;
1473       return this.dispatchManager.getDispatchConnectionsHandler().isAlive();
1474    }
1475 
1476    /**
1477     * <p>Enforced by interface I_ConnectionHandler</p>
1478     * @return true if we are polling for the server
1479     */
1480    public boolean isPolling() {
1481       if (!isConnected()) return false;
1482       return this.dispatchManager.getDispatchConnectionsHandler().isPolling();
1483    }
1484 
1485    /**
1486     * <p>Enforced by interface I_ConnectionHandler</p>
1487     * @return true if we have definitely lost the connection to xmlBlaster and gave up
1488     */
1489    public boolean isDead() {
1490       if (!isConnected()) return false;
1491       return this.dispatchManager.getDispatchConnectionsHandler().isDead();
1492    }
1493 
1494    /**
1495     * Access the returned QoS of a connect() call.
1496     * <p>Enforced by interface I_XmlBlasterAccess</p>
1497     * @return Can be null if not connected
1498     */
1499    public ConnectReturnQos getConnectReturnQos() {
1500       return this.connectReturnQos;
1501    }
1502 
1503    /**
1504     * Access the current ConnectQos
1505     * <p>Enforced by interface I_XmlBlasterAccess</p>
1506     * @return Can be null if not connected
1507     */
1508    public ConnectQos getConnectQos() {
1509       return this.connectQos;
1510    }
1511 
1512    /**
1513     * @return null if no callback is configured
1514     */
1515    public final DispatchStatistic getDispatchStatistic() {
1516       if (this.statistic == null) {
1517          synchronized (this) {
1518             if (this.statistic == null) {
1519                if (this.dispatchManager != null)
1520                   this.statistic = this.dispatchManager.getDispatchStatistic();
1521                else
1522                   this.statistic = new DispatchStatistic();
1523             }
1524          }
1525       }
1526       return this.statistic;
1527    }
1528 
1529    /**
1530     * Access the login name.
1531     * @return your login name or null if you are not logged in
1532     */
1533    public synchronized final String getLoginName() {
1534       SessionName sn = getSessionName();
1535       if (sn == null) return "xmlBlasterClient";
1536       return sn.getLoginName();
1537       /*
1538       //if (this.connectReturnQos != null)
1539       //   return this.connectReturnQos.getLoginName();
1540       //try {
1541          if (connectQos != null && connectQos.getSecurityQos() != null) {
1542             String nm = connectQos.getSecurityQos().getUserId();
1543             if (nm != null && nm.length() > 0)
1544                return nm;
1545          }
1546       //}
1547       //catch (XmlBlasterException e) {}
1548       return glob.getId(); // "client?";
1549       */
1550    }
1551 
1552    public final boolean isCallbackConfigured() {
1553       return (this.cbServer != null);
1554    }
1555 
1556    public final long getUptime() {
1557       return (System.currentTimeMillis() - this.startupTime)/1000L;
1558    }
1559 
1560    public final String getLoginDate() {
1561       long ll = this.startupTime;
1562       java.sql.Timestamp tt = new java.sql.Timestamp(ll);
1563       return tt.toString();
1564    }
1565 
1566    public synchronized final long getPublicSessionId() {
1567       SessionName sn = getSessionName();
1568       if (sn == null) return 0;
1569       return sn.getPublicSessionId();
1570    }
1571 
1572    public final long getNumPublish() {
1573       return getDispatchStatistic().getNumPublish();
1574    }
1575 
1576    public final long getNumSubscribe() {
1577       return getDispatchStatistic().getNumSubscribe();
1578    }
1579 
1580    public final long getNumUnSubscribe() {
1581       return getDispatchStatistic().getNumUnSubscribe();
1582    }
1583 
1584    public final long getNumGet() {
1585       return getDispatchStatistic().getNumGet();
1586    }
1587 
1588    public final long getNumErase() {
1589       return getDispatchStatistic().getNumErase();
1590    }
1591 
1592    public final long getNumUpdateOneway() {
1593       return getDispatchStatistic().getNumUpdateOneway();
1594    }
1595 
1596    public final long getNumUpdate() {
1597       return getDispatchStatistic().getNumUpdate();
1598    }
1599 
1600    public synchronized final long getConnectionQueueNumMsgs() {
1601       if (this.clientQueue == null) return 0L;
1602       return this.clientQueue.getNumOfEntries();
1603    }
1604 
1605    public synchronized final long getConnectionQueueMaxMsgs() {
1606       if (this.clientQueue == null) return 0L;
1607       return this.clientQueue.getMaxNumOfEntries();
1608    }
1609 
1610    public final long getPingRoundTripDelay() {
1611       return getDispatchStatistic().getPingRoundTripDelay();
1612    }
1613 
1614    public final long getRoundTripDelay() {
1615       return getDispatchStatistic().getRoundTripDelay();
1616    }
1617 
1618    /** JMX **/
1619    public String invokePublish(String key, String content, String qos) throws Exception {
1620       if (key == null || key.length()==0 || key.equalsIgnoreCase("String"))
1621          throw new IllegalArgumentException("Please pass a valid XML key like '<key oid='Hello'/> or the simple oid like 'Hello'");
1622       if (key.indexOf("<") == -1) {
1623          key = "<key oid='" + key + "'/>";
1624       }
1625       qos = checkQueryKeyQos(key, qos);
1626       if (content == null) content = "";
1627       try {
1628          MsgUnit msgUnit = new MsgUnit(key, content, qos);
1629          PublishReturnQos prq = publish(msgUnit);
1630          return prq.toString();
1631       }
1632       catch (XmlBlasterException e) {
1633          throw new Exception(e.toString());
1634       }
1635    }
1636 
1637    private String checkQueryKeyQos(String url, String qos) {
1638       if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"url=" + url + " qos=" + qos);
1639       if (url == null || url.length()==0 || url.equalsIgnoreCase("String"))
1640          throw new IllegalArgumentException("Please pass a valid URL like 'xpath://key' or a simple oid like 'Hello'");
1641       if (qos == null || qos.length()==0 || qos.equalsIgnoreCase("String")) qos = "<qos/>";
1642       return qos;
1643    }
1644 
1645    /** JMX **/
1646    public String[] invokeUnSubscribe(String url, String qos) throws Exception {
1647       qos = checkQueryKeyQos(url, qos);
1648       try {
1649          UnSubscribeKey usk = new UnSubscribeKey(glob, url);
1650          UnSubscribeReturnQos[] usrq = unSubscribe(usk, new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)));
1651          if (usrq == null) return new String[0];
1652          String[] ret = new String[usrq.length];
1653          if (ret.length < 1) {
1654             return new String[] { "unSubscribe '"+url+"' did not match any subscription" };
1655          }
1656          for (int i=0; i<usrq.length; i++) {
1657             ret[i] = usrq[i].toXml();
1658          }
1659          return ret;
1660       }
1661       catch (XmlBlasterException e) {
1662          throw new Exception(e.toString());
1663       }
1664    }
1665 
1666    /** JMX **/
1667    public String invokeSubscribe(String url, String qos) throws Exception {
1668       qos = checkQueryKeyQos(url, qos);
1669       try {
1670          SubscribeKey usk = new SubscribeKey(glob, url);
1671          SubscribeReturnQos srq = subscribe(usk, new SubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)));
1672          if (srq == null) return "";
1673          return srq.toXml();
1674       }
1675       catch (XmlBlasterException e) {
1676          throw new Exception(e.toString());
1677       }
1678    }
1679 
1680    /** JMX **/
1681    public String[] invokeGet(String url, String qos) throws Exception {
1682       qos = checkQueryKeyQos(url, qos);
1683       try {
1684          GetKey gk = new GetKey(glob, url);
1685          MsgUnit[] msgs = get(gk, new GetQos(glob, glob.getQueryQosFactory().readObject(qos)));
1686          if (msgs == null) return new String[0];
1687          if (msgs == null || msgs.length < 1) {
1688             return new String[] { "get('"+url+"') did not match any topic" };
1689          }
1690          ArrayList tmpList = new ArrayList();
1691          for (int i=0; i<msgs.length; i++) {
1692             tmpList.add("  "+msgs[i].getKeyData().toXml());
1693             tmpList.add("  "+msgs[i].getContentStr());
1694             tmpList.add("  "+msgs[i].getQosData().toXml());
1695          }
1696          return (String[])tmpList.toArray(new String[tmpList.size()]);
1697       }
1698       catch (XmlBlasterException e) {
1699          throw new Exception(e.toString());
1700       }
1701    }
1702 
1703    /** JMX **/
1704    public String[] invokeErase(String url, String qos) throws Exception {
1705       qos = checkQueryKeyQos(url, qos);
1706       try {
1707          EraseKey ek = new EraseKey(glob, url);
1708          EraseReturnQos[] erq = erase(ek, new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)));
1709          if (erq == null) return new String[0];
1710          String[] ret = new String[erq.length];
1711          if (ret.length < 1) {
1712             return new String[] { "erase('"+url+"') did not match any topic, nothing is erased." };
1713          }
1714          for (int i=0; i<erq.length; i++) {
1715             ret[i] = erq[i].toXml();
1716          }
1717          return ret;
1718       }
1719       catch (XmlBlasterException e) {
1720          throw new Exception(e.toString());
1721       }
1722    }
1723 
1724    /**
1725     * Sets the DispachManager belonging to this session to active or inactive.
1726     * It is initially active. Setting it to false temporarly inhibits dispatch of
1727     * messages which are in the callback queue. Setting it to true starts the
1728     * dispatch again.
1729     * @param dispatchActive
1730     */
1731    public synchronized void setDispatcherActive(boolean dispatcherActive) {
1732       if (this.dispatchManager != null) {
1733          this.dispatchManager.setDispatcherActive(dispatcherActive);
1734       }
1735    }
1736 
1737    public synchronized  boolean getDispatcherActive() {
1738       if (this.dispatchManager != null) {
1739          return this.dispatchManager.isDispatcherActive();
1740       }
1741       return false;
1742    }
1743 
1744    public void setCallbackDispatcherActive(boolean activate) throws XmlBlasterException {
1745       if (this.streamingCb != null && !isCallbackDispatcherActive() && activate) {
1746          int ret = this.streamingCb.sendInitialQueueEntries();
1747          log.info("locally retrieved '" + ret + "' chunks");
1748       }
1749 
1750       String command = getSessionName() + "/?dispatcherActive=" + activate;
1751       sendAdministrativeCommand(command);
1752       this.connectQos.getSessionCbQueueProperty().getCurrentCallbackAddress().setDispatcherActive(activate);
1753    }
1754 
1755    public boolean isCallbackDispatcherActive() throws XmlBlasterException {
1756       String command = getSessionName() + "/?dispatcherActive";
1757       boolean ret = "true".equalsIgnoreCase(sendAdministrativeCommand(command));
1758       return ret;
1759    }
1760 
1761    public String sendAdministrativeCommand(String command) throws XmlBlasterException {
1762       if (command == null)
1763          throw new IllegalArgumentException("sendAdministrativeCommand() called with null argument");
1764       command = command.trim();
1765       boolean isGet = command.indexOf("get ") == 0 || command.indexOf("GET ") == 0;
1766       boolean isSet = command.indexOf("set ") == 0 || command.indexOf("SET ") == 0;
1767       String cmd = ((isGet || isSet)) ? command.substring(4) : command;
1768 
1769       if (isSet || (!isGet && cmd.indexOf("=") != -1)) {
1770          String oid = "__cmd:" + cmd;
1771          PublishKey key = new PublishKey(glob, oid); // oid="__cmd:/client/joe/1/?dispatcherActive=false"
1772          PublishQos qos = new PublishQos(glob);
1773          MsgUnit msgUnit = new MsgUnit(key, "", qos);
1774          try {
1775             PublishReturnQos ret = publish(msgUnit);
1776             if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Send '" + cmd + " '");
1777             return ret.getState();
1778          }
1779          catch (XmlBlasterException e) {
1780             if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Sending of '" + cmd + " ' failed: " + e.getMessage());
1781             throw e;
1782          }
1783       }
1784       else {
1785          String oid = "__cmd:" + cmd;
1786          GetKey getKey = new GetKey(glob, oid);
1787          GetQos getQos = new GetQos(glob);
1788          try {
1789             MsgUnit[] msgs = get(getKey, getQos);
1790             if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Send '" + cmd + " ', got array of size " + msgs.length);
1791             if (msgs.length == 0)
1792                return "";
1793             return msgs[0].getContentStr();
1794          }
1795          catch (XmlBlasterException e) {
1796             if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Sending of '" + cmd + " ' failed: " + e.getMessage());
1797             throw e;
1798          }
1799       }
1800    }
1801 
1802    public synchronized String[] peekClientMessages(int numOfEntries) throws Exception {
1803       try {
1804          if (numOfEntries == 0)
1805             return new String[] { "Please pass number of messages to peak" };
1806          if (this.clientQueue == null)
1807             return new String[] { "There is no client queue available" };
1808          if (this.clientQueue.getNumOfEntries() < 1)
1809             return new String[] { "The client queue is empty" };
1810 
1811          List<I_Entry> list = this.clientQueue.peek(numOfEntries, -1);
1812 
1813          if (list.size() == 0)
1814             return new String[] { "Peeking messages from client queue failed, the reason is not known" };
1815 
1816          ArrayList tmpList = new ArrayList();
1817          for (int i=0; i<list.size(); i++) {
1818             MsgQueueEntry entry = (MsgQueueEntry)list.get(i);
1819             if (entry instanceof MsgQueuePublishEntry) {
1820                MsgQueuePublishEntry pe = (MsgQueuePublishEntry)entry;
1821                tmpList.add("  "+pe.getMsgUnit().getKeyData().toXml());
1822                tmpList.add("  "+pe.getMsgUnit().getContentStr());
1823                tmpList.add("  "+pe.getMsgUnit().getQosData().toXml());
1824             }
1825             else if (entry instanceof MsgQueueConnectEntry) {
1826                MsgQueueConnectEntry pe = (MsgQueueConnectEntry)entry;
1827                 tmpList.add("  "+pe.getConnectQosData().toXml());
1828             }
1829             else if (entry instanceof MsgQueueDisconnectEntry) {
1830                MsgQueueDisconnectEntry pe = (MsgQueueDisconnectEntry)entry;
1831                 tmpList.add("  "+pe.getDisconnectQos().toXml());
1832             }
1833             else if (entry instanceof MsgQueueEraseEntry) {
1834                MsgQueueEraseEntry pe = (MsgQueueEraseEntry)entry;
1835                 tmpList.add("  "+pe.getEraseKey().toXml());
1836                 tmpList.add("  "+pe.getEraseQos().toXml());
1837             }
1838             else if (entry instanceof MsgQueueGetEntry) {
1839                MsgQueueGetEntry pe = (MsgQueueGetEntry)entry;
1840                 tmpList.add("  "+pe.getGetKey().toXml());
1841                 tmpList.add("  "+pe.getGetQos().toXml());
1842             }
1843             else if (entry instanceof MsgQueueSubscribeEntry) {
1844                MsgQueueSubscribeEntry pe = (MsgQueueSubscribeEntry)entry;
1845                 tmpList.add("  "+pe.getSubscribeKeyData().toXml());
1846                 tmpList.add("  "+pe.getSubscribeQosData().toXml());
1847             }
1848             else if (entry instanceof MsgQueueUnSubscribeEntry) {
1849                MsgQueueUnSubscribeEntry pe = (MsgQueueUnSubscribeEntry)entry;
1850                 tmpList.add("  "+pe.getUnSubscribeKey().toXml());
1851                 tmpList.add("  "+pe.getUnSubscribeQos().toXml());
1852             }
1853             else {
1854                tmpList.add("Unsupported message queue entry '" + entry.getClass().getName() + "'");
1855             }
1856          }
1857 
1858          return (String[])tmpList.toArray(new String[tmpList.size()]);
1859       }
1860       catch (XmlBlasterException e) {
1861          throw new Exception(e.toString());
1862       }
1863    }
1864 
1865    /**
1866     * Peek messages from client queue and dump them to a file, they are not removed.
1867     * @param numOfEntries The number of messages to peek, taken from the front
1868     * @param path The path to dump the messages to, it is automatically created if missing.
1869     * @return The file names of the dumped messages
1870     */
1871    public synchronized String[] peekClientMessagesToFile(int numOfEntries, String path) throws Exception {
1872       try {
1873          return this.glob.peekQueueMessagesToFile(this.clientQueue, numOfEntries, path, "client");
1874       }
1875       catch (XmlBlasterException e) {
1876          throw new Exception(e.toString());
1877       }
1878    }
1879 
1880    /**
1881     * Command line usage.
1882     */
1883    public static String usage(Global glob) {
1884       glob = (glob == null) ? Global.instance() : glob;
1885       StringBuffer sb = new StringBuffer(4096);
1886       sb.append("\n");
1887       sb.append("Choose a connection protocol:\n");
1888       sb.append("   -protocol           Specify a protocol to talk with xmlBlaster, 'SOCKET' or 'IOR' or 'RMI' or 'SOAP' or 'XMLRPC'.\n");
1889       sb.append("                       This is used for connection to xmlBlaster and for the callback connection.\n");
1890       sb.append("                       Current setting is '" + glob.getProperty().get("client.protocol", "IOR") + "'. See below for protocol settings.\n");
1891       sb.append("   -dispatch/connection/protocol <protocol>\n");
1892       sb.append("                       Specify the protocol to connect to xmlBlaster only (not for the callback).\n");
1893       sb.append("   -dispatch/callback/protocol <protocol>\n");
1894       sb.append("                       Specify the protocol for the callback connection only.\n");
1895       sb.append("              Example: java MyApp -protocol SOCKET\n");
1896       sb.append("                       java MyApp -dispatch/connection/protocol RMI -dispatch/connection/plugin/rmi/hostname 192.168.10.34\n");
1897       sb.append("                       java MyApp -dispatch/connection/protocol RMI -dispatch/callback/protocol XMLRPC\n");
1898       sb.append("\n");
1899       sb.append("Security features:\n");
1900       sb.append("   -Security.Client.DefaultPlugin \"htpasswd,1.0\"\n");
1901       sb.append("                       Force the given authentication schema, here the 'htpasswd' is enforced\n");
1902       sb.append("                       Clients can overwrite this with ConnectQos.java\n");
1903       try {
1904       sb.append(new org.xmlBlaster.client.qos.ConnectQos(glob).usage());
1905       } catch (XmlBlasterException e) {}
1906       sb.append(new org.xmlBlaster.util.qos.address.Address(glob).usage());
1907       sb.append(new org.xmlBlaster.util.qos.storage.ClientQueueProperty(glob,null).usage());
1908       sb.append(new org.xmlBlaster.util.qos.address.CallbackAddress(glob).usage());
1909       sb.append(new org.xmlBlaster.util.qos.storage.CbQueueProperty(glob,null,null).usage());
1910       sb.append(new org.xmlBlaster.util.qos.storage.HistoryQueueProperty(glob,null).usage("Control the default size of the history queue for each topic (send with publish calls)"));
1911       sb.append(getPluginUsage("org.xmlBlaster.client.protocol.socket.SocketConnection"));
1912       sb.append(getPluginUsage("org.xmlBlaster.client.protocol.corba.CorbaConnection"));
1913       sb.append(getPluginUsage("org.xmlBlaster.client.protocol.rmi.RmiConnection"));
1914       sb.append(getPluginUsage("org.xmlBlaster.client.protocol.xmlrpc.XmlRpcConnection"));
1915       //sb.append(org.xmlBlaster.util.Global.instance().usage()); // for Logger help
1916       return sb.toString();
1917    }
1918 
1919    /**
1920     * Access plugin specific usage()
1921     * @return if plugin is not in CLASSPATH return empty string
1922     */
1923    public static String getPluginUsage(String clazzName) {
1924       try {
1925          Class clazz = java.lang.Class.forName(clazzName);
1926          if (clazz != null) {
1927             Class[] paramCls = new Class[0];
1928             Object[] params = new Object[0];
1929             java.lang.reflect.Method method = clazz.getMethod("usage", paramCls);
1930             String tmp = (String)method.invoke(clazz, params);
1931             return tmp;
1932          }
1933       }
1934       catch (Exception ex) { // java.lang.ClassNotFoundException:
1935       }
1936       return "";
1937    }
1938 
1939    /**
1940     * Dump state of this object into a XML ASCII string.
1941     * <br>
1942     * @return internal state of SubjectInfo as a XML ASCII string
1943     */
1944    public final String toXml() {
1945       return toXml((String)null);
1946    }
1947 
1948    /**
1949     * Dump state of this object into a XML ASCII string.
1950     * <br>
1951     * @param extraOffset indenting of tags for nice output
1952     * @return internal state of SubjectInfo as a XML ASCII string
1953     */
1954    public final String toXml(String extraOffset) {
1955       StringBuffer sb = new StringBuffer(1024);
1956       if (extraOffset == null) extraOffset = "";
1957       String offset = Constants.OFFSET + extraOffset;
1958 
1959       sb.append(offset).append("<XmlBlasterAccess id='").append(this.getId());
1960       if (this.dispatchManager != null && this.dispatchManager.getDispatchConnectionsHandler() != null) {
1961          sb.append("' state='").append(this.dispatchManager.getDispatchConnectionsHandler().getState());
1962       }
1963       sb.append("'>");
1964       sb.append(offset).append(" <connected>").append(isConnected()).append("</connected>");
1965       sb.append(offset).append("</XmlBlasterAccess>");
1966 
1967       return sb.toString();
1968    }
1969 
1970    /**
1971     *  For testing invoke: java org.xmlBlaster.client.XmlBlasterAccess
1972     */
1973    public static void main( String[] args ) {
1974       try {
1975          final Global glob = new Global(args);
1976          final String oid = "HelloWorld";
1977 
1978          final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
1979 
1980          /*
1981          try {
1982             log.info(ME, "Hit a key to subscribe on topic " + oid);
1983             try { System.in.read(); } catch(java.io.IOException e) {}
1984             SubscribeKey sk = new SubscribeKey(glob, oid);
1985             SubscribeQos sq = new SubscribeQos(glob);
1986             SubscribeReturnQos subRet = xmlBlasterAccess.subscribe(sk, sq);
1987             log.info(ME, "Subscribed for " + sk.toXml() + "\n" + sq.toXml() + " return:\n" + subRet.toXml());
1988          }
1989          catch(XmlBlasterException e) {
1990             log.error(ME, e.getMessage());
1991          }
1992          */
1993 
1994          xmlBlasterAccess.registerConnectionListener(new I_ConnectionStateListener() {
1995             public void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
1996                log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " with " + connection.getQueue().getNumOfEntries() + " queue entries pending");
1997             }
1998             public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
1999                log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.POLLING);
2000             }
2001             public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
2002                log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.DEAD);
2003             }
2004             public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
2005                log.severe("DEBUG ONLY: Changed from connection state " + oldState + " to " + ConnectionStateEnum.ALIVE + " in sync");
2006             }
2007          });
2008 
2009          ConnectReturnQos connectReturnQos = xmlBlasterAccess.connect(null, new I_Callback() {
2010                public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
2011                   log.info("UPDATE: Receiving asynchronous callback message " + updateKey.toXml() + "\n" + updateQos.toXml());
2012                   return "";
2013                }
2014             });  // Login to xmlBlaster, default handler for updates
2015          if (xmlBlasterAccess.isAlive()) {
2016             log.info("Successfully connected to xmlBlaster");
2017          }
2018          else {
2019             log.info("We continue in fail safe mode: " + connectReturnQos.toXml());
2020          }
2021 
2022          {
2023             log.info("Hit a key to subscribe on topic " + oid);
2024             try { System.in.read(); } catch(java.io.IOException e) {}
2025             SubscribeKey sk = new SubscribeKey(glob, oid);
2026             SubscribeQos sq = new SubscribeQos(glob);
2027             SubscribeReturnQos subRet = xmlBlasterAccess.subscribe(sk, sq);
2028             log.info("Subscribed for " + sk.toXml() + "\n" + sq.toXml() + " return:\n" + subRet.toXml());
2029 
2030             log.info("Hit a key to publish '" + oid + "'");
2031             try { System.in.read(); } catch(java.io.IOException e) {}
2032             MsgUnit msgUnit = new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>");
2033             PublishReturnQos publishReturnQos = xmlBlasterAccess.publish(msgUnit);
2034             log.info("Successfully published message to xmlBlaster, msg=" + msgUnit.toXml() + "\n returned QoS=" + publishReturnQos.toXml());
2035             try { Thread.sleep(1000L); } catch( InterruptedException i) {} // wait for update
2036 
2037             {
2038                log.info("Hit a key to 3 times publishOneway '" + oid + "'");
2039                try { System.in.read(); } catch(java.io.IOException e) {}
2040                MsgUnit[] msgUnitArr = new MsgUnit[] {
2041                   new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2042                   new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2043                   new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>")
2044                };
2045                xmlBlasterAccess.publishOneway(msgUnitArr);
2046                log.info("Successfully published " + msgUnitArr.length + " messages oneway");
2047                try { Thread.sleep(1000L); } catch( InterruptedException i) {} // wait for update
2048             }
2049 
2050             {
2051                log.info("Hit a key to 3 times publishArr '" + oid + "'");
2052                try { System.in.read(); } catch(java.io.IOException e) {}
2053                MsgUnit[] msgUnitArr = new MsgUnit[] {
2054                   new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2055                new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>"),
2056                   new MsgUnit(glob, "<key oid='"+oid+"'/>", "Hi".getBytes(), "<qos><persistent>true</persistent></qos>")
2057                };
2058                PublishReturnQos[] retArr = xmlBlasterAccess.publishArr(msgUnitArr);
2059                log.info("Successfully published " + retArr.length + " acknowledged messages");
2060                try { Thread.sleep(1000L); } catch( InterruptedException i) {} // wait for update
2061             }
2062 
2063             {
2064                log.info("Hit a key to get '" + oid + "'");
2065                try { System.in.read(); } catch(java.io.IOException e) {}
2066                GetKey gk = new GetKey(glob, oid);
2067                GetQos gq = new GetQos(glob);
2068                MsgUnit[] msgs = xmlBlasterAccess.get(gk, gq);
2069                log.info("Successfully got message from xmlBlaster, msg=" + msgs[0].toXml());
2070             }
2071 
2072             int numGetCached = 4;
2073             xmlBlasterAccess.createSynchronousCache(100);
2074             for (int i=0; i<numGetCached; i++) {
2075                log.info("Hit a key to getCached '" + oid + "' #"+i+"/"+numGetCached);
2076                try { System.in.read(); } catch(java.io.IOException e) {}
2077                GetKey gk = new GetKey(glob, oid);
2078                GetQos gq = new GetQos(glob);
2079                MsgUnit[] msgs = xmlBlasterAccess.getCached(gk, gq);
2080                log.info("Successfully got message from xmlBlaster, msg=" + msgs[0].toXml());
2081             }
2082 
2083             log.info("Hit a key to unSubscribe on topic '" + oid + "' and '" + subRet.getSubscriptionId() + "'");
2084             try { System.in.read(); } catch(java.io.IOException e) {}
2085             UnSubscribeKey uk = new UnSubscribeKey(glob, subRet.getSubscriptionId());
2086             UnSubscribeQos uq = new UnSubscribeQos(glob);
2087             UnSubscribeReturnQos[] unSubRet = xmlBlasterAccess.unSubscribe(uk, uq);
2088             log.info("UnSubscribed for " + uk.toXml() + "\n" + uq.toXml() + " return:\n" + unSubRet[0].toXml());
2089 
2090             log.info("Hit a key to erase on topic " + oid);
2091             try { System.in.read(); } catch(java.io.IOException e) {}
2092             EraseKey ek = new EraseKey(glob, oid);
2093             EraseQos eq = new EraseQos(glob);
2094             EraseReturnQos[] er = xmlBlasterAccess.erase(ek, eq);
2095             log.info("Erased for " + ek.toXml() + "\n" + eq.toXml() + " return:\n" + er[0].toXml());
2096          }
2097 
2098          int numPublish = 10;
2099          for (int ii=0; ii<numPublish; ii++) {
2100             log.info("Hit a key to publish #" + (ii+1) + "/" + numPublish);
2101             try { System.in.read(); } catch(java.io.IOException e) {}
2102 
2103             MsgUnit msgUnit = new MsgUnit(glob, "<key oid=''/>", ("Hi #"+(ii+1)).getBytes(), "<qos><persistent>true</persistent></qos>");
2104             PublishReturnQos publishReturnQos = xmlBlasterAccess.publish(msgUnit);
2105             log.info("Successfully published message #" + (ii+1) + " to xmlBlaster, msg=" + msgUnit.toXml() + "\n returned QoS=" + publishReturnQos.toXml());
2106          }
2107 
2108          log.info("Hit a key to disconnect ...");
2109          try { System.in.read(); } catch(java.io.IOException e) {}
2110          xmlBlasterAccess.disconnect(null);
2111       }
2112       catch (XmlBlasterException xmlBlasterException) {
2113          System.out.println("WARNING: Test failed: " + xmlBlasterException.getMessage());
2114       }
2115       catch (Throwable e) {
2116          e.printStackTrace();
2117          System.out.println("ERROR: Test failed: " + e.toString());
2118       }
2119       System.exit(0);
2120    }
2121 
2122    /**
2123     * The implementation which receives the callback messages.
2124     * @return Returns the updateListener or null if none was registered
2125     */
2126    public I_Callback getUpdateListener() {
2127       return this.updateListener;
2128    }
2129 
2130    /**
2131     * Register a listener to receive the callback messages.
2132     * <br />
2133     * Note: Usually you don't need to call this method directly
2134     * as you should pass your callback listener with connect().
2135     * @param updateListener The updateListener to set.
2136     */
2137    public void setUpdateListener(I_Callback updateListener) {
2138       this.updateListener = updateListener;
2139    }
2140 
2141    public String getVersion() {
2142       return glob.getVersion();
2143    }
2144    public String getRevisionNumber() {
2145       return glob.getRevisionNumber();
2146    }
2147    public String getBuildTimestamp() {
2148       return glob.getBuildTimestamp();
2149    }
2150    public String getBuildJavaVendor() {
2151       return glob.getBuildJavaVendor();
2152    }
2153    public String getBuildJavaVersion() {
2154       return glob.getBuildJavaVersion();
2155    }
2156 
2157    /**
2158     * Create a temporay topic.
2159     * You need to erase it yourself when not needed anymore
2160     * @param topicProperty Can be null (the default is no DOM entry)
2161     * @return The details about the created, temporary topic
2162     * @throws XmlBlasterException
2163     * @todo Automatically delete topic when session dies; don't allow other session to subscribe on it
2164     */
2165    public PublishReturnQos createTemporaryTopic(TopicProperty topicProperty) throws XmlBlasterException {
2166       if (topicProperty == null) {
2167          return createTemporaryTopic(-1, 10);
2168       }
2169       PublishKey pk = new PublishKey(glob, "");
2170       PublishQos pq = new PublishQos(glob);
2171       pq.setTopicProperty(topicProperty);
2172       MsgUnit msgUnit = new MsgUnit(pk, new byte[0], pq);
2173       PublishReturnQos prq = publish(msgUnit);
2174       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Created temporary topic " + prq.getKeyOid());
2175       return prq;
2176    }
2177 
2178    public PublishReturnQos createTemporaryTopic(long destroyDelay, int historyMaxMsg) throws XmlBlasterException {
2179       return createTemporaryTopic(null, destroyDelay, historyMaxMsg);
2180    }
2181 
2182    /**
2183     * 
2184     * @param uniqueTopicId Usually null, can be used to force a topicId. 
2185     * e.g. topicIdPrefix="device.joe.request" -> the topic is something like "device.joe.request135823058558"
2186     * @param destroyDelay
2187     * @param historyMaxMsg
2188     * @return
2189     * @throws XmlBlasterException
2190     */
2191    public PublishReturnQos createTemporaryTopic(String uniqueTopicId, long destroyDelay, int historyMaxMsg) throws XmlBlasterException {
2192      if (uniqueTopicId == null) uniqueTopicId = "";
2193       PublishKey pk = new PublishKey(glob, uniqueTopicId);
2194       PublishQos pq = new PublishQos(glob);
2195       TopicProperty topicProperty = new TopicProperty(glob);
2196       topicProperty.setDestroyDelay(destroyDelay);
2197       topicProperty.setCreateDomEntry(false);
2198       topicProperty.setReadonly(false);
2199       pq.setAdministrative(true);
2200       if (historyMaxMsg >= 0L) {
2201          HistoryQueueProperty prop = new HistoryQueueProperty(this.glob, null);
2202          prop.setMaxEntries(historyMaxMsg);
2203          topicProperty.setHistoryQueueProperty(prop);
2204       }
2205       pq.setTopicProperty(topicProperty);
2206       MsgUnit msgUnit = new MsgUnit(pk, new byte[0], pq);
2207       PublishReturnQos prq = publish(msgUnit);
2208       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Created temporary topic " + prq.getKeyOid());
2209       return prq;
2210    }
2211 
2212    // TODO: add other properties, add documentation requirement
2213    //       Add own class to support multiple request/reply over same temporary topic
2214    /**
2215     * @see org.xmlBlaster.test.client.TestRequestResponse
2216     */
2217    public MsgUnit[] request(MsgUnit msgUnit, long timeout, int maxEntries) throws XmlBlasterException {
2218       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Entering request with timeout=" + timeout);
2219       if (msgUnit == null)
2220          throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Please supply a valid msgUnit to request()");
2221 
2222       // Create a temporary reply topic ...
2223       long destroyDelay = timeout+86400000; // on client crash, cleanup after one day; //long destroyDelay = -1;
2224       // optional, "device.joe.response" -> can be useful for performance, NOT thread safe
2225       boolean createResponseTopic = msgUnit.getQosData().getClientProperty("__createResponseTopic", true);
2226       if (createResponseTopic == false) {
2227           msgUnit.getQosData().getClientProperties().remove("__createResponseTopic");
2228       }
2229       String responseTopicId = msgUnit.getQosData().getClientProperty("__responseTopicId", "");
2230       if (responseTopicId.length() > 0) {
2231           msgUnit.getQosData().getClientProperties().remove("__responseTopicId");
2232       }
2233       else {
2234           // "device.joe.response" -> can be useful for authorization, must be distinguishable to other clients
2235           String responseTopicIdPrefix = msgUnit.getQosData().getClientProperty("__responseTopicIdPrefix", "");
2236           if (responseTopicIdPrefix.length() > 0) {
2237              responseTopicId = responseTopicIdPrefix + new Timestamp().getTimestamp(); // now thread safe for request()s in parallel
2238              msgUnit.getQosData().getClientProperties().remove("__responseTopicIdPrefix");
2239           }
2240       }
2241       if (createResponseTopic) {
2242         PublishReturnQos tempTopic = createTemporaryTopic(responseTopicId, destroyDelay, maxEntries);
2243           responseTopicId = tempTopic.getKeyOid();
2244       }
2245      
2246       try {
2247          // Send the request ...
2248          // "__jms:JMSReplyTo"
2249          msgUnit.getQosData().addClientProperty(Constants.addJmsPrefix(Constants.JMS_REPLY_TO, log), responseTopicId); // "__jms:JMSReplyTo"
2250          publish(msgUnit);
2251       
2252          // Access the reply ...
2253          MsgUnit[] msgs = receive("topic/"+responseTopicId, maxEntries, timeout, true);
2254 
2255          return msgs;
2256       }
2257       finally {
2258          if (createResponseTopic && responseTopicId.length() == 0) {
2259             // Clean up temporary topic ...
2260             EraseKey ek = new EraseKey(glob, responseTopicId);
2261             EraseQos eq = new EraseQos(glob);
2262             eq.setForceDestroy(true);
2263             erase(ek, eq);
2264          }
2265       }
2266    }
2267 
2268    public MsgUnit[] receive(String oid, int maxEntries, long timeout, boolean consumable) throws XmlBlasterException {
2269       if (oid == null || oid.length() == 0)
2270          throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Please supply a valid oid to receive()");
2271 
2272       ContextNode node = ContextNode.valueOf(oid);
2273       if (node == null) {
2274          throw new IllegalArgumentException("Can't parse '" + oid + "' to a ContextNode");
2275       }
2276       if (node.isOfClass(ContextNode.SESSION_MARKER_TAG))
2277          node = node.getParent();
2278       if (node == null) {
2279          throw new IllegalArgumentException("Can't parse '" + oid + "' to a ContextNode");
2280       }
2281       if (node.isOfClass(ContextNode.TOPIC_MARKER_TAG))
2282          oid = "__cmd:"+oid+"/?historyQueueEntries"; // "__cmd:topic/hello/?historyQueueEntries"
2283       else if (node.isOfClass(ContextNode.SUBJECT_MARKER_TAG) && node.getChild(ContextNode.SESSION_MARKER_TAG, null) != null)
2284             oid = "__cmd:"+oid+"/?callbackQueueEntries"; // "__cmd:client/joe/session/1/?callbackQueueEntries";
2285       else if (node.isOfClass(ContextNode.SUBJECT_MARKER_TAG))
2286             oid = "__cmd:"+oid+"/?subjectQueueEntries"; // "__cmd:client/joe/?subjectQueueEntries"
2287 
2288       GetKey getKey = new GetKey(glob, oid);
2289       String qos = "<qos>" +
2290                    "<querySpec type='QueueQuery'>" +
2291                    "maxEntries="+maxEntries+"&amp;maxSize=-1&amp;consumable="+consumable+"&amp;waitingDelay="+timeout+
2292                    "</querySpec>" +
2293                    "</qos>";
2294       GetQos getQos = new GetQos(glob, glob.getQueryQosFactory().readObject(qos));
2295       MsgUnit[] msgs = get(getKey, getQos);
2296       if (log.isLoggable(Level.FINEST)) log.finest(getLogId()+"Got " + msgs.length + " reply :\n" + ((msgs.length>0)?msgs[0].toXml():""));
2297       return msgs;
2298    }
2299 
2300    
2301    private PublishReturnQos publishSingleChunk(MsgKeyData keyData, MsgQosData chunkQosData, byte[] buf, int length, boolean isLastChunk, long count, Exception ex) throws XmlBlasterException {
2302       MsgKeyData chunkKeyData = keyData;
2303       MsgUnit msg = new MsgUnit(chunkKeyData, buf, chunkQosData);
2304       if (isLastChunk || ex != null)
2305          chunkQosData.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), true);
2306       chunkQosData.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), count);
2307       if (ex != null)
2308          msg.getQosData().addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EX, log), ex.getMessage());
2309       return publish(msg);
2310    }
2311    
2312    public PublishReturnQos[] publishStream(InputStream is, MsgKeyData keyData, MsgQosData qosData, int maxBufSize, I_ReplaceContent contentReplacer) throws XmlBlasterException {
2313       String streamId = (getGlobal()).getId() + "-" + (new Timestamp()).getTimestamp();
2314       qosData.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), streamId);
2315       int bufSize = 0;
2316       String tmpKey = Constants.addJmsPrefix(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, log); 
2317       if (qosData.getClientProperty(tmpKey) != null)
2318          bufSize = qosData.getClientProperty(tmpKey).getIntValue();
2319       if (bufSize > maxBufSize || bufSize == 0)
2320          bufSize = maxBufSize;
2321       long count = 0L;
2322       PublishReturnQos pubRetQos = null;
2323       byte[] buf = new byte[bufSize];
2324       try {
2325          while (true) {
2326             buf = new byte[bufSize];
2327             int offset = 0;
2328             int remainingLength = bufSize;
2329             int lengthRead = 0;
2330             while ((lengthRead = is.read(buf, offset, remainingLength)) != -1) {
2331                remainingLength -= lengthRead;
2332                offset += lengthRead;
2333                if (remainingLength == 0)
2334                   break;
2335             }
2336             int length = offset;
2337             // cut the buffer if shorter than maximum buffer size
2338             if (length < buf.length) {
2339                byte[] tmpBuf = buf;
2340                buf = new byte[length];
2341                for (int i=0; i<buf.length;i++)
2342                   buf[i] = tmpBuf[i];
2343             }
2344             
2345             // We do not need to clone the key since it will not change, but the qos must be cloned
2346             MsgQosData chunkQosData = (MsgQosData)qosData.clone();
2347             
2348             if (contentReplacer != null)
2349                buf = contentReplacer.replace(buf, chunkQosData.getClientProperties());
2350             boolean isLastChunk = buf.length < bufSize;
2351             pubRetQos = publishSingleChunk(keyData, chunkQosData, buf, length, isLastChunk, count, null);
2352             count++;
2353             if (length < bufSize)
2354                return new PublishReturnQos[] { pubRetQos };
2355          }            
2356       }
2357       catch (IOException ex) {
2358          if (count > 0)
2359             publishSingleChunk(keyData, qosData, buf, 0, true, count, ex);
2360          throw new XmlBlasterException(getGlobal(), ErrorCode.RESOURCE, "Sending Chunked message", "failed due to an IOException", ex);
2361       }
2362       catch (XmlBlasterException ex) {
2363          if (count > 0)
2364             publishSingleChunk(keyData, qosData, buf, 0, true, count, ex);
2365          throw ex;
2366       }
2367    }
2368    
2369    public Object getUserObject() {
2370       return userObject;
2371    }
2372 
2373    public void setUserObject(Object userObject) {
2374       this.userObject = userObject;
2375    }
2376 }


syntax highlighted by Code2HTML, v. 0.9.1