1 /*------------------------------------------------------------------------------
   2 Name:      XmlBlasterAccess.java
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 ------------------------------------------------------------------------------*/
   6 package org.xmlBlaster.client;
   7 
   8 import java.io.IOException;
   9 import java.io.InputStream;
  10 import java.util.ArrayList;
  11 import java.util.Map;
  12 import java.util.logging.Level;
  13 import java.util.logging.Logger;
  14 
  15 import org.xmlBlaster.authentication.plugins.I_ClientPlugin;
  16 import org.xmlBlaster.client.key.EraseKey;
  17 import org.xmlBlaster.client.key.GetKey;
  18 import org.xmlBlaster.client.key.PublishKey;
  19 import org.xmlBlaster.client.key.SubscribeKey;
  20 import org.xmlBlaster.client.key.UnSubscribeKey;
  21 import org.xmlBlaster.client.key.UpdateKey;
  22 import org.xmlBlaster.client.protocol.AbstractCallbackExtended;
  23 import org.xmlBlaster.client.protocol.I_CallbackServer;
  24 import org.xmlBlaster.client.qos.ConnectQos;
  25 import org.xmlBlaster.client.qos.ConnectReturnQos;
  26 import org.xmlBlaster.client.qos.DisconnectQos;
  27 import org.xmlBlaster.client.qos.EraseQos;
  28 import org.xmlBlaster.client.qos.EraseReturnQos;
  29 import org.xmlBlaster.client.qos.GetQos;
  30 import org.xmlBlaster.client.qos.PublishQos;
  31 import org.xmlBlaster.client.qos.PublishReturnQos;
  32 import org.xmlBlaster.client.qos.SubscribeQos;
  33 import org.xmlBlaster.client.qos.SubscribeReturnQos;
  34 import org.xmlBlaster.client.qos.UnSubscribeQos;
  35 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
  36 import org.xmlBlaster.client.qos.UpdateQos;
  37 import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;
  38 import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;
  39 import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;
  40 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
  41 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
  42 import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;
  43 import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;
  44 import org.xmlBlaster.jms.XBConnectionMetaData;
  45 import org.xmlBlaster.util.FileDumper;
  46 import org.xmlBlaster.util.Global;
  47 import org.xmlBlaster.util.I_ReplaceContent;
  48 import org.xmlBlaster.util.I_Timeout;
  49 import org.xmlBlaster.util.MsgUnit;
  50 import org.xmlBlaster.util.SessionName;
  51 import org.xmlBlaster.util.Timeout;
  52 import org.xmlBlaster.util.Timestamp;
  53 import org.xmlBlaster.util.XmlBlasterException;
  54 import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;
  55 import org.xmlBlaster.util.checkpoint.I_Checkpoint;
  56 import org.xmlBlaster.util.cluster.NodeId;
  57 import org.xmlBlaster.util.context.ContextNode;
  58 import org.xmlBlaster.util.def.Constants;
  59 import org.xmlBlaster.util.def.ErrorCode;
  60 import org.xmlBlaster.util.def.MethodName;
  61 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
  62 import org.xmlBlaster.util.dispatch.DispatchManager;
  63 import org.xmlBlaster.util.dispatch.DispatchStatistic;
  64 import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;
  65 import org.xmlBlaster.util.dispatch.I_PostSendListener;
  66 import org.xmlBlaster.util.error.I_MsgErrorHandler;
  67 import org.xmlBlaster.util.key.MsgKeyData;
  68 import org.xmlBlaster.util.qos.ClientProperty;
  69 import org.xmlBlaster.util.qos.DisconnectQosData;
  70 import org.xmlBlaster.util.qos.MsgQosData;
  71 import org.xmlBlaster.util.qos.TopicProperty;
  72 import org.xmlBlaster.util.qos.address.CallbackAddress;
  73 import org.xmlBlaster.util.qos.storage.CbQueueProperty;
  74 import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
  75 import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;
  76 import org.xmlBlaster.util.queue.I_Queue;
  77 import org.xmlBlaster.util.queue.StorageId;
  78 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
  79 
  80 /**
  81  * This is the default implementation of the java client side remote access to xmlBlaster.
  82  * <p>
  83  * It hides a client side queue, the client side dispatcher framework for polling
  84  * or pinging the server and some more features.
  85  * </p>
  86  * <p>
  87  * The interface I_CallbackRaw/I_Callback/I_CallbackExtenden are enforced by AbstractCallbackExtended.
  88  * </p>
  89  */
  90 public /*final*/ class XmlBlasterAccess extends AbstractCallbackExtended
  91                    implements I_XmlBlasterAccess, I_ConnectionStatusListener, I_PostSendListener, XmlBlasterAccessMBean
  92 {
  93    private static Logger log = Logger.getLogger(XmlBlasterAccess.class.getName());
  94    private String ME = "XmlBlasterAccess";
  95    private ContextNode contextNode;
  96    /**
  97     * The cluster node id (name) to which we want to connect, needed for nicer logging, typically null
  98     * Can be set manually from outside before connect
  99     */
 100    private String serverNodeId = null;
 101    private ConnectQos connectQos;
 102    /** The return from connect() */
 103    private ConnectReturnQos connectReturnQos;
 104    private long jmxPublicSessionId;
 105    /** Client side queue during connection failure */
 106    private I_Queue clientQueue;
 107    /** The dispatcher framework **/
 108    private DispatchManager dispatchManager;
 109    /** Statistic about send/received messages, can be null if there is a DispatchManager around */
 110    private volatile DispatchStatistic statistic;
 111    /** The object handling message delivery problems */
 112    private I_MsgErrorHandler msgErrorHandler;
 113    /** Client side helper classes to load the authentication xml string */
 114    private I_ClientPlugin secPlgn;
 115    /** The callback server */
 116    private I_CallbackServer cbServer;
 117    /** Handles the registered callback interfaces for given subscriptions. */
 118    private final UpdateDispatcher updateDispatcher;
 119    /** Used to callback the clients default update() method (as given on connect()) */
 120    private I_Callback updateListener;
 121    /** Is not null if the client wishes to be notified about connection state changes in fail safe operation */
 122    private I_ConnectionStateListener connectionListener;
 123    private I_PostSendListener postSendListener;
 124    /** Allow to cache updated messages for simulated synchronous access with get().
 125     * Do behind a get() a subscribe to allow cached synchronous get() access */
 126    private SynchronousCache synchronousCache;
 127    private boolean disconnectInProgress;
 128    private boolean connectInProgress;
 129    private String[] checkPointContext;
 130 
 131    /** this I_XmlBlasterAccess is valid until a 'leaveServer' invocation is done.*/
 132    private boolean isValid = true;
 133 
 134    private boolean firstWarn = true;
 135 
 136    private Timestamp sessionRefreshTimeoutHandle;
 137    /** My JMX registration */
 138    private JmxMBeanHandle mbeanHandle;
 139    /** First call to connect() in millis */
 140    private long startupTime;
 141 
 142    StreamingCallback streamingCb;
 143 
 144    private String storageIdPrefix;
 145 
 146    private FileDumper fileDumper;
 147 
 148    private boolean shutdown = false;
 149    
 150    private Object userObject;
 151 
 152    /**
 153     * Create an xmlBlaster accessor.
 154     * Please don't create directly but use the factory instead:
 155     * <pre>
 156     *   import org.xmlBlaster.util.Global;
 157     *   ...
 158     *   final Global glob = new Global(args);
 159     *   final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
 160     * </pre>
 161     * @param glob Your environment handle or null to use the default Global.instance()
 162     *        You must use a cloned Global for each XmlBlasterAccess created.
 163     *        engine.Global is not allowed here, only util.Global is supported
 164     * @exception IllegalArgumentException If engine.Global is used as parameter
 165     */
 166    public XmlBlasterAccess(Global glob) {
 167       super((glob==null) ? Global.instance() : glob);
 168       //if (glob.wantsHelp()) {
 169       //   usage();
 170       //}
 171       if (super.glob.getNodeId() != null) {
 172          // it is a engine.Global!
 173          throw new IllegalArgumentException("XmlBlasterAccess can't be created with a engine.Global, please clone a org.xmlBlaster.util.Global to create me");
 174       }
 175       this.updateDispatcher = new UpdateDispatcher(super.glob);
 176    }
 177 
 178    /**
 179     * Create an xmlBlaster accessor.
 180     * Please don't create directly but use the factory instead:
 181     * <pre>
 182     *   final Global glob = new Global(args);
 183     *   final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess();
 184     * </pre>
 185     * @param args Your command line arguments
 186     */
 187    public XmlBlasterAccess(String[] args) {
 188       super(new Global(args, true, false));
 189       this.updateDispatcher = new UpdateDispatcher(super.glob);
 190    }
 191 
 192    /**
 193     * @see org.xmlBlaster.client.I_XmlBlasterAccess#registerConnectionListener(I_ConnectionStateListener)
 194     */
 195    public synchronized void registerConnectionListener(I_ConnectionStateListener connectionListener) {
 196       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing registering connectionListener");
 197       this.connectionListener = connectionListener;
 198    }
 199 
 200    /**
 201     * Register a listener to get notifications when a messages is successfully send from
 202     * the client side tail back queue.
 203     * Max one can be registered, any old one will be overwritten
 204     * @param postSendListener The postSendListener to set.
 205     * @return the old listener or null if no previous was registered
 206     */
 207    public final I_PostSendListener registerPostSendListener(I_PostSendListener postSendListener) {
 208       I_PostSendListener old = this.postSendListener;
 209       this.postSendListener = postSendListener;
 210       return old;
 211    }
 212 
 213    /**
 214     * Called after a messages is send from the client side queue, but not for oneway messages.
 215     * Enforced by I_PostSendListener
 216     * @param msgQueueEntry, includes the returned QoS (e.g. PublisReturnQos)
 217     */
 218    public final void postSend(MsgQueueEntry[] entries) {
 219       for (int i=0; i<entries.length; i++) {
 220          MsgQueueEntry msgQueueEntry = entries[i];
 221          if (msgQueueEntry.getMethodName() == MethodName.CONNECT) {
 222             this.connectReturnQos = (ConnectReturnQos)msgQueueEntry.getReturnObj();
 223             if (this.connectReturnQos != null) {
 224                setContextNodeId(this.connectReturnQos.getServerInstanceId());
 225                // break; Loop to the latest if any
 226             }
 227             else {
 228                //log.warning("Expected connectReturnQos for " + msgQueueEntry.toXml() + " " + Global.getStackTraceAsString(null));
 229                if (log.isLoggable(Level.FINE)) log.fine("Expected connectReturnQos for " + msgQueueEntry.toXml() + " " + Global.getStackTraceAsString(null));
 230             }
 231          }
 232       }
 233       I_PostSendListener l = this.postSendListener;
 234       if (l != null) {
 235          try {
 236             l.postSend(entries);
 237          }
 238          catch (Throwable e) {
 239             e.printStackTrace();
 240          }
 241       }
 242    }
 243    
 244    public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException exception) {
 245       I_PostSendListener l = this.postSendListener;
 246       try {
 247          if (l == null) {
 248             for (int i=0; i<entries.length; i++) {
 249                MsgUnit msgUnit = entries[i].getMsgUnit();
 250                if (msgUnit != null) {
 251                   String fn = this.getFileDumper().dumpMessage(msgUnit.getKeyData(), msgUnit.getContent(),
 252                         msgUnit.getQosData());
 253                   log.severe("Async sending of message failed for message " + msgUnit.getKeyOid() + ", is dumped to "
 254                         + fn + ": " + exception.getMessage());
 255                } else {
 256                   log.severe("Async sending of message failed: " + entries[i].toXml() + ": " + exception.getMessage());
 257                }
 258             }
 259          }
 260          else {
 261             return l.sendingFailed(entries, exception);
 262          }
 263       }
 264       catch (Throwable e) {
 265          e.printStackTrace();
 266          for (int i=0; i<entries.length; i++)
 267             log.severe("Async sending of message failed for message " + entries[i].toXml() +"\nreason is: " + exception.getMessage());
 268       }
 269       return false;
 270    }
 271    
 272    public FileDumper getFileDumper() throws XmlBlasterException {
 273       if (this.fileDumper == null) {
 274          synchronized (this) {
 275             if (this.fileDumper == null) {
 276                this.fileDumper = new FileDumper(this.glob);
 277             }
 278          }
 279       }
 280       return this.fileDumper;
 281    }
 282 
 283 
 284    /**
 285     */
 286    public SynchronousCache createSynchronousCache(int size) {
 287       if (this.synchronousCache != null)
 288          return this.synchronousCache; // Is initialized already
 289       if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing synchronous cache: size=" + size);
 290       this.synchronousCache = new SynchronousCache(glob, size);
 291       log.info(getLogId()+"SynchronousCache has been initialized with size="+size);
 292       return this.synchronousCache;
 293    }
 294 
 295    public void setClientErrorHandler(I_MsgErrorHandler msgErrorHandler) {
 296       this.msgErrorHandler = msgErrorHandler;
 297    }
 298 
 299    public String getConnectionQueueId() {
 300       if (this.clientQueue != null) {
 301          return this.clientQueue.getStorageId().toString();
 302       }
 303       return "";
 304    }
 305 
 306    /**
 307     * The unique name of this session instance.
 308     * @return Never null, for example "/xmlBlaster/node/heron/client/joe/session/-2"
 309     */
 310    public final ContextNode getContextNode() {
 311       return this.contextNode;
 312    }
 313 
 314 
 315 
 316    public ConnectReturnQos connect(ConnectQos qos, I_StreamingCallback streamingUpdateListener, boolean withQueue) throws XmlBlasterException {
 317       if (streamingUpdateListener == null)
 318          throw new XmlBlasterException(this.glob, ErrorCode.USER_ILLEGALARGUMENT, "connect", "the streamingUpdateListener is null, you must provide one");
 319       this.streamingCb = new StreamingCallback(this.glob, streamingUpdateListener, 0, 0, withQueue);
 320       if (withQueue)
 321          registerConnectionListener(this.streamingCb);
 322       return connect(qos, this.streamingCb);
 323    }
 324    
 325    /**
 326     * The storageId must remain the same after a client restart
 327     * 
 328     * @param relating
 329     *           xbType like Constants.RELATING_CLIENT
 330     * @return
 331     */
 332    public StorageId createStorageId(String relating) {
 333       StorageId storageId = null;
 334       if (getStorageIdStr() != null && getStorageIdStr().length() > 0) {
 335          // client code forces a named client side storageId -
 336          // dangerous if the name conflicts with server name in same DB
 337          storageId = new StorageId(glob, serverNodeId, relating, getStorageIdStr());
 338       } else {
 339          if (getPublicSessionId() == 0) {
 340             // having no public sessionId we need to generate a unique
 341             // queue name
 342             storageId = new StorageId(glob, serverNodeId, relating, getId() + System.currentTimeMillis()
 343                   + Global.getCounter());
 344          } else {
 345             SessionName ses = getSessionName();
 346             if (ses != null)
 347                storageId = new StorageId(glob, serverNodeId, relating, ses);
 348             else
 349                storageId = new StorageId(glob, serverNodeId, relating, getId() + System.currentTimeMillis()
 350                      + Global.getCounter());
 351          }
 352       }
 353       return storageId;
 354    }
 355 
 356    /**
 357     * @see org.xmlBlaster.client.I_XmlBlasterAccess#connect(ConnectQos, I_Callback)
 358     */
 359    public ConnectReturnQos connect(ConnectQos qos, I_Callback updateListener) throws XmlBlasterException {
 360       if (!this.isValid)
 361          throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "connect");
 362 
 363       synchronized (this) {
 364 
 365          if (this.startupTime == 0) {
 366             this.startupTime = System.currentTimeMillis();
 367          }
 368 
 369          if (isConnected() || this.connectInProgress) {
 370             String text = "connect() rejected, you are connected already, please check your code";
 371             throw new XmlBlasterException(glob, ErrorCode.USER_CONNECT_MULTIPLE, ME, text);
 372          }
 373 
 374          this.connectInProgress = true;
 375 
 376          try {
 377             this.connectQos = (qos==null) ? new ConnectQos(glob) : qos;
 378 
 379             ClientProperty tmp = this.connectQos.getClientProperty(Constants.UPDATE_BULK_ACK);
 380             if (tmp != null) {
 381                if (tmp.getBooleanValue()) {
 382                   log.info("Setting the flag '" + Constants.UPDATE_BULK_ACK + "' to 'true' since specified in ConnectQos");
 383                   this.updateBulkAck = true;
 384                }
 385             }
 386 
 387 
 388             // We need to set a unique ID for this client so that global.getId() is unique
 389             // which is used e.g. in the JDBC plugin
 390             SessionName sn = getSessionName();
 391             if (sn != null) {
 392                if (sn.isPubSessionIdUser()) {
 393                   this.glob.setId(sn.toString());
 394                }
 395                else {
 396                   this.glob.setId(sn.toString() + System.currentTimeMillis()); // Not secure if two clients start simultaneously
 397                }
 398             }
 399             else {
 400                this.glob.setId(getLoginName() + System.currentTimeMillis()); // Not secure if two clients start simultaneously
 401             }
 402             this.glob.resetInstanceId();
 403             this.connectQos.getData().setInstanceId(this.glob.getInstanceId());
 404 
 405             if (connectQos.getData().getGlobal().isServerSide()) {
 406                String text = "Your ConnectQos.getData() contains a ServerScope instead of a Global instance, this is not allowed";
 407                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, text);
 408             }
 409 
 410             this.updateListener = updateListener;
 411 
 412             // TODO: This is done by ConnectQos already, isn't it?
 413             initSecuritySettings(this.connectQos.getData().getClientPluginType(),
 414                                  this.connectQos.getData().getClientPluginVersion());
 415 
 416             this.ME = "XmlBlasterAccess-" + getId();
 417             setContextNodeId(getServerNodeId());
 418 
 419             try {
 420                ClientQueueProperty prop = this.connectQos.getClientQueueProperty();
 421                StorageId storageId = createStorageId(Constants.RELATING_CLIENT);
 422                this.clientQueue = glob.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), storageId,
 423                                                       this.connectQos.getClientQueueProperty());
 424                if (this.clientQueue == null) {
 425                   String text = "The client queue plugin is not found with this configuration, please check your connect QoS: " + prop.toXml();
 426                   throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME, text);
 427                }
 428 
 429                if (this.msgErrorHandler == null) {
 430                   this.msgErrorHandler = new ClientErrorHandler(glob, this);
 431                }
 432 
 433                this.dispatchManager = new DispatchManager(glob, this.msgErrorHandler,
 434                                        getSecurityPlugin(), this.clientQueue, this,
 435                                        this.connectQos.getAddresses(), sn);
 436 
 437                getDispatchStatistic(); // Force creation of dispatchStatistic as this syncs on 'this' and could deadlock if don later from a update()
 438 
 439                this.dispatchManager.getDispatchConnectionsHandler().registerPostSendListener(this);
 440 
 441                if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Switching to synchronous delivery mode ...");
 442                this.dispatchManager.trySyncMode(true);
 443 
 444                if (this.updateListener != null) { // Start a default callback server using same protocol
 445                   createDefaultCbServer();
 446                }
 447 
 448                if (this.connectQos.doSendConnect()) {
 449                   // Try to connect to xmlBlaster ...
 450                   sendConnectQos();
 451                }
 452                else {
 453                   log.info(getLogId()+"Initialized client library, but no connect() is send to xmlBlaster, a delegate should do any subscribe if required");
 454                }
 455             }
 456             catch (XmlBlasterException e) {
 457                if (isConnected()) disconnect((DisconnectQos)null);
 458                throw e;
 459             }
 460             catch (Throwable e) {
 461                if (isConnected()) disconnect((DisconnectQos)null);
 462                throw XmlBlasterException.convert(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "Connection failed", e);
 463             }
 464          }
 465          finally {
 466             this.connectInProgress = false;
 467          }
 468       } // synchronized
 469 
 470       if (this.connectQos.getRefreshSession()) {
 471          startSessionRefresher();
 472       }
 473 
 474       if (isAlive()) {
 475          if (this.connectionListener != null) {
 476             this.connectionListener.reachedAlive(ConnectionStateEnum.UNDEF, this);
 477          }
 478          log.info(glob.getReleaseId() + ": Successful " + this.connectQos.getAddress().getType() + " login as " + getId());
 479 
 480          if (this.clientQueue.getNumOfEntries() > 0) {
 481             long num = this.clientQueue.getNumOfEntries();
 482             log.info(getLogId()+"We have " + num + " client side queued tail back messages");
 483             this.dispatchManager.switchToASyncMode();
 484             while (this.clientQueue.getNumOfEntries() > 0) {
 485                try { Thread.sleep(20L); } catch( InterruptedException i) {}
 486             }
 487             log.info((num-this.clientQueue.getNumOfEntries()) + " client side queued tail back messages sent");
 488             this.dispatchManager.switchToSyncMode();
 489          }
 490       }
 491       else {
 492          if (this.connectionListener != null) {
 493             this.connectionListener.reachedPolling(ConnectionStateEnum.UNDEF, this);
 494          }
 495          log.info(glob.getReleaseId() + ": Login request as " + getId() + " is queued");
 496       }
 497 
 498       if (this.connectReturnQos != null) {
 499          setContextNodeId(this.connectReturnQos.getServerInstanceId());
 500       }
 501 
 502       return this.connectReturnQos; // new ConnectReturnQos(glob, "");
 503    }
 504 
 505    /**
 506     * Sends the current connectQos to xmlBlaster and stores the connectReturnQos.
 507     * @throws XmlBlasterException
 508     */
 509    private void sendConnectQos() throws XmlBlasterException {
 510       MsgQueueConnectEntry entry = new MsgQueueConnectEntry(this.glob, this.clientQueue.getStorageId(), this.connectQos.getData());
 511       // Try to connect to xmlBlaster ...
 512       this.connectReturnQos = (ConnectReturnQos)queueMessage(entry);
 513       this.connectReturnQos.getData().setInitialConnectionState(this.dispatchManager.getDispatchConnectionsHandler().getState());
 514    }
 515 
 516    public boolean isConnected() {
 517       if (this.dispatchManager != null) {
 518          return this.connectReturnQos != null && !this.dispatchManager.getDispatchConnectionsHandler().isDead();
 519       }
 520       return this.connectReturnQos != null;
 521    }
 522 
 523    private void startSessionRefresher() {
 524       if (this.connectQos == null) return;
 525       long sessionTimeout = this.connectQos.getSessionQos().getSessionTimeout();
 526       final long MIN = 2000L; // Sessions which live less than 2 seconds are not supported
 527       if (sessionTimeout >= MIN) {
 528          long gap = (sessionTimeout < 60*1000L) ? sessionTimeout/2 : sessionTimeout-30*1000L;
 529          final long refreshTimeout = sessionTimeout - gap;
 530          final Timeout timeout = this.glob.getPingTimer();
 531          this.sessionRefreshTimeoutHandle = timeout.addTimeoutListener(new I_Timeout() {
 532                public void timeout(Object userData) {
 533                   if (isAlive()) {
 534                      if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Refreshing session to not expire");
 535                      try {
 536                         refreshSession();
 537                      }
 538                      catch (XmlBlasterException e) {
 539                         log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString());
 540                      }
 541                   }
 542                   else {
 543                      if (log.isLoggable(Level.FINE)) log.fine(getLogId()+"Can't refresh session as we have no connection");
 544                   }
 545                   try {
 546                      sessionRefreshTimeoutHandle = timeout.addOrRefreshTimeoutListener(this, refreshTimeout, null, sessionRefreshTimeoutHandle) ;
 547                   }
 548                   catch (XmlBlasterException e) {
 549                      log.warning(getLogId()+"Can't refresh the login session '" + getId() + "': " + e.toString());
 550                   }
 551                }
 552             },
 553             refreshTimeout, null);
 554       }
 555       else {
 556          log.warning(getLogId()+"Auto-refreshing session is not supported for session timeouts smaller " + MIN + " seconds");
 557 
 558       }
 559    }
 560 
 561 
 562    /**
 563     * @see I_XmlBlasterAccess#refreshSession()
 564     */
 565    public void refreshSession() throws XmlBlasterException {
 566       GetKey gk = new GetKey(glob, "__refresh");
 567       GetQos gq = new GetQos(glob);
 568       get(gk, gq);
 569    }
 570 
 571    /**
 572     * Extracts address data from ConnectQos (or adds default if missing)
 573     * and instantiate a callback server as specified in ConnectQos
 574     */
 575    private void createDefaultCbServer() throws XmlBlasterException {
 576       CbQueueProperty prop = connectQos.getSessionCbQueueProperty(); // Creates a default property for us if none is available
 577       CallbackAddress addr = prop.getCurrentCallbackAddress(); // may return null
 578       if (addr == null)
 579          addr = new CallbackAddress(glob);
 580 
 581       this.cbServer = initCbServer(getLoginName(), addr);
 582 
 583       addr.setType(this.cbServer.getCbProtocol());
 584       addr.setRawAddress(this.cbServer.getCbAddress());
 585       //addr.setVersion(this.cbServer.getVersion());
 586       //addr.setSecretSessionId(cbSessionId);
 587       prop.setCallbackAddress(addr);
 588 
 589       log.info(getLogId()+"Callback settings: " + prop.getSettings());
 590    }
 591 
 592    /**
 593     * @see I_XmlBlasterAccess#initCbServer(St