1 /*------------------------------------------------------------------------------
   2 Name:      DispatchManager.java
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 ------------------------------------------------------------------------------*/
   6 package org.xmlBlaster.client.dispatch;
   7 import java.util.ArrayList;
   8 import java.util.HashSet;
   9 import java.util.List;
  10 import java.util.logging.Level;
  11 import java.util.logging.Logger;
  12 
  13 import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;
  14 import org.xmlBlaster.client.I_XmlBlasterAccess;
  15 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
  16 import org.xmlBlaster.util.Global;
  17 import org.xmlBlaster.util.MsgUnit;
  18 import org.xmlBlaster.util.SessionName;
  19 import org.xmlBlaster.util.Timestamp;
  20 import org.xmlBlaster.util.XmlBlasterException;
  21 import org.xmlBlaster.util.def.Constants;
  22 import org.xmlBlaster.util.def.ErrorCode;
  23 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
  24 import org.xmlBlaster.util.dispatch.DispatchConnection;
  25 import org.xmlBlaster.util.dispatch.DispatchConnectionsHandler;
  26 import org.xmlBlaster.util.dispatch.DispatchStatistic;
  27 import org.xmlBlaster.util.dispatch.DispatchWorker;
  28 import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;
  29 import org.xmlBlaster.util.dispatch.I_DispatchManager;
  30 import org.xmlBlaster.util.dispatch.I_PostSendListener;
  31 import org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor;
  32 import org.xmlBlaster.util.error.I_MsgErrorHandler;
  33 import org.xmlBlaster.util.error.MsgErrorInfo;
  34 import org.xmlBlaster.util.plugin.PluginManagerBase;
  35 import org.xmlBlaster.util.property.PropString;
  36 import org.xmlBlaster.util.qos.address.AddressBase;
  37 import org.xmlBlaster.util.qos.address.CallbackAddress;
  38 import org.xmlBlaster.util.queue.I_Entry;
  39 import org.xmlBlaster.util.queue.I_Queue;
  40 import org.xmlBlaster.util.queue.I_QueueEntry;
  41 import org.xmlBlaster.util.queue.I_QueuePutListener;
  42 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
  43 
  44 /**
  45  * Manages the sending of messages and commands and does error recovery
  46  * further we communicate with the dispatcher plugin if one is configured.
  47  * <p />
  48  * There is one instance of this class per queue and remote connection.
  49  * @author xmlBlaster@marcelruff.info
  50  */
  51 public final class ClientDispatchManager implements I_DispatchManager
  52 {
  53    public final String ME;
  54    private final Global glob;
  55    private static Logger log = Logger.getLogger(ClientDispatchManager.class.getName());
  56    private final I_Queue msgQueue;
  57    private final ClientDispatchConnectionsHandler dispatchConnectionsHandler;
  58    private final I_MsgErrorHandler failureListener;
  59    private final I_MsgSecurityInterceptor securityInterceptor;
  60    private final I_MsgDispatchInterceptor msgInterceptor;
  61    private HashSet connectionStatusListeners;
  62    private final String typeVersion;
  63    /** If > 0 does burst mode */
  64    private long collectTime = -1L;
  65    private long toAliveTime = 0;
  66    private long toPollingTime = 0;
  67 
  68    private boolean dispatchWorkerIsActive = false;
  69 
  70    /** The worker for synchronous invocations */
  71    private DispatchWorker syncDispatchWorker;
  72 
  73    private Timestamp timerKey = null;
  74 
  75    private int notifyCounter = 0;
  76 
  77    private boolean isShutdown = false;
  78    private boolean isSyncMode = false;
  79    private boolean trySyncMode = false; // true: client side queue embedding, false: server side callback queue
  80 
  81    private boolean inAliveTransition = false;
  82    private final Object ALIVE_TRANSITION_MONITOR = new Object();
  83 
  84    private int burstModeMaxEntries = -1;
  85    private long burstModeMaxBytes = -1L;
  86 
  87    /** async delivery is activated only when this flag is 'true'. Used to temporarly inhibit dispatch of messages */
  88    private boolean dispatcherActive = true;
  89    
  90    private boolean shallCallToAliveSync;
  91    private boolean inDispatchManagerCtor;
  92 
  93    private SessionName sessionName;
  94 
  95    /**
  96     * @param msgQueue The message queue which i use (!!! TODO: this changes, we should pass it on every method where needed)
  97     * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java), or null
  98     * @param addrArr The addresses i shall connect to
  99     */
 100    public ClientDispatchManager(Global glob, I_MsgErrorHandler failureListener,
 101                           I_MsgSecurityInterceptor securityInterceptor,
 102                           I_Queue msgQueue, I_ConnectionStatusListener connectionStatusListener,
 103                           AddressBase[] addrArr, SessionName sessionName) throws XmlBlasterException {
 104       if (failureListener == null || msgQueue == null)
 105          throw new IllegalArgumentException("DispatchManager failureListener=" + failureListener + " msgQueue=" + msgQueue);
 106       this.inDispatchManagerCtor = true;
 107       this.ME = msgQueue.getStorageId().getId();
 108       this.glob = glob;
 109 
 110       this.sessionName = sessionName;
 111 
 112       if (log.isLoggable(Level.FINE)) log.fine(ME+": Loading DispatchManager ...");
 113 
 114       this.msgQueue = msgQueue;
 115       this.failureListener = failureListener;
 116       this.securityInterceptor = securityInterceptor;
 117       this.dispatchConnectionsHandler = new ClientDispatchConnectionsHandler(glob, this);
 118       this.connectionStatusListeners = new HashSet();
 119       if (connectionStatusListener != null) this.connectionStatusListeners.add(connectionStatusListener);
 120 
 121       initDispatcherActive(addrArr);
 122 
 123       /*
 124        * Check i a plugin is configured ("DispatchPlugin/defaultPlugin")
 125        * If configured, the plugin instance is searched in the Global scope
 126        * and if none is found one is created (see DispatcherPluginManager)
 127        * Default server setting is to use no dispatcher plugin
 128        */
 129       PropString propString = new PropString(PluginManagerBase.NO_PLUGIN_TYPE); // "undef";
 130       if (addrArr != null && addrArr.length > 0) // Check if client wishes a specific plugin
 131          propString.setValue(addrArr[0].getDispatchPlugin());
 132       this.typeVersion = propString.getValue();
 133       this.msgInterceptor = glob.getDispatchPluginManager().getPlugin(this.typeVersion); // usually from cache
 134       if (log.isLoggable(Level.FINE)) log.fine(ME+": DispatchPlugin/defaultPlugin=" + propString.getValue() + " this.msgInterceptor="  + this.msgInterceptor);
 135       if (this.msgInterceptor != null) {
 136          this.msgInterceptor.addDispatchManager(this);
 137          if (log.isLoggable(Level.FINE)) log.fine(ME+": Activated dispatcher plugin '" + this.typeVersion + "'");
 138       }
 139 
 140       this.msgQueue.addPutListener(this); // to get putPre() and putPost() events
 141 
 142       this.dispatchConnectionsHandler.initialize(addrArr);
 143       this.inDispatchManagerCtor = false;
 144    }
 145 
 146    /**
 147     * @return Never null
 148     */
 149    public SessionName getSessionName() {
 150       return this.sessionName;
 151    }
 152 
 153    public boolean isSyncMode() {
 154       return this.isSyncMode;
 155    }
 156 
 157    /**
 158     * Set behavior of dispatch framework.
 159     * @param trySyncMode true: client side queue embedding, false: server side callback queue
 160     * defaults to false
 161     */
 162    public void trySyncMode(boolean trySyncMode) {
 163       this.trySyncMode = trySyncMode;
 164       switchToSyncMode();
 165    }
 166 
 167    /**
 168     * Reconfigure dispatcher with given properties.
 169     *
 170     * Note that only a limited re-configuration is supported
 171     * @param addressArr The new configuration
 172     */
 173    public final void updateProperty(CallbackAddress[] addressArr) throws XmlBlasterException {
 174       initDispatcherActive(addressArr);
 175       this.dispatchConnectionsHandler.initialize(addressArr);
 176    }
 177 
 178    public void finalize() {
 179       try {
 180          removeBurstModeTimer();
 181          //if (log.isLoggable(Level.FINE)) log.fine(ME+": finalize - garbage collected");
 182       }
 183       catch (Throwable e) {
 184          e.printStackTrace();
 185       }
 186       try {
 187          super.finalize();
 188       }
 189       catch (Throwable e) {
 190          e.printStackTrace();
 191       }
 192    }
 193 
 194    public I_Queue getQueue() {
 195       return this.msgQueue;
 196    }
 197 
 198    /*
 199     * Register yourself if you want to be informed about the remote connection status.
 200     * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java)
 201     * @return true if we did not already contain the specified element.
 202     */
 203    public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) {
 204       return this.connectionStatusListeners.add(connectionStatusListener);
 205    }
 206 
 207    public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener, boolean fireInitial) {
 208       if (connectionStatusListener == null) return true;
 209       boolean ret = this.connectionStatusListeners.add(connectionStatusListener);
 210       if (fireInitial) {
 211          if (isDead())
 212             connectionStatusListener.toDead(this, ConnectionStateEnum.DEAD, null/*"Initial call"*/);
 213          else if (isPolling())
 214             connectionStatusListener.toPolling(this, ConnectionStateEnum.POLLING);
 215          else
 216             connectionStatusListener.toAlive(this, ConnectionStateEnum.ALIVE);
 217       }
 218       return ret;
 219    }
 220 
 221 
 222    /**
 223     * Remove the given listener
 224     * @param connectionStatusListener
 225     * @return true if it was removed
 226     */
 227    public synchronized boolean removeConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) {
 228       return this.connectionStatusListeners.remove(connectionStatusListener);
 229    }
 230 
 231    public synchronized I_ConnectionStatusListener[] getConnectionStatusListeners() {
 232       if (this.connectionStatusListeners.size() == 0)
 233          return new I_ConnectionStatusListener[0];
 234       return (I_ConnectionStatusListener[])this.connectionStatusListeners.toArray(new I_ConnectionStatusListener[this.connectionStatusListeners.size()]);
 235    }
 236 
 237    /**
 238     * The name in the configuration file for the plugin
 239     * @return e.g. "Priority,1.0"
 240     */
 241    public String getTypeVersion() {
 242       return this.typeVersion;
 243    }
 244 
 245    /**
 246     * @return The import/export encrypt handle or null if created by a SubjectInfo (no session info available)
 247     */
 248    public I_MsgSecurityInterceptor getMsgSecurityInterceptor() {
 249       return this.securityInterceptor;
 250    }
 251 
 252    /**
 253     * @return The handler of all callback plugins, is never null
 254     */
 255    public final DispatchConnectionsHandler getDispatchConnectionsHandler() {
 256       return this.dispatchConnectionsHandler;
 257    }
 258 
 259    /**
 260     * How many messages maximum shall the callback thread take in one bulk out of the
 261     * callback queue and deliver in one bulk.
 262     */
 263    public final int getBurstModeMaxEntries() {
 264       return this.burstModeMaxEntries;
 265    }
 266 
 267    /**
 268     * How many bytes maximum shall the callback thread take in one bulk out of the
 269     * callback queue and deliver in one bulk.
 270     */
 271    public final long getBurstModeMaxBytes() {
 272       return this.burstModeMaxBytes;
 273    }
 274 
 275    /**
 276     * Get timestamp when we went to ALIVE state.
 277     * @return millis timestamp
 278     */
 279    public final long getAliveSinceTime() {
 280       return this.toAliveTime;
 281    }
 282 
 283    /**
 284     * Get timestamp when we went to POLLING state.
 285     * @return millis timestamp
 286     */
 287    public final long getPollingSinceTime() {
 288       return this.toPollingTime;
 289    }
 290 
 291    /**
 292     * Call by DispatchConnectionsHandler on state transition
 293     * NOTE: toAlive is called initially when a protocol plugin is successfully loaded
 294     * but we don't know yet if it ever is able to connect
 295     */
 296    public void toAlive(ConnectionStateEnum oldState) {
 297 
 298       if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to ALIVE");
 299 
 300       // Remember the current collectTime
 301       AddressBase addr = this.dispatchConnectionsHandler.getAliveAddress();
 302       if (addr == null) {
 303          log.severe(ME+": toAlive action has no alive address");
 304          return;
 305       }
 306 
 307       try {
 308          this.inAliveTransition = true;
 309 
 310          if (this.toAliveTime <= this.toPollingTime) {
 311             this.toAliveTime = System.currentTimeMillis();
 312          }
 313 
 314          this.burstModeMaxEntries = addr.getBurstModeMaxEntries();
 315          this.burstModeMaxBytes = addr.getBurstModeMaxBytes();
 316 
 317          synchronized (this.ALIVE_TRANSITION_MONITOR) {
 318             // 1. We allow a client to intercept and for example destroy all entries in the queue
 319             I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
 320             for (int i=0; i<listeners.length; i++) {
 321                listeners[i].toAlive(this, oldState);
 322             }
 323             // 2. If a dispatch plugin is registered it may do its work
 324             if (this.msgInterceptor != null)
 325                this.msgInterceptor.toAlive(this, oldState);
 326          }
 327       }
 328       finally {
 329          this.inAliveTransition = false;
 330       }
 331 
 332       collectTime = addr.getCollectTime(); // burst mode if > 0L
 333 
 334       // 3. Deliver. Will be delayed if burst mode timer is activated, will switch to sync mode if necessary
 335       activateDispatchWorker();
 336 
 337       if (this.shallCallToAliveSync && !this.inDispatchManagerCtor && this.isSyncMode)
 338          callToAliveSync();
 339    }
 340 
 341    public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
 342       I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
 343       for (int i=0; i<listeners.length; i++) {
 344          listeners[i].toAlive(this, oldState);
 345       }
 346    }
 347 
 348    /** Call by DispatchConnectionsHandler on state transition */
 349    public void toPolling(ConnectionStateEnum oldState) {
 350       
 351       if (isDead()) {
 352          return;
 353       }
 354 
 355       if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to POLLING");
 356       if (this.toPollingTime <= this.toAliveTime) {
 357          this.toPollingTime = System.currentTimeMillis();
 358       }
 359       switchToASyncMode();
 360 
 361       // 1. We allow a client to intercept and for example destroy all entries in the queue
 362       I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
 363       for (int i=0; i<listeners.length; i++) {
 364          listeners[i].toPolling(this, oldState);
 365       }
 366 
 367       // 2. If a dispatch plugin is registered it may do its work
 368       if (this.msgInterceptor != null)
 369          this.msgInterceptor.toPolling(this, oldState);
 370    }
 371 
 372    /**
 373     * 
 374     * @param ex
 375     */
 376    public void toDead(XmlBlasterException ex) {
 377       shutdownFomAnyState(ConnectionStateEnum.UNDEF, ex);
 378    }
 379 
 380    /** Call by DispatchConnectionsHandler on state transition */
 381    public void shutdownFomAnyState(ConnectionStateEnum oldState, XmlBlasterException ex) {
 382       if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to DEAD");
 383       if (oldState == ConnectionStateEnum.DEAD) return;
 384       if (this.isShutdown) return;
 385       if (ex != null) { // Very dangerous code! The caller ends up with changed Exception type
 386          ex.changeErrorCode(ErrorCode.COMMUNICATION_NOCONNECTION_DEAD);
 387       }
 388       else {
 389          ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME,
 390                   "Switch from " + oldState + " to DEAD, reason is not known");
 391       }
 392 
 393       // 1. We allow a client to intercept and for example destroy all entries in the queue
 394       I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
 395       for (int i=0; i<listeners.length; i++) {
 396          try {
 397             // Only pass original ex.getMessage() - not the changed errorCode
 398             listeners[i].toDead(this, oldState, ex);
 399          }
 400          catch (Throwable e) {
 401             e.printStackTrace();
 402          }
 403       }
 404 
 405       // 2. If a dispatch plugin is registered it may do its work
 406       if (this.msgInterceptor != null)
 407          this.msgInterceptor.toDead(this, oldState, ex);
 408 
 409       if (oldState != ConnectionStateEnum.UNDEF)
 410          givingUpDelivery(ex);
 411    }
 412    
 413    private void givingUpDelivery(XmlBlasterException ex) {
 414       if (log.isLoggable(Level.FINE)) log.fine(ME+": Entering givingUpDelivery(), state is " + this.dispatchConnectionsHandler.getState());
 415       removeBurstModeTimer();
 416 
 417       boolean userThread = this.dispatchConnectionsHandler.isUserThread();
 418       if (!userThread) { // If the client user thread it will receive the exception and handle it self
 419          // The error handler flushed the queue and does error handling with them
 420          getMsgErrorHandler().handleError(new MsgErrorInfo(glob, (MsgQueueEntry)null, this, ex));
 421       }
 422       
 423       shutdown();
 424    }
 425    
 426    public void postSendNotification(MsgQueueEntry entry) {
 427       MsgQueueEntry[] entries = new MsgQueueEntry[] { entry };
 428       postSendNotification(entries);
 429    }
 430    
 431    public void postSendNotification(MsgQueueEntry[] entries) {
 432       I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener();
 433       if (postSendListener != null) {
 434          try {
 435             postSendListener.postSend(entries);
 436          }
 437          catch (Throwable e) {
 438             e.printStackTrace();
 439             log.warning("postSendListener.postSend() exception: " + e.toString());
 440          }
 441       }
 442    }
 443    
 444    /**
 445     * Notify I_PostSendListener about problem. 
 446     * <p>
 447     * Typically XmlBlasterAccess is notified when message came asynchronously from queue
 448     *  
 449     * @param entryList
 450     * @param ex
 451     * @return true if processed
 452     * @see I_PostSendListener#postSend(MsgQueueEntry) for explanation
 453     */
 454    public boolean sendingFailedNotification(MsgQueueEntry[] entries, XmlBlasterException ex) {
 455       I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener();
 456       if (postSendListener == null)
 457          return false;
 458       try {
 459          return postSendListener.sendingFailed(entries, ex);
 460       }
 461       catch (Throwable e) {
 462          e.printStackTrace();
 463          log.warning("postSendListener.sendingFailed() exception: " + e.toString());
 464          return false;
 465       }
 466    }
 467 
 468    /**
 469     * Called by DispatchWorker if an Exception occured in sync mode
 470     * Only on client side
 471     */
 472    public void handleSyncWorkerException(List<I_Entry> entryList, Throwable throwable) throws XmlBlasterException {
 473 
 474       if (log.isLoggable(Level.FINER)) log.finer(ME+": Sync delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString());
 475 
 476       XmlBlasterException xmlBlasterException = XmlBlasterException.convert(glob,ME,null,throwable);
 477 
 478       if (isDead()) throw xmlBlasterException;
 479 
 480       if (xmlBlasterException.isUser()) {
 481          // Exception from remote client from update(), pass it to error handler and carry on ...?
 482          // A PublishPlugin could throw it
 483          MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
 484          getMsgErrorHandler().handleErrorSync(new MsgErrorInfo(glob, entries, this, xmlBlasterException));
 485          return;
 486       }
 487       else if (xmlBlasterException.isCommunication()) {
 488 
 489          if (this.msgInterceptor != null && isPolling()) { // If we have a plugin it shall handle it
 490             try {
 491                entryList = this.msgInterceptor.handleNextMessages(this, entryList);
 492                if (entryList != null && entryList.size() > 0) {
 493                   MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
 494                   getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException));
 495                }
 496             }
 497             catch (XmlBlasterException xmlBlasterException2) {
 498                internalError(xmlBlasterException2);
 499             }
 500             if (entryList != null && entryList.size() > 0) {
 501                MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
 502                getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException));
 503             }
 504             return;
 505          }
 506 
 507          // Exception from connection to remote client (e.g. from Corba layer)
 508          // DispatchManager handles this
 509          // Error handling in sync mode
 510          // 1. throwExceptionBackToPusher
 511          // 2. Switch to async mode and collect message (wait on better times)
 512          // 3. If we have serious problems (programming exceptions or isDead()) throw exception back
 513          // 4. Pass exception to an error handler plugin
 514          switchToASyncMode();
 515 
 516          // Simulate return values, and manipulate missing informations into entries ...
 517          I_QueueEntry[] entries = (I_QueueEntry[])entryList.toArray(new I_QueueEntry[entryList.size()]);
 518          getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED);
 519          msgQueue.put(entries, I_Queue.IGNORE_PUT_INTERCEPTOR);
 520 
 521          if (log.isLoggable(Level.FINE)) log.fine(ME+": Delivery failed, pushed " + entries.length + " entries into tail back queue");
 522       }
 523       else {
 524          if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation failed: " + xmlBlasterException.getMessage());
 525          throw xmlBlasterException;
 526       }
 527    }
 528 
 529    /**
 530     * Messages are successfully sent, remove them now from queue (sort of a commit()):
 531     * We remove filtered/destroyed messages as well (which doen't show up in entryListChecked)
 532     * @param postSendNotify TODO
 533     */
 534    public void removeFromQueue(MsgQueueEntry[] entries, boolean postSendNotify) throws XmlBlasterException {
 535       I_MsgDispatchInterceptor msgInterceptor = getMsgDispatchInterceptor();
 536       MsgUnit[] msgUnits = null;
 537       if (msgInterceptor != null) { // we need to do this before removal since the msgUnits are weak references and would be deleted by gc
 538          msgUnits = new MsgUnit[entries.length];
 539          for (int i=0; i < msgUnits.length; i++) {
 540             msgUnits[i] = entries[i].getMsgUnit();
 541          }
 542       }
 543       this.msgQueue.removeRandom(entries);
 544       /*(currently only done in sync invocation)
 545       ArrayList defaultEntries = sendAsyncResponseEvent(entryList);
 546       if (defaultEntries.size() > 0) {
 547          MsgQueueEntry[] entries = (MsgQueueEntry[])defaultEntries.toArray(new MsgQueueEntry[defaultEntries.size()]);
 548          this.msgQueue.removeRandom(entries);
 549       }
 550       */
 551       
 552       if (postSendNotify)
 553          postSendNotification(entries);
 554       
 555       if (msgInterceptor != null) {
 556          msgInterceptor.postHandleNextMessages(this, msgUnits);
 557       }
 558       
 559       if (log.isLoggable(Level.FINE)) log.fine("Commit of successful sending of " +
 560             entries.length + " messages done, current queue size is " +
 561             this.msgQueue.getNumOfEntries() + " '" + entries[0].getLogId() + "'");
 562    }
 563 
 564    /**
 565     * Called by DispatchWorker if an Exception occurred in async mode. 
 566     * @throws XmlBlasterException should never happen but is possible during removing entries from queue
 567     */
 568    public void handleWorkerException(List<I_Entry> entryList, Throwable throwable) throws XmlBlasterException {
 569       // Note: The DispatchManager is notified about connection problems directly by its DispatchConnectionsHandler
 570       //       we don't need to take care of ErrorCode.COMMUNICATION*
 571       if (log.isLoggable(Level.FINER)) log.finer(ME+": Async delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString());
 572       //Thread.currentThread().dumpStack();
 573       if (entryList == null) {
 574          if (!this.isShutdown)
 575             log.warning(ME+": Didn't expect null entryList in handleWorkerException() for throwable " + throwable.getMessage() + toXml(""));
 576          return;
 577       }
 578 
 579       getDispatchStatistic().setLastDeliveryException(throwable.toString());
 580       getDispatchStatistic().incrNumDeliveryExceptions(1);
 581 
 582       if (throwable instanceof XmlBlasterException) {
 583          XmlBlasterException ex = (XmlBlasterException)throwable;
 584          if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation or callback failed: " + ex.getMessage());
 585          if (ex.isUser()) {
 586             // Exception from remote client from update(), pass it to error handler and carry on ...
 587             MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
 588             boolean isHandled = sendingFailedNotification(entries, ex);
 589             if (isHandled)
 590                removeFromQueue(entries, false);
 591             else
 592                getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));
 593          }
 594          else if (ex.isCommunication()) {
 595 
 596             if (this.msgInterceptor != null) { // If we have a plugin it shall handle it
 597                if (isPolling()) { // is this code really invoked ? Note of Michele Laghi on 2007-12-19
 598                   try {
 599                      entryList = this.msgInterceptor.handleNextMessages(this, entryList);
 600                      if (entryList != null && entryList.size() > 0) {
 601                         MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
 602                         getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));
 603                      }
 604                   }
 605                   catch (XmlBlasterException ex2) {
 606                      internalError(ex2);
 607                   }
 608                   if (entryList != null && entryList.size() > 0) {
 609                      MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
 610                      getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));
 611                   }
 612                }
 613                if (msgInterceptor != null) { // we want the exception notification at least
 614                   msgInterceptor.onDispatchWorkerException(this, ex);
 615                }
 616             }
 617 
 618             // Exception from connection to remote client (e.g. from Corba layer)
 619             // DispatchManager handles this
 620          }
 621          else {
 622             //log.severe(ME+": Callback failed: " + ex.toString());
 623             //ex.printStackTrace();
 624             MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
 625             boolean isHandled = sendingFailedNotification(entries, ex);
 626             if (isHandled)
 627                removeFromQueue(entries, false);
 628             else
 629                internalError(ex);
 630          }
 631       }
 632       else {
 633          //log.severe(ME+": Callback failed: " + throwable.toString());
 634          //throwable.printStackTrace();
 635          XmlBlasterException ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable);
 636          // sendingFailedNotification() not called as the msgs remain in queue until problem is resolved by admin
 637          internalError(ex);
 638       }
 639    }
 640 
 641    public I_MsgErrorHandler getMsgErrorHandler() {
 642       return this.failureListener;
 643    }
 644 
 645    /**
 646     * We register a QueuePutListener and all put() into the queue are
 647     * intercepted - our put() is called instead.
 648     * We then deliver this QueueEntry directly to the remote
 649     * connection and return synchronously the returned value or the
 650     * Exception if one is thrown.
 651     */
 652    public void switchToSyncMode() {
 653       if (this.isSyncMode) return;
 654 
 655       synchronized (this) {
 656          if (this.isSyncMode) return;
 657          if (this.syncDispatchWorker == null) this.syncDispatchWorker = new DispatchWorker(glob, this);
 658 
 659          this.isSyncMode = true;
 660 
 661          if (this.timerKey != null)
 662             log.severe(ME+": Burst mode timer was activated and we switched to synchronous delivery" +
 663                           " - handling of this situation is not coded yet");
 664          removeBurstModeTimer();
 665 
 666          boolean isAlive = isAlive();
 667          log.info(ME+": Switched to synchronous message delivery, inAliveTransition=" + this.inAliveTransition + " isAlive=" + isAlive + " trySyncMode=" + this.trySyncMode);
 668          if (isAlive) { // For FailSafePing
 669             if (this.inAliveTransition) { // For FailSafeAsync
 670                this.shallCallToAliveSync = true;
 671             }
 672             else {
 673                callToAliveSync();
 674             }
 675          }
 676       }
 677    }
 678       
 679    private void callToAliveSync() {
 680       this.shallCallToAliveSync = false;
 681       I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
 682       for (int i=0; i<listeners.length; i++)
 683          listeners[i].toAliveSync(this, ConnectionStateEnum.ALIVE);
 684    }
 685 
 686    /**
 687     * Switch back to asynchronous mode.
 688     * Our thread pool will take the messages out of the queue
 689     * and deliver them in asynchronous mode.
 690     */
 691    public void switchToASyncMode() {
 692       if (!this.isSyncMode) return;
 693 
 694       synchronized (this) {
 695          if (!this.isSyncMode) return;
 696          //this.msgQueue.removePutListener(this);
 697          this.isSyncMode = false;
 698          activateDispatchWorker(); // just in case there are some messages pending in the queue
 699          log.info(ME+": Switched to asynchronous message delivery");
 700       }
 701    }
 702 
 703    /**
 704     * @see I_QueuePutListener#putPre(I_QueueEntry)
 705     */
 706    public boolean putPre(I_QueueEntry queueEntry) throws XmlBlasterException {
 707       //I_QueueEntry[] queueEntries = new I_QueueEntry[1];
 708       //queueEntries[0] = queueEntry;
 709       return putPre(new I_QueueEntry[] { queueEntry });
 710    }
 711 
 712    /**
 713     * @see #putPre(I_QueueEntry)
 714     * @see I_QueuePutListener#putPre(I_QueueEntry[])
 715     */
 716    public boolean putPre(I_QueueEntry[] queueEntries) throws XmlBlasterException {
 717       if (!this.isSyncMode) {
 718         /*
 719          for (int i=0; i < queueEntries.length; i++) {
 720             if (queueEntries[i] instanceof MsgQueueEntry) {
 721                MsgQueueEntry msgQueueEntry = (MsgQueueEntry)queueEntries[i];
 722                if (MethodName.SUBSCRIBE == msgQueueEntry.getMethodName()) {
 723                   if (getSessionName().getPublicSessionId() < 1) {
 724                      // we should never allow a subscription without a positive sessionId if the 
 725                      // server is not accessible
 726                      throw new XmlBlasterException(glob, ErrorCode.RESOURCE_TEMPORARY_UNAVAILABLE, ME,
 727                            "Manager: The Subscription for '" + getSessionName().toString() + "' failed since the server is currently not available");
 728                   }
 729                }
 730             }
 731          }
 732          */
 733          if (this.inAliveTransition) {
 734             // Do not allow other threads to put messages to queue during transition to alive
 735             synchronized (ALIVE_TRANSITION_MONITOR) {
 736                // don't allow
 737             }
 738          }
 739          return true; // Add entry to queue
 740       }
 741 
 742       if (log.isLoggable(Level.FINE)) log.fine(ME+": putPre() - Got " + queueEntries.length + " QueueEntries to deliver synchronously ...");
 743       ArrayList entryList = new ArrayList(queueEntries.length);
 744       for (int ii=0; ii<queueEntries.length; ii++) {
 745          if (this.trySyncMode && !this.isSyncMode && queueEntries[ii] instanceof MsgQueueGetEntry) { // this.trySyncMode === isClientSide
 746             throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "You can't call get() in asynchronous mode (gets can't be queued because we don't know its return value)");
 747          }
 748          entryList.add(queueEntries[ii]);
 749       }
 750       this.syncDispatchWorker.run(entryList);
 751       return false;
 752    }
 753 
 754    /**
 755     * @see I_QueuePutListener#putPost(I_QueueEntry)
 756     */
 757    public void putPost(I_QueueEntry queueEntry) throws XmlBlasterException {
 758       if (!this.isSyncMode) {
 759          if (this.dispatcherActive) notifyAboutNewEntry();
 760          if (((MsgQueueEntry)queueEntry).wantReturnObj()) {
 761             // Simulate return values, and manipulate missing informations into entries ...
 762             I_QueueEntry[] entries = new I_QueueEntry[] { queueEntry };
 763             getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED);
 764          }
 765       }
 766    }
 767 
 768    /**
 769     * @see #putPost(I_QueueEntry)
 770     * @see I_QueuePutListener#putPost(I_QueueEntry[])
 771     */
 772    public void putPost(I_QueueEntry[] queueEntries) throws XmlBlasterException {
 773       if (!this.isSyncMode) {
 774          if (this.dispatcherActive) notifyAboutNewEntry();
 775          if (queueEntries.length > 0 && ((MsgQueueEntry)queueEntries[0]).wantReturnObj()) {
 776             // Simulate return values, and manipulate missing informations into entries ...
 777             getDispatchConnectionsHandler().createFakedReturnObjects(queueEntries, Constants.STATE_OK, Constants.INFO_QUEUED);
 778          }
 779       }
 780    }
 781 
 782    /**
 783     * Here we prepare messages which are coming directly from the queue.
 784     * <ol>
 785     *   <li>We eliminate destroyed messages</li>
 786     *   <li>We make a shallow copy of the message.
 787     *       We need to do this, out messages are references directly into the queue.
 788     *       The delivery framework is later changing the QoS
 789     *       and plugins may change the content - and this should not modify the queue entries</li>
 790     * </ol>
 791     */
 792    public ArrayList prepareMsgsFromQueue(List<I_Entry> entryList) {
 793 
 794       if (entryList == null || entryList.size() < 1) {
 795          if (log.isLoggable(Level.FINE)) log.fine(ME+": Got zero messages from queue, expected at least one, can happen if client disconnected in the mean time: " + toXml(""));
 796          return null;
 797       }
 798       return prepareMsgsFromQueue(ME, log, this.msgQueue, entryList);
 799    }
 800 
 801    public static ArrayList prepareMsgsFromQueue(String logId, Logger log, I_Queue queue, List<I_Entry> entryList) {
 802       // Remove all expired messages and do a shallow copy
 803       int size = entryList.size();
 804       ArrayList result = new ArrayList(size);
 805       for (int ii=0; ii<size; ii++) {
 806          MsgQueueEntry entry = (MsgQueueEntry)entryList.get(ii);
 807          // Take care to remove the filtered away messages from the queue as well
 808          if (entry.isDestroyed()) {
 809             log.info(logId+": Message " + entry.getLogId() + " is destroyed, ignoring it");
 810             if (log.isLoggable(Level.FINE)) log.fine("Message " + entry.getLogId() + " is destroyed, ignoring it: " + entry.toXml());
 811             try {
 812                queue.removeRandom(entry); // Probably change to use [] for better performance
 813             }
 814             catch (Throwable e) {
 815                log.severe(logId+": Internal error when removing expired message " + entry.getLogId() + " from queue, no recovery implemented, we continue: " + e.toString());
 816             }
 817             continue;
 818          }
 819          result.add(entry.clone()); // expired messages are sent as well
 820       }
 821       return result;
 822    }
 823 
 824    /**
 825     * When somebody puts a new entry into the queue, we want to be
 826     * notified about this after the entry is fed.
 827     * <p>
 828     * Called by I_Queue.putPost()
 829     */
 830    public void notifyAboutNewEntry() {
 831       if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering notifyAboutNewEntry("+this.notifyCounter+")");
 832       this.notifyCounter++;
 833       //activateDispatchWorker();
 834 
 835       if (checkSending(true) == false)
 836          return;
 837 
 838       if (useBurstModeTimer() == true)
 839          return;
 840 
 841       startWorkerThread(false);
 842    }
 843 
 844    /**
 845     * Counts how often a new entry was added since the current worker thread was started.
 846     */
 847    public int getNotifyCounter() {
 848       return this.notifyCounter;
 849    }
 850 
 851    /**
 852     * Give the callback worker thread a kick to deliver the messages.
 853     * Throws no exception.
 854     */
 855    private void activateDispatchWorker() {
 856 
 857       if (checkSending(false) == false)
 858          return;
 859 
 860       if (useBurstModeTimer() == true)
 861          return;
 862 
 863       startWorkerThread(false);
 864    }
 865 
 866    /**
 867     * @return true if a burst mode timer was activated
 868     */
 869    private boolean useBurstModeTimer() {
 870       if (collectTime <= 0L) return false;
 871 
 872       // Messages are sent delayed on timeout (burst mode)
 873 
 874       if (log.isLoggable(Level.FINE)) log.fine(ME+": Executing useBurstModeTimer() collectTime=" + collectTime + " dispatchWorkerIsActive=" + dispatchWorkerIsActive);
 875       synchronized (this) {
 876          if (this.isShutdown) return false;
 877          if (this.timerKey == null) {
 878             if (log.isLoggable(Level.FINE)) log.fine(ME+": Starting burstMode timer with " + collectTime + " msec");
 879             this.timerKey = this.glob.getBurstModeTimer().addTimeoutListener(this, collectTime, null);
 880          }
 881       }
 882       return true;
 883    }
 884 
 885    /**
 886     * Remove the burst mode timer
 887     */
 888    private void removeBurstModeTimer() {
 889       synchronized (this) {
 890          if (this.timerKey != null) {
 891             this.glob.getBurstModeTimer().removeTimeoutListener(timerKey);
 892             this.timerKey = null;
 893          }
 894       }
 895    }
 896 
 897    /**
 898     * @param fromTimeout for logging only
 899     */
 900    private void startWorkerThread(boolean fromTimeout) {
 901       if (this.dispatchWorkerIsActive == false) {
 902          synchronized (this) {
 903             if (this.isShutdown) {
 904                if (log.isLoggable(Level.FINE)) log.fine(ME+": startWorkerThread() failed, we are shutdown: " + toXml(""));
 905                return;
 906             }
 907             if (this.dispatchWorkerIsActive == false) { // send message directly
 908                this.dispatchWorkerIsActive = true;
 909                this.notifyCounter = 0;
 910                try {
 911                   this.glob.getDispatchWorkerPool().execute(new DispatchWorker(glob, this));
 912                }
 913                catch (Throwable e) {
 914                   this.dispatchWorkerIsActive = false;
 915                   log.severe(ME+": Unexpected error occurred: " + e.toString());
 916                   e.printStackTrace();
 917                }
 918             }
 919          }
 920          return;
 921       }
 922 
 923       if (fromTimeout) {
 924          if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)");
 925       }
 926       else {
 927          if (log.isLoggable(Level.FINE)) log.fine(ME+": Last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)");
 928       }
 929    }
 930 
 931    public boolean isDead() {
 932       return this.dispatchConnectionsHandler.isDead();
 933    }
 934 
 935    public boolean isPolling() {
 936       return this.dispatchConnectionsHandler.isPolling();
 937    }
 938 
 939    public boolean isAlive() {
 940       return this.dispatchConnectionsHandler.isAlive();
 941    }
 942 
 943    /**
 944     * Can be called when client connection is lost (NOT the callback connection).
 945     * Currently only detected by the SOCKET protocol plugin.
 946     * Others can only detect lost clients with their callback protocol pings
 947     */
 948    public void lostClientConnection() {
 949       log.warning(ME+": Lost client connection");
 950       // If SOCKET: the cb connection is lost as well and we can go to polling mode
 951       pingCallbackServer(false, true);
 952    }
 953 
 954    public boolean pingCallbackServer(boolean sync, boolean connectionIsDown) {
 955       DispatchConnection dispatchConnection = this.dispatchConnectionsHandler.getCurrentDispatchConnection();
 956       if (dispatchConnection != null) {
 957          dispatchConnection.setConnectionWasDown(connectionIsDown);
 958          if (sync) {
 959             dispatchConnection.timeout(null); // force a ping
 960          }
 961          else {
 962             // force a ping via another thread
 963             this.glob.getPingTimer().addTimeoutListener(dispatchConnection, 0L, null);
 964          }
 965          return true;
 966       }
 967       return false;
 968    }
 969 
 970    /**
 971     * @param isPublisherThread We take care that the publisher thread, coming through putPost()
 972     *        does never too much work to return fast enough and avoid possible dead locks.
 973     * @return true is status is OK and we can try to send a message
 974     */
 975    private boolean checkSending(boolean isPublisherThread) {
 976       if (this.isShutdown) {
 977          if (log.isLoggable(Level.FINE)) log.fine(ME+": The dispatcher is shutdown, can't activate callback worker thread" + toXml(""));
 978          return false; // assert
 979       }
 980 
 981       if (this.isSyncMode) {
 982          return false;
 983       }
 984 
 985       if (!this.dispatcherActive) {
 986          return false;
 987       }
 988 
 989       if (msgQueue.isShutdown() && !isPublisherThread) { // assert
 990          if (log.isLoggable(Level.FINE)) log.fine(ME+": The queue is shutdown, can't activate callback worker thread.");
 991          // e.g. client has disconnected on the mean time.
 992          //Thread.currentThread().dumpStack();
 993          shutdown();
 994          return false;
 995       }
 996 
 997       if (this.dispatchConnectionsHandler.isUndef()) {
 998          if (log.isLoggable(Level.FINE)) log.fine(ME+": Not connected yet, state is UNDEF");
 999          return false;
1000       }
1001 
1002       if (this.dispatchConnectionsHandler.isDead() && !isPublisherThread) {
1003          String text = "No recoverable remote connection available, giving up queue " + msgQueue.getStorageId() + ".";
1004          if (log.isLoggable(Level.FINE)) log.fine(ME+": "+text);
1005          givingUpDelivery(new XmlBlasterException(glob,ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, text));
1006          return false;
1007       }
1008 
1009       if (msgQueue.getNumOfEntries() == 0L) {
1010          return false;
1011       }
1012 
1013       if (this.msgInterceptor != null) {
1014          if (this.msgInterceptor.doActivate(this) == false) {
1015             if (log.isLoggable(Level.FINE)) log.fine(ME+": this.msgInterceptor.doActivate==false");
1016             return false; // A plugin told us to suppress sending the message
1017          }
1018          return true;
1019       }
1020 
1021       /*
1022        * The msgInterceptor plugin needs to have a chance to take care of this even in polling mode
1023        */
1024       if (this.dispatchConnectionsHandler.isPolling()) {
1025          if (log.isLoggable(Level.FINE)) log.fine(ME+": Can't send message as connection is lost and we are polling");
1026          return false;
1027       }
1028 
1029       return true;
1030    }
1031 
1032    /**
1033     * We are notified about the burst mode timeout through this method.
1034     * @param userData You get bounced back your userData which you passed
1035     *                 with Timeout.addTimeoutListener()
1036     */
1037    public void timeout(Object userData) {
1038       this.timerKey = null;
1039       if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, queue entries=" + msgQueue.getNumOfEntries() + ", starting callback worker thread ...");
1040       startWorkerThread(true);
1041    }
1042 
1043 
1044    /**
1045     * @return The interceptor plugin if available, otherwise null
1046     */
1047    public I_MsgDispatchInterceptor getMsgDispatchInterceptor() {
1048       return this.msgInterceptor;
1049    }
1050 
1051    /**
1052     * Set new callback addresses, typically after a session login/logout
1053     */
1054    public void setAddresses(AddressBase[] addr) throws XmlBlasterException {
1055       this.dispatchConnectionsHandler.initialize(addr);
1056    }
1057 
1058    /**
1059     * Switch on/off the sending of messages.
1060     */
1061    private void initDispatcherActive(AddressBase[] addrArr) {
1062       if (addrArr != null) {
1063          for (int ii=0; ii<addrArr.length; ii++) { // TODO: How to handle setting of multiple addresses??
1064             this.dispatcherActive = addrArr[ii].isDispatcherActive();
1065          }
1066       }
1067    }
1068 
1069    /**
1070     * The worker notifies us that it is finished, if messages are available
1071     * it is triggered again.
1072     */
1073    public void setDispatchWorkerIsActive(boolean val) {
1074       this.dispatchWorkerIsActive = val;
1075       if (val == false) {
1076          if (this.isShutdown) {
1077             if (log.isLoggable(Level.FINE)) log.fine(ME+": setDispatchWorkerIsActive(" + val + ") failed, we are shutdown: " + toXml(""));
1078             return;
1079          }
1080 
1081          if (msgQueue.getNumOfEntries() > 0) {
1082             if (log.isLoggable(Level.FINE)) log.fine(ME+": Finished callback job. Giving a kick to send the remaining " + msgQueue.getNumOfEntries() + " messages.");
1083             try {
1084                activateDispatchWorker();
1085             }
1086             catch(Throwable e) {
1087                log.severe(ME+": "+e.toString()); e.printStackTrace(); // Assure the queue is flushed with another worker
1088             }
1089          }
1090          else {
1091             if (this.trySyncMode && !this.isSyncMode) {
1092                switchToSyncMode();
1093             }
1094          }
1095       }
1096    }
1097 
1098    /**
1099     * Called locally and from TopicHandler when internal error (Throwable) occurred to avoid infinite looping
1100     */
1101    public void internalError(Throwable throwable) {
1102       givingUpDelivery((throwable instanceof XmlBlasterException) ? (XmlBlasterException)throwable :
1103                        new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable));
1104       log.severe(ME+": PANIC: Internal error, doing shutdown: " + throwable.getMessage());
1105       shutdown();
1106    }
1107 
1108    /**
1109     * @return A container holding some statistical delivery information
1110     */
1111    public DispatchStatistic getDispatchStatistic() {
1112       return this.dispatchConnectionsHandler.getDispatchStatistic();
1113    }
1114 
1115    public boolean isShutdown() {
1116       return this.isShutdown;
1117    }
1118 
1119    /**
1120     * Stop all callback drivers of this client.
1121     * Possibly invoked twice (givingUpDelivery() calls it indirectly as well)
1122     * We don't shutdown the corresponding queue.
1123     */
1124    public void shutdown() {
1125       if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering shutdown ...");
1126       if (this.isShutdown) return;
1127       synchronized (this) {
1128          if (this.isShutdown) return;
1129          this.isShutdown = true;
1130 
1131          this.msgQueue.removePutListener(this);
1132 
1133          // remove all ConnectionStatusListeners
1134          this.connectionStatusListeners.clear();
1135 
1136          removeBurstModeTimer();
1137 
1138          // NOTE: We would need to remove the 'final' qualifier to be able to set to null
1139 
1140          if (this.msgInterceptor != null) {
1141             try {
1142                this.msgInterceptor.shutdown(this);
1143             }
1144             catch (XmlBlasterException e) {
1145                log.warning(ME+": Ignoring problems during shutdown of plugin: " + e.getMessage());
1146             }
1147             //this.msgInterceptor = null;
1148          }
1149          if (this.dispatchConnectionsHandler != null) {
1150             this.dispatchConnectionsHandler.shutdown();
1151             //this.dispatchConnectionsHandler = null;
1152          }
1153          //this.msgQueue = null;
1154          //this.failureListener = null;
1155          //this.securityInterceptor = null;
1156 
1157          //if (this.dispatchWorkerPool != null) {
1158          //   this.dispatchWorkerPool.shutdown(); NO: not here, is the scope and duty of Global
1159          //   this.dispatchWorkerPool = null;
1160          //}
1161 
1162          if (this.syncDispatchWorker != null)
1163             this.syncDispatchWorker.shutdown();
1164       }
1165    }
1166 
1167    /**
1168     * For logging
1169     */
1170    public String getId() {
1171       return this.msgQueue.getStorageId().getId();
1172    }
1173 
1174    /**
1175     * Dump state of this object into a XML ASCII string.
1176     * <br>
1177     * @param extraOffset indenting of tags for nice output
1178     * @return internal state as a XML ASCII string
1179     */
1180    public String toXml(String extraOffset) {
1181       StringBuffer sb = new StringBuffer(2000);
1182       if (extraOffset == null) extraOffset = "";
1183       String offset = Constants.OFFSET + extraOffset;
1184 
1185       sb.append(offset).append("<DispatchManager id='").append(getId());
1186       if (this.msgQueue != null)
1187          sb.append("' numEntries='").append(this.msgQueue.getNumOfEntries());
1188       sb.append("' isShutdown='").append(this.isShutdown).append("'>");
1189       sb.append(this.dispatchConnectionsHandler.toXml(extraOffset+Constants.INDENT));
1190       sb.append(offset).append(" <dispatchWorkerIsActive>").append(dispatchWorkerIsActive).append("</dispatchWorkerIsActive>");
1191       sb.append(offset).append("</DispatchManager>");
1192 
1193       return sb.toString();
1194    }
1195 
1196    /**
1197     * Inhibits/activates the delivery of asynchronous dispatches of messages.
1198     * @param dispatcherActive
1199     */
1200    public void setDispatcherActive(boolean dispatcherActive) {
1201       if (log.isLoggable(Level.FINE)) log.fine(ME+": Changed dispatcherActive from " + this.dispatcherActive + " to " + dispatcherActive);
1202       this.dispatcherActive = dispatcherActive;
1203       if (this.dispatcherActive) notifyAboutNewEntry();
1204    }
1205 
1206    /**
1207     *
1208     * @return true if the dispacher is currently activated, i.e. if it is
1209     * able to deliver asynchronousy messages from the callback queue.
1210     */
1211    public boolean isDispatcherActive() {
1212       return this.dispatcherActive;
1213    }
1214 
1215    public ArrayList filterDistributorEntries(ArrayList entries, Throwable ex) {
1216       return this.dispatchConnectionsHandler.filterDistributorEntries(entries, ex);
1217    }
1218 
1219 }


syntax highlighted by Code2HTML, v. 0.9.1