1 /*------------------------------------------------------------------------------
  2 Name:      UpdateDispatcher.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.client;
  7 
  8 import org.xmlBlaster.util.Global;
  9 import java.util.HashMap;
 10 import java.util.Set;
 11 
 12 /**
 13  * Dispatches callback messages depending on the subscriptionId to the specific client listener. 
 14  * @author xmlBlaster@marcelruff.info
 15  */
 16 public final class UpdateDispatcher
 17 {
 18    private static final String ME = "UpdateDispatcher";
 19    private final Global glob;
 20    /** This map contains the registered callback interfaces for given subscriptions.
 21        The key is the subscription ID, the value is the I_Callback instance */
 22    private final HashMap callbackMap;
 23 
 24    public UpdateDispatcher(Global glob) {
 25       this.glob = glob;
 26       this.callbackMap = new HashMap();
 27    }
 28 
 29    /**
 30     * Register a callback interface with the given subscriptionId
 31     */
 32    public synchronized void addCallback(String subscriptionId, I_Callback callback, boolean isPersistent) {
 33       if (subscriptionId == null) {
 34          throw new IllegalArgumentException("Null argument not allowed in addCallback: subscriptionId=" + subscriptionId + " callback=" + callback);
 35       }
 36       this.callbackMap.put(subscriptionId, new CallbackInfo(callback, isPersistent));
 37    }
 38 
 39    /**
 40     * Access a callback interface for the given subscriptionId
 41     * @return null if not found
 42     */
 43    public synchronized I_Callback getCallback(String subscriptionId) {
 44       if (subscriptionId == null) {
 45          throw new IllegalArgumentException("The subscriptionId is null");
 46       }
 47       CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
 48       return (info == null) ? null: info.callback;
 49    }
 50    
 51    /**
 52     * Mark that a subscribe() invocation returned (is the server ACK). 
 53     * @param subscriptionId The subcribe
 54     */
 55    public synchronized void ackSubscription(String subscriptionId) {
 56       if (subscriptionId == null) {
 57          throw new IllegalArgumentException("The subscriptionId is null");
 58       }
 59       CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
 60       if (info != null) info.ack = true;
 61    }
 62    
 63    /**
 64     * Returns a current snapshot (shallow copy). 
 65     */
 66    public synchronized I_Callback[] getCallbacks() {
 67       Set values = this.callbackMap.entrySet();
 68       if (values == null || values.size() < 1) {
 69          return new I_Callback[0];
 70       }
 71       CallbackInfo[] infoArr = (CallbackInfo[])values.toArray(new CallbackInfo[values.size()]);
 72       I_Callback[] cbArr = new I_Callback[infoArr.length];
 73       for (int i=0; i<infoArr.length; i++) {
 74          cbArr[i] = infoArr[i].callback;
 75       }
 76       return cbArr;
 77    }
 78 
 79    /**
 80     * Returns a current snapshot. 
 81     */
 82    public synchronized String[] getSubscriptionIds() {
 83       Set keys = this.callbackMap.keySet();
 84       if (keys == null || keys.size() < 1) {
 85          return new String[0];
 86       }
 87       return (String[])keys.toArray(new String[keys.size()]);
 88    }
 89 
 90    /**
 91     * Remove the callback interface for the given subscriptionId
 92     * @return the removed entry of null if subscriptionId is unknown
 93     */
 94    public synchronized I_Callback removeCallback(String subscriptionId) {
 95       CallbackInfo info = (CallbackInfo)this.callbackMap.remove(subscriptionId);
 96       return (info == null) ? null : info.callback;
 97    }
 98 
 99    /**
100     * Get number of callback registrations
101     */
102    public synchronized int size() {
103       return this.callbackMap.size();
104    }
105    /**
106     * Remove all callback registrations
107     */
108    public synchronized void clear() {
109       this.callbackMap.clear();
110    }
111 
112    /**
113     * Clear all subscribes which where marked as acknowledged
114     * @return number of removed entries
115     */
116    public synchronized int clearAckNonPersistentSubscriptions() {
117       String[] ids = getSubscriptionIds();
118       int count = 0;
119       for (int i=0; i<ids.length; i++) {
120          CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
121          if (info != null && info.ack && !info.isPersistent()) {
122             this.callbackMap.remove(ids[i]);
123             count++;
124          }
125       }
126       return count;
127    }
128 
129    /**
130     * Clear all subscribes which where not marked as acknowledged
131     */
132    public synchronized void clearNAKSubscriptions() {
133       String[] ids = getSubscriptionIds();
134       for (int i=0; i<ids.length; i++) {
135          CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
136          if (info != null && info.ack == false) {
137             this.callbackMap.remove(ids[i]);
138          }
139       }
140    }
141 
142    /**
143     * Inner helper class to hold ACK/NAK state of subscriptions. 
144     * We need this to be able to garbage collect map entries on
145     * reconnect to a different session.
146     */
147    class CallbackInfo {
148       I_Callback callback;
149       boolean persistent;
150       boolean ack = false; // Is the subscribe acknowledged from the server (the subscribe() method returned?)
151       CallbackInfo(I_Callback callback, boolean persistent) {
152          this.callback = callback;
153          this.persistent = persistent;
154       }
155       boolean isPersistent() {
156          return this.persistent;
157       }
158    };
159 }


syntax highlighted by Code2HTML, v. 0.9.1