1 /*------------------------------------------------------------------------------
2 Name: UpdateDispatcher.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.client;
7
8 import org.xmlBlaster.util.Global;
9 import java.util.HashMap;
10 import java.util.Set;
11
12 /**
13 * Dispatches callback messages depending on the subscriptionId to the specific client listener.
14 * @author xmlBlaster@marcelruff.info
15 */
16 public final class UpdateDispatcher
17 {
18 private static final String ME = "UpdateDispatcher";
19 private final Global glob;
20 /** This map contains the registered callback interfaces for given subscriptions.
21 The key is the subscription ID, the value is the I_Callback instance */
22 private final HashMap callbackMap;
23
24 public UpdateDispatcher(Global glob) {
25 this.glob = glob;
26 this.callbackMap = new HashMap();
27 }
28
29 /**
30 * Register a callback interface with the given subscriptionId
31 */
32 public synchronized void addCallback(String subscriptionId, I_Callback callback, boolean isPersistent) {
33 if (subscriptionId == null) {
34 throw new IllegalArgumentException("Null argument not allowed in addCallback: subscriptionId=" + subscriptionId + " callback=" + callback);
35 }
36 this.callbackMap.put(subscriptionId, new CallbackInfo(callback, isPersistent));
37 }
38
39 /**
40 * Access a callback interface for the given subscriptionId
41 * @return null if not found
42 */
43 public synchronized I_Callback getCallback(String subscriptionId) {
44 if (subscriptionId == null) {
45 throw new IllegalArgumentException("The subscriptionId is null");
46 }
47 CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
48 return (info == null) ? null: info.callback;
49 }
50
51 /**
52 * Mark that a subscribe() invocation returned (is the server ACK).
53 * @param subscriptionId The subcribe
54 */
55 public synchronized void ackSubscription(String subscriptionId) {
56 if (subscriptionId == null) {
57 throw new IllegalArgumentException("The subscriptionId is null");
58 }
59 CallbackInfo info = (CallbackInfo)this.callbackMap.get(subscriptionId);
60 if (info != null) info.ack = true;
61 }
62
63 /**
64 * Returns a current snapshot (shallow copy).
65 */
66 public synchronized I_Callback[] getCallbacks() {
67 Set values = this.callbackMap.entrySet();
68 if (values == null || values.size() < 1) {
69 return new I_Callback[0];
70 }
71 CallbackInfo[] infoArr = (CallbackInfo[])values.toArray(new CallbackInfo[values.size()]);
72 I_Callback[] cbArr = new I_Callback[infoArr.length];
73 for (int i=0; i<infoArr.length; i++) {
74 cbArr[i] = infoArr[i].callback;
75 }
76 return cbArr;
77 }
78
79 /**
80 * Returns a current snapshot.
81 */
82 public synchronized String[] getSubscriptionIds() {
83 Set keys = this.callbackMap.keySet();
84 if (keys == null || keys.size() < 1) {
85 return new String[0];
86 }
87 return (String[])keys.toArray(new String[keys.size()]);
88 }
89
90 /**
91 * Remove the callback interface for the given subscriptionId
92 * @return the removed entry of null if subscriptionId is unknown
93 */
94 public synchronized I_Callback removeCallback(String subscriptionId) {
95 CallbackInfo info = (CallbackInfo)this.callbackMap.remove(subscriptionId);
96 return (info == null) ? null : info.callback;
97 }
98
99 /**
100 * Get number of callback registrations
101 */
102 public synchronized int size() {
103 return this.callbackMap.size();
104 }
105 /**
106 * Remove all callback registrations
107 */
108 public synchronized void clear() {
109 this.callbackMap.clear();
110 }
111
112 /**
113 * Clear all subscribes which where marked as acknowledged
114 * @return number of removed entries
115 */
116 public synchronized int clearAckNonPersistentSubscriptions() {
117 String[] ids = getSubscriptionIds();
118 int count = 0;
119 for (int i=0; i<ids.length; i++) {
120 CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
121 if (info != null && info.ack && !info.isPersistent()) {
122 this.callbackMap.remove(ids[i]);
123 count++;
124 }
125 }
126 return count;
127 }
128
129 /**
130 * Clear all subscribes which where not marked as acknowledged
131 */
132 public synchronized void clearNAKSubscriptions() {
133 String[] ids = getSubscriptionIds();
134 for (int i=0; i<ids.length; i++) {
135 CallbackInfo info = (CallbackInfo)this.callbackMap.get(ids[i]);
136 if (info != null && info.ack == false) {
137 this.callbackMap.remove(ids[i]);
138 }
139 }
140 }
141
142 /**
143 * Inner helper class to hold ACK/NAK state of subscriptions.
144 * We need this to be able to garbage collect map entries on
145 * reconnect to a different session.
146 */
147 class CallbackInfo {
148 I_Callback callback;
149 boolean persistent;
150 boolean ack = false; // Is the subscribe acknowledged from the server (the subscribe() method returned?)
151 CallbackInfo(I_Callback callback, boolean persistent) {
152 this.callback = callback;
153 this.persistent = persistent;
154 }
155 boolean isPersistent() {
156 return this.persistent;
157 }
158 };
159 }
// syntax highlighted by Code2HTML, v. 0.9.1