1 /*------------------------------------------------------------------------------
2 Name: UpdateDispatcher.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.client;
7
8 import org.xmlBlaster.util.Global;
9 import org.xmlBlaster.util.XmlBuffer;
10
11 import java.util.HashMap;
12 import java.util.Set;
13
14 /**
15 * Dispatches callback messages depending on the subscriptionId to the specific client listener.
16 * @author xmlBlaster@marcelruff.info
17 */
18 public final class UpdateDispatcher
19 {
20 private static final String ME = "UpdateDispatcher";
21 private final Global glob;
22 /** This map contains the registered callback interfaces for given subscriptions.
23 The key is the subscription ID, the value is the I_Callback instance */
24 private final HashMap callbackMap;
25
26 public UpdateDispatcher(Global glob) {
27 this.glob = glob;
28 this.callbackMap = new HashMap();
29 }
30
31 /**
32 * Register a callback interface with the given subscriptionId
33 */
34 public synchronized void addCallback(String subscriptionId, I_Callback callback, boolean isPersistent) {
35 if (subscriptionId == null) {
36 throw new IllegalArgumentException("Null argument not allowed in addCallback: subscriptionId=" + subscriptionId + " callback=" + callback);
37 }
38 this.callbackMap.put(subscriptionId, new CallbackInfo(callback, isPersistent));
39 }
40
41 /**
42 * Access a callback interface for the given subscriptionId
43 * @return null if not found
44 */
45 public synchronized I_Callback getCallback(String subscriptionId) {
46 if (subscriptionId == null) {
47 throw new IllegalArgumentException("The subscriptionId is null");
48 }
49 CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
50
51 if (info == null) {
52 String tmp = XmlBuffer.unEscapeXml(subscriptionId);
53 info = (CallbackInfo)this.callbackMap.get(tmp);
54 }
55
56 return (info == null) ? null: info.callback;
57 }
58
59 /**
60 * Mark that a subscribe() invocation returned (is the server ACK).
61 * @param subscriptionId The subcribe
62 */
63 public synchronized void ackSubscription(String subscriptionId) {
64 if (subscriptionId == null) {
65 throw new IllegalArgumentException("The subscriptionId is null");
66 }
67 CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
68 if (info != null) info.ack = true;
69 }
70
71 /**
72 * Returns a current snapshot (shallow copy).
73 */
74 public synchronized I_Callback[] getCallbacks() {
75 Set values = this.callbackMap.entrySet();
76 if (values == null || values.size() < 1) {
77 return new I_Callback[0];
78 }
79 CallbackInfo[] infoArr = (CallbackInfo[])values.toArray(new CallbackInfo[values.size()]);
80 I_Callback[] cbArr = new I_Callback[infoArr.length];
81 for (int i=0; i<infoArr.length; i++) {
82 cbArr[i] = infoArr[i].callback;
83 }
84 return cbArr;
85 }
86
87 /**
88 * Returns a current snapshot.
89 */
90 public synchronized String[] getSubscriptionIds() {
91 Set keys = this.callbackMap.keySet();
92 if (keys == null || keys.size() < 1) {
93 return new String[0];
94 }
95 return (String[])keys.toArray(new String[keys.size()]);
96 }
97
98 /**
99 * Remove the callback interface for the given subscriptionId
100 * @return the removed entry of null if subscriptionId is unknown
101 */
102 public synchronized I_Callback removeCallback(String subscriptionId) {
103 CallbackInfo info = (CallbackInfo)this.callbackMap.remove(subscriptionId);
104 return (info == null) ? null : info.callback;
105 }
106
107 /**
108 * Get number of callback registrations
109 */
110 public synchronized int size() {
111 return this.callbackMap.size();
112 }
113 /**
114 * Remove all callback registrations
115 */
116 public synchronized void clear() {
117 this.callbackMap.clear();
118 }
119
120 /**
121 * Clear all subscribes which where marked as acknowledged
122 * @return number of removed entries
123 */
124 public synchronized int clearAckNonPersistentSubscriptions() {
125 String[] ids = getSubscriptionIds();
126 int count = 0;
127 for (int i=0; i<ids.length; i++) {
128 CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
129 if (info != null && info.ack && !info.isPersistent()) {
130 this.callbackMap.remove(ids[i]);
131 count++;
132 }
133 }
134 return count;
135 }
136
137 /**
138 * Clear all subscribes which where not marked as acknowledged
139 */
140 public synchronized void clearNAKSubscriptions() {
141 String[] ids = getSubscriptionIds();
142 for (int i=0; i<ids.length; i++) {
143 CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
144 if (info != null && info.ack == false) {
145 this.callbackMap.remove(ids[i]);
146 }
147 }
148 }
149
150 /**
151 * Inner helper class to hold ACK/NAK state of subscriptions.
152 * We need this to be able to garbage collect map entries on
153 * reconnect to a different session.
154 */
155 class CallbackInfo {
156 I_Callback callback;
157 boolean persistent;
158 boolean ack = false; // Is the subscribe acknowledged from the server (the subscribe() method returned?)
159 CallbackInfo(I_Callback callback, boolean persistent) {
160 this.callback = callback;
161 this.persistent = persistent;
162 }
163 boolean isPersistent() {
164 return this.persistent;
165 }
166 };
167 }
// syntax highlighted by Code2HTML, v. 0.9.1