1 /*------------------------------------------------------------------------------
  2 Name:      UpdateDispatcher.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.client;
  7 
  8 import org.xmlBlaster.util.Global;
  9 import org.xmlBlaster.util.XmlBuffer;
 10 
 11 import java.util.HashMap;
 12 import java.util.Set;
 13 
 14 /**
 15  * Dispatches callback messages depending on the subscriptionId to the specific client listener. 
 16  * @author xmlBlaster@marcelruff.info
 17  */
 18 public final class UpdateDispatcher
 19 {
 20    private static final String ME = "UpdateDispatcher";
 21    private final Global glob;
 22    /** This map contains the registered callback interfaces for given subscriptions.
 23        The key is the subscription ID, the value is the I_Callback instance */
 24    private final HashMap callbackMap;
 25 
 26    public UpdateDispatcher(Global glob) {
 27       this.glob = glob;
 28       this.callbackMap = new HashMap();
 29    }
 30 
 31    /**
 32     * Register a callback interface with the given subscriptionId
 33     */
 34    public synchronized void addCallback(String subscriptionId, I_Callback callback, boolean isPersistent) {
 35       if (subscriptionId == null) {
 36          throw new IllegalArgumentException("Null argument not allowed in addCallback: subscriptionId=" + subscriptionId + " callback=" + callback);
 37       }
 38       this.callbackMap.put(subscriptionId, new CallbackInfo(callback, isPersistent));
 39    }
 40 
 41    /**
 42     * Access a callback interface for the given subscriptionId
 43     * @return null if not found
 44     */
 45    public synchronized I_Callback getCallback(String subscriptionId) {
 46       if (subscriptionId == null) {
 47          throw new IllegalArgumentException("The subscriptionId is null");
 48       }
 49       CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
 50       
 51       if (info == null) {
 52          String tmp = XmlBuffer.unEscapeXml(subscriptionId);
 53          info = (CallbackInfo)this.callbackMap.get(tmp);
 54       }
 55       
 56       return (info == null) ? null: info.callback;
 57    }
 58    
 59    /**
 60     * Mark that a subscribe() invocation returned (is the server ACK). 
 61     * @param subscriptionId The subcribe
 62     */
 63    public synchronized void ackSubscription(String subscriptionId) {
 64       if (subscriptionId == null) {
 65          throw new IllegalArgumentException("The subscriptionId is null");
 66       }
 67       CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
 68       if (info != null) info.ack = true;
 69    }
 70    
 71    /**
 72     * Returns a current snapshot (shallow copy). 
 73     */
 74    public synchronized I_Callback[] getCallbacks() {
 75       Set values = this.callbackMap.entrySet();
 76       if (values == null || values.size() < 1) {
 77          return new I_Callback[0];
 78       }
 79       CallbackInfo[] infoArr = (CallbackInfo[])values.toArray(new CallbackInfo[values.size()]);
 80       I_Callback[] cbArr = new I_Callback[infoArr.length];
 81       for (int i=0; i<infoArr.length; i++) {
 82          cbArr[i] = infoArr[i].callback;
 83       }
 84       return cbArr;
 85    }
 86 
 87    /**
 88     * Returns a current snapshot. 
 89     */
 90    public synchronized String[] getSubscriptionIds() {
 91       Set keys = this.callbackMap.keySet();
 92       if (keys == null || keys.size() < 1) {
 93          return new String[0];
 94       }
 95       return (String[])keys.toArray(new String[keys.size()]);
 96    }
 97 
 98    /**
 99     * Remove the callback interface for the given subscriptionId
100     * @return the removed entry of null if subscriptionId is unknown
101     */
102    public synchronized I_Callback removeCallback(String subscriptionId) {
103       CallbackInfo info = (CallbackInfo)this.callbackMap.remove(subscriptionId);
104       return (info == null) ? null : info.callback;
105    }
106 
107    /**
108     * Get number of callback registrations
109     */
110    public synchronized int size() {
111       return this.callbackMap.size();
112    }
113    /**
114     * Remove all callback registrations
115     */
116    public synchronized void clear() {
117       this.callbackMap.clear();
118    }
119 
120    /**
121     * Clear all subscribes which where marked as acknowledged
122     * @return number of removed entries
123     */
124    public synchronized int clearAckNonPersistentSubscriptions() {
125       String[] ids = getSubscriptionIds();
126       int count = 0;
127       for (int i=0; i<ids.length; i++) {
128          CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
129          if (info != null && info.ack && !info.isPersistent()) {
130             this.callbackMap.remove(ids[i]);
131             count++;
132          }
133       }
134       return count;
135    }
136 
137    /**
138     * Clear all subscribes which where not marked as acknowledged
139     */
140    public synchronized void clearNAKSubscriptions() {
141       String[] ids = getSubscriptionIds();
142       for (int i=0; i<ids.length; i++) {
143          CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
144          if (info != null && info.ack == false) {
145             this.callbackMap.remove(ids[i]);
146          }
147       }
148    }
149 
150    /**
151     * Inner helper class to hold ACK/NAK state of subscriptions. 
152     * We need this to be able to garbage collect map entries on
153     * reconnect to a different session.
154     */
155    class CallbackInfo {
156       I_Callback callback;
157       boolean persistent;
158       boolean ack = false; // Is the subscribe acknowledged from the server (the subscribe() method returned?)
159       CallbackInfo(I_Callback callback, boolean persistent) {
160          this.callback = callback;
161          this.persistent = persistent;
162       }
163       boolean isPersistent() {
164          return this.persistent;
165       }
166    };
167 }


syntax highlighted by Code2HTML, v. 0.9.1