1 /*------------------------------------------------------------------------------
2 Name: MsgInterceptor.java
3 Project: org.xmlBlasterProject: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.test;
7
8 import java.util.logging.Logger;
9 import org.xmlBlaster.client.I_Callback;
10 import org.xmlBlaster.client.I_StreamingCallback;
11 import org.xmlBlaster.client.qos.UpdateQos;
12 import org.xmlBlaster.client.qos.UpdateReturnQos;
13 import org.xmlBlaster.client.qos.PublishReturnQos;
14 import org.xmlBlaster.client.key.UpdateKey;
15 import org.xmlBlaster.contrib.I_Update;
16 import org.xmlBlaster.util.Global;
17 import org.xmlBlaster.util.XmlBlasterException;
18 import org.xmlBlaster.util.MsgUnit;
19 import org.xmlBlaster.util.def.ErrorCode;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.lang.InterruptedException;
25 import java.util.Map;
26 import java.util.Vector;
27
28 import junit.framework.Assert;
29 import java.lang.ref.WeakReference;
30
31 /**
32 * Intercepts incoming message in update() and collects them in a Vector for nice handling.
33 */
34 public class MsgInterceptor extends Assert implements I_Callback, I_StreamingCallback, I_Update
35 {
36 private final WeakReference weakglob;
37 private final WeakReference weaklog;
38 private I_Callback testsuite;
39 //private Msgs msgs = null;
40 private int verbosity = 2;
41 private boolean countErased;
42 private I_StreamingCallback streamTestsuite;
43 private I_Update contribTestsuite;
44 private byte[] msgContent;
45
46 /**
47 * @param testsuite If != null your update() variant will be called as well
48 */
49 public MsgInterceptor(Global glob, Logger log, I_Callback testsuite, I_StreamingCallback streamTestsuite) {
50 this(glob, log, testsuite);
51 this.streamTestsuite = streamTestsuite;
52 }
53
54 /**
55 * @param testsuite If != null your update() variant will be called as well
56 */
57 public MsgInterceptor(Global glob, Logger log, I_Callback testsuite, I_Update contribTestsuite) {
58 this(glob, log, testsuite);
59 this.contribTestsuite = contribTestsuite;
60 }
61
62 /**
63 * @param testsuite If != null your update() variant will be called as well
64 */
65 public MsgInterceptor(Global glob, Logger log, I_Callback testsuite) {
66 this.weakglob = new WeakReference(glob);
67 this.weaklog = new WeakReference(log);
68 this.testsuite = testsuite;
69 //this.msgs = new Msgs();
70 }
71
72 public final Global getGlobal() {
73 return (Global)this.weakglob.get();
74 }
75
76 public final Logger getLog() {
77 return (Logger)this.weaklog.get();
78 }
79
80 public void setLogPrefix(String prefix) {
81 }
82
83 /**
84 * 0: no logging
85 * 1: simple logging
86 * 2: dump messages on arrival
87 */
88 public void setVerbosity(int val) {
89 this.verbosity = val;
90 }
91
92 /*
93 * Contains all update() messages in a Vector, but not erase events.
94 public Msgs getMsgs() {
95 return this.msgs;
96 }
97 */
98
99 /**
100 * @param countErased Set to true to count the erased notifications as well
101 */
102 public void countErased(boolean countErased) {
103 this.countErased = countErased;
104 }
105
106 /**
107 * This is the callback method (I_Callback) invoked from xmlBlaster
108 * It directly calls the update method from the testsuite (delegation)
109 */
110 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
111 String contentStr = new String(content);
112
113 if (this.verbosity == 1) {
114 String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
115 getLog().info("Receiving update of a message oid=" + updateKey.getOid() +
116 " priority=" + updateQos.getPriority() +
117 " state=" + updateQos.getState() +
118 " content=" + cont);
119 }
120 else if (this.verbosity == 2) {
121 getLog().info("Receiving update #" + (count()+1) + " of a message cbSessionId=" + cbSessionId +
122 updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
123 }
124
125 if (this.countErased || !updateQos.isErased()) {
126 add(new Msg(cbSessionId, updateKey, content, updateQos));
127 }
128 if (testsuite != null)
129 return testsuite.update(cbSessionId, updateKey, content, updateQos);
130 else {
131 UpdateReturnQos qos = new UpdateReturnQos(getGlobal());
132 return qos.toXml();
133 }
134 }
135
136 /**
137 * This is the callback method (I_StreamingCallback) invoked from xmlBlaster
138 * It directly calls the update method from the testsuite (delegation)
139 */
140 public String update(String cbSessionId, UpdateKey updateKey, InputStream is, UpdateQos updateQos) throws XmlBlasterException {
141
142 String ret = null;
143 if (this.streamTestsuite != null) {
144 try {
145 ret = this.streamTestsuite.update(cbSessionId, updateKey, is, updateQos);
146 }
147 catch (IOException ex) {
148 throw new XmlBlasterException(Global.instance(), ErrorCode.INTERNAL_ILLEGALARGUMENT, "update", "update", ex);
149 }
150 }
151 else {
152 fail("The testsuite instance has not been defined");
153 }
154 if (this.countErased || !updateQos.isErased()) {
155 add(new Msg(cbSessionId, updateKey, msgContent, updateQos));
156 }
157 return ret;
158 }
159
160
161 /**
162 * This is the callback method (I_StreamingCallback) invoked from xmlBlaster
163 * It directly calls the update method from the testsuite (delegation)
164 */
165 public void update(String topic, InputStream is, Map attrMap) throws Exception {
166 if (this.contribTestsuite != null) {
167 try {
168 this.contribTestsuite.update(topic, is, attrMap);
169 }
170 catch (IOException ex) {
171 throw new XmlBlasterException(Global.instance(), ErrorCode.INTERNAL_ILLEGALARGUMENT, "update", "update", ex);
172 }
173 }
174 if (verbosity > 0 && is != null) {
175 ByteArrayOutputStream baos = new ByteArrayOutputStream();
176 int val = 0;
177 byte[] buf = new byte[1000];
178 while ( (val = is.read(buf)) > -1)
179 baos.write(buf, 0, val);
180 System.out.println(new String(baos.toByteArray()));
181 }
182 add(new Msg(null, null, new byte[0], null));
183 }
184
185 /**
186 * @see #waitOnUpdate(long, String, String, int)
187 */
188 public int waitOnUpdate(final long timeout, int countExpected) {
189 return waitOnUpdate(timeout, null, null, countExpected);
190 }
191
192 /**
193 * Waits until the given number of messages arrived,
194 * the messages must match the given oid and state.
195 * It is not checked if more messages would arrive as we return after
196 * countExpected are here.
197 * <p>
198 * ERASE notifies are not returned
199 * </p>
200 * <p>
201 * This method does not assert() it return the number of messages arrived
202 * which you can use to assert yourself.
203 * </p>
204 * @param timeout in milliseconds
205 * @param oid The expected message oid, if null the oid is not checked (all oids are OK)
206 * @param state The expected state, if null the state is not checked (all states are OK)
207 *
208 * @return Number of messages arrived
209 */
210 public int waitOnUpdate(final long timeout, String oid, String state, int countExpected) {
211 long pollingInterval = 50L; // check every 0.05 seconds
212 if (timeout < 50) pollingInterval = timeout / 10L;
213 long sum = 0L;
214 int countArrived = 0;
215 while (true) {
216 countArrived = getMsgs(oid, state).length;
217 if (countArrived >= countExpected)
218 return countArrived; // OK, no timeout
219 try {
220 Thread.sleep(pollingInterval);
221 }
222 catch( InterruptedException i)
223 {}
224
225 sum += pollingInterval;
226 if (sum > timeout) {
227 getLog().severe("timeout=" + timeout + " occurred for " + oid + " state=" + state + " countExpected=" + countExpected + " countArrived=" + countArrived);
228 return countArrived; // Timeout occurred
229 }
230 }
231 }
232
233 /**
234 * Sleeps until timeout and returns the arrived messages.
235 * <p>
236 * ERASE notifies are not returned
237 * </p>
238 * @see #waitOnUpdate(long, String, String)
239 */
240 public int waitOnUpdate(final long timeout) {
241 return waitOnUpdate(timeout, null, null);
242 }
243
244 /**
245 * Sleeps until timeout and returns the number of arrived messages filtered by oid and state.
246 * <p>
247 * ERASE notifies are not returned
248 * </p>
249 * @param timeout in milliseconds
250 * @param oid The expected message oid, if null the oid is not checked (all oids are OK)
251 * @param state The expected state, if null the state is not checked (all states are OK)
252 *
253 * @return Number of messages arrived
254 */
255 public int waitOnUpdate(final long timeout, String oid, String state) {
256 try {
257 Thread.sleep(timeout);
258 }
259 catch( InterruptedException i)
260 {}
261 return getMsgs(oid, state).length;
262 }
263
264
265 // Holding all messages
266 private Vector<Msg> updateVec = new Vector<Msg>();
267
268 public void add(Msg msg) {
269 this.updateVec.addElement(msg);
270 }
271
272 public void remove(Msg msg) {
273 this.updateVec.removeElement(msg);
274 }
275
276 /**
277 * Clears all arrived messages AND the countErased flag to false
278 */
279 public void clear() {
280 this.updateVec.clear();
281 this.countErased = false;
282 }
283
284 /**
285 * Access the updated message filtered by the given oid and state.
286 * @param oid if null the oid is not checked
287 * @param state if null the state is not checked
288 */
289 public Msg[] getMsgs(String oid, String state) {
290 Vector ret = new Vector();
291 for (int i=0; i<this.updateVec.size(); i++) {
292 Msg msg = (Msg)this.updateVec.elementAt(i);
293 //System.out.println("MsgInterceptor: Checking msg oid='" + msg.getOid() + "' with state='" + msg.getState() + "' against '" + oid + "' '" + state + "'");
294 if (
295 (oid == null || oid.equals(msg.getOid())) &&
296 (state == null || state.equals(msg.getState()))
297 ) {
298 ret.addElement(msg);
299 //System.out.println("MsgInterceptor: FOUND: Checking msg oid='" + msg.getOid() + "' with state='" + msg.getState() + "' against '" + oid + "' '" + state + "'");
300 }
301 }
302 return (Msg[])ret.toArray(new Msg[ret.size()]);
303 }
304
305 public Msg[] getMsgs() {
306 return getMsgs(null, null);
307 }
308
309 /**
310 * Access the updated message filtered by the given oid and state.
311 * @return null or the message
312 * @exception If more than one message is available
313 */
314 public Msg getMsg(String oid, String state) throws XmlBlasterException {
315 Msg[] msgs = getMsgs(oid, state);
316 //System.out.println("MsgInterceptor: FOUND " + msgs.length + " entries for msg oid='" + oid + "' with state='" + state);
317 if (msgs.length > 1)
318 throw new XmlBlasterException("Msgs", "update(oid=" + oid + ", state=" + state + ") " + msgs.length + " arrived instead of zero or one");
319 if (msgs.length == 0)
320 return null;
321 return msgs[0];
322 }
323
324 public int count() {
325 return this.updateVec.size();
326 }
327
328 /**
329 * Compares all messages given by parameter 'expectedArr' and compare
330 * them with the received ones. On failure a junit - assert() is thrown.
331 * <p>
332 * The correct sequence and the message data is checked.
333 * </p>
334 * @param expectedArr The published messages which we expect here as updates
335 * @param secretCbSessionId If not null it is checked as well
336 */
337 public void compareToReceived(MsgUnit[] expectedArr, String secretCbSessionId) {
338
339 assertEquals("We have received " + count() + " messages only", expectedArr.length, count());
340
341 for(int i=0; i<expectedArr.length; i++) {
342 MsgUnit expected = expectedArr[i];
343 Msg msg = (Msg)this.updateVec.elementAt(i);
344 if (secretCbSessionId != null) {
345 assertEquals("The secretCbSessionId is wrong", secretCbSessionId, msg.getCbSessionId());
346 }
347 msg.compareMsg(expected);
348 }
349 }
350
351 /**
352 * Compares all messages given by parameter 'expectedArr' and compare
353 * them with the received ones. On failure a junit - assert() is thrown.
354 * <p>
355 * Especially the sequence and the rcvTimestamp is checked.
356 * </p>
357 * @param expectedArr The published messages which we expect here as updates
358 */
359 public void compareToReceived(PublishReturnQos[] expectedArr) {
360 assertEquals("We have received " + count() + " messages only", expectedArr.length, count());
361
362 for(int i=0; i<expectedArr.length; i++) {
363 Msg msg = (Msg)this.updateVec.elementAt(i);
364 msg.compareMsg(expectedArr[i]);
365 }
366 }
367
368 public void setMsgContent(byte[] msgContent) {
369 this.msgContent = msgContent;
370 }
371
372 public String toString() {
373 StringBuilder sb = new StringBuilder();
374 for (int i=0; i<updateVec.size(); i++) {
375 Msg msg = updateVec.get(i);
376 if (i > 0)
377 sb.append("\n");
378 sb.append("#").append(i);
379 sb.append(": ").append(msg.toString());
380 }
381 return sb.toString();
382 }
383
384 } // MsgInterceptor
// syntax highlighted by Code2HTML, v. 0.9.1