/*------------------------------------------------------------------------------
Name:      MsgInterceptor.java
Project:   xmlBlaster.org
Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.test;
  7 
  8 import java.util.logging.Logger;
  9 import org.xmlBlaster.client.I_Callback;
 10 import org.xmlBlaster.client.I_StreamingCallback;
 11 import org.xmlBlaster.client.qos.UpdateQos;
 12 import org.xmlBlaster.client.qos.UpdateReturnQos;
 13 import org.xmlBlaster.client.qos.PublishReturnQos;
 14 import org.xmlBlaster.client.key.UpdateKey;
 15 import org.xmlBlaster.contrib.I_Update;
 16 import org.xmlBlaster.util.Global;
 17 import org.xmlBlaster.util.XmlBlasterException;
 18 import org.xmlBlaster.util.MsgUnit;
 19 import org.xmlBlaster.util.def.ErrorCode;
 20 
 21 import java.io.ByteArrayOutputStream;
 22 import java.io.IOException;
 23 import java.io.InputStream;
 24 import java.lang.InterruptedException;
 25 import java.util.Map;
 26 import java.util.Vector;
 27 
 28 import junit.framework.Assert;
 29 import java.lang.ref.WeakReference;
 30 
 31 /**
 32  * Intercepts incoming message in update() and collects them in a Vector for nice handling. 
 33  */
 34 public class MsgInterceptor extends Assert implements I_Callback, I_StreamingCallback, I_Update 
 35 {
 36    private final WeakReference weakglob;
 37    private final WeakReference weaklog;
 38    private I_Callback testsuite;
 39    //private Msgs msgs = null;
 40    private int verbosity = 2;
 41    private boolean countErased;
 42    private I_StreamingCallback streamTestsuite;
 43    private I_Update contribTestsuite;
 44    private byte[] msgContent;
 45    
 46    /**
 47     * @param testsuite If != null your update() variant will be called as well
 48     */
 49    public MsgInterceptor(Global glob, Logger log, I_Callback testsuite, I_StreamingCallback streamTestsuite) {
 50       this(glob, log, testsuite);
 51       this.streamTestsuite = streamTestsuite;
 52    }
 53 
 54    /**
 55     * @param testsuite If != null your update() variant will be called as well
 56     */
 57    public MsgInterceptor(Global glob, Logger log, I_Callback testsuite, I_Update contribTestsuite) {
 58       this(glob, log, testsuite);
 59       this.contribTestsuite = contribTestsuite;
 60    }
 61 
 62    /**
 63     * @param testsuite If != null your update() variant will be called as well
 64     */
 65    public MsgInterceptor(Global glob, Logger log, I_Callback testsuite) {
 66       this.weakglob = new WeakReference(glob);
 67       this.weaklog = new WeakReference(log);
 68       this.testsuite = testsuite;
 69       //this.msgs = new Msgs();
 70    }
 71 
 72    public final Global getGlobal() {
 73       return (Global)this.weakglob.get();
 74    }
 75 
 76    public final Logger getLog() {
 77       return (Logger)this.weaklog.get();
 78    }
 79 
 80    public void setLogPrefix(String prefix) {
 81    }
 82 
 83    /**
 84     * 0: no logging
 85     * 1: simple logging
 86     * 2: dump messages on arrival
 87     */
 88    public void setVerbosity(int val) {
 89       this.verbosity = val;
 90    }
 91 
 92    /*
 93     * Contains all update() messages in a Vector, but not erase events.
 94    public Msgs getMsgs() {
 95       return this.msgs;
 96    }
 97     */
 98 
 99    /**
100     * @param countErased Set to true to count the erased notifications as well
101     */
102    public void countErased(boolean countErased) {
103       this.countErased = countErased;
104    }
105 
106    /**
107     * This is the callback method (I_Callback) invoked from xmlBlaster
108     * It directly calls the update method from the testsuite (delegation)
109     */
110    public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
111       String contentStr = new String(content);
112       
113       if (this.verbosity == 1) {
114          String cont = (contentStr.length() > 10) ? (contentStr.substring(0,10)+"...") : contentStr;
115          getLog().info("Receiving update of a message oid=" + updateKey.getOid() +
116                    " priority=" + updateQos.getPriority() +
117                    " state=" + updateQos.getState() +
118                    " content=" + cont);
119       }
120       else if (this.verbosity == 2) {
121          getLog().info("Receiving update #" + (count()+1) + " of a message cbSessionId=" + cbSessionId +
122                       updateKey.toXml() + "\n" + new String(content) + updateQos.toXml());
123       }
124 
125       if (this.countErased || !updateQos.isErased()) {
126          add(new Msg(cbSessionId, updateKey, content, updateQos));
127       }
128       if (testsuite != null)
129          return testsuite.update(cbSessionId, updateKey, content, updateQos);
130       else {
131          UpdateReturnQos qos = new UpdateReturnQos(getGlobal());
132          return qos.toXml();
133       }
134    }
135    
136    /**
137     * This is the callback method (I_StreamingCallback) invoked from xmlBlaster
138     * It directly calls the update method from the testsuite (delegation)
139     */
140    public String update(String cbSessionId, UpdateKey updateKey, InputStream is, UpdateQos updateQos) throws XmlBlasterException {
141       
142       String ret = null;
143       if (this.streamTestsuite != null) {
144          try {
145             ret = this.streamTestsuite.update(cbSessionId, updateKey, is, updateQos);
146          }
147          catch (IOException ex) {
148             throw new XmlBlasterException(Global.instance(), ErrorCode.INTERNAL_ILLEGALARGUMENT, "update", "update", ex);
149          }
150       }
151       else {
152          fail("The testsuite instance has not been defined");
153       }
154       if (this.countErased || !updateQos.isErased()) {
155          add(new Msg(cbSessionId, updateKey, msgContent, updateQos));
156       }
157       return ret;
158    }
159 
160 
161    /**
162     * This is the callback method (I_StreamingCallback) invoked from xmlBlaster
163     * It directly calls the update method from the testsuite (delegation)
164     */
165    public void update(String topic, InputStream is, Map attrMap) throws Exception {
166       if (this.contribTestsuite != null) {
167          try {
168             this.contribTestsuite.update(topic, is, attrMap);
169          }
170          catch (IOException ex) {
171             throw new XmlBlasterException(Global.instance(), ErrorCode.INTERNAL_ILLEGALARGUMENT, "update", "update", ex);
172          }
173       }
174       if (verbosity > 0 && is != null) {
175          ByteArrayOutputStream baos = new ByteArrayOutputStream();
176          int val = 0;
177          byte[] buf = new byte[1000];
178          while ( (val = is.read(buf)) > -1)
179             baos.write(buf, 0, val);
180          System.out.println(new String(baos.toByteArray()));
181       }
182       add(new Msg(null, null, new byte[0], null));
183    }
184 
185    /**
186     * @see #waitOnUpdate(long, String, String, int)
187     */
188    public int waitOnUpdate(final long timeout, int countExpected) {
189       return waitOnUpdate(timeout, null, null, countExpected);
190    }
191 
192    /**
193     * Waits until the given number of messages arrived,
194     * the messages must match the given oid and state. 
195     * It is not checked if more messages would arrive as we return after
196     * countExpected are here.
197     * <p>
198     * ERASE notifies are not returned
199     * </p>
200     * <p>
201     * This method does not assert() it return the number of messages arrived
202     * which you can use to assert yourself.
203     * </p>
204     * @param timeout in milliseconds
205     * @param oid The expected message oid, if null the oid is not checked (all oids are OK)
206     * @param state The expected state, if null the state is not checked (all states are OK)
207     *
208     * @return Number of messages arrived
209     */
210    public int waitOnUpdate(final long timeout, String oid, String state, int countExpected) {
211       long pollingInterval = 50L;  // check every 0.05 seconds
212       if (timeout < 50)  pollingInterval = timeout / 10L;
213       long sum = 0L;
214       int countArrived = 0;
215       while (true) {
216          countArrived = getMsgs(oid, state).length;
217          if (countArrived >= countExpected)
218             return countArrived; // OK, no timeout
219          try {
220             Thread.sleep(pollingInterval);
221          }
222          catch( InterruptedException i)
223          {}
224 
225          sum += pollingInterval;
226          if (sum > timeout) {
227             getLog().severe("timeout=" + timeout + " occurred for " + oid + " state=" + state + " countExpected=" + countExpected + " countArrived=" + countArrived);
228             return countArrived; // Timeout occurred
229          }
230       }
231    }
232 
233    /**
234     * Sleeps until timeout and returns the arrived messages. 
235     * <p>
236     * ERASE notifies are not returned
237     * </p>
238     * @see #waitOnUpdate(long, String, String)
239     */
240    public int waitOnUpdate(final long timeout) {
241       return waitOnUpdate(timeout, null, null);
242    }
243 
244    /**
245     * Sleeps until timeout and returns the number of arrived messages filtered by oid and state. 
246     * <p>
247     * ERASE notifies are not returned
248     * </p>
249     * @param timeout in milliseconds
250     * @param oid The expected message oid, if null the oid is not checked (all oids are OK)
251     * @param state The expected state, if null the state is not checked (all states are OK)
252     *
253     * @return Number of messages arrived
254     */
255    public int waitOnUpdate(final long timeout, String oid, String state) {
256       try {
257          Thread.sleep(timeout);
258       }
259       catch( InterruptedException i)
260       {}
261       return getMsgs(oid, state).length;
262    }
263 
264 
265    // Holding all messages
266    private Vector<Msg> updateVec = new Vector<Msg>();
267    
268    public void add(Msg msg) {
269       this.updateVec.addElement(msg);
270   }
271    
272    public void remove(Msg msg) {
273       this.updateVec.removeElement(msg);
274    }
275    
276    /**
277     * Clears all arrived messages AND the countErased flag to false
278     */
279    public void clear() { 
280       this.updateVec.clear();
281       this.countErased = false;
282    }
283 
284    /**
285     * Access the updated message filtered by the given oid and state. 
286     * @param oid if null the oid is not checked
287     * @param state if null the state is not checked
288     */
289    public Msg[] getMsgs(String oid, String state) {
290       Vector ret = new Vector();
291       for (int i=0; i<this.updateVec.size(); i++) {
292          Msg msg = (Msg)this.updateVec.elementAt(i);
293          //System.out.println("MsgInterceptor: Checking msg oid='" + msg.getOid() + "' with state='" + msg.getState() + "' against '" + oid + "' '" + state + "'");
294          if (
295              (oid == null || oid.equals(msg.getOid())) &&
296              (state == null || state.equals(msg.getState()))
297             ) {
298             ret.addElement(msg);
299             //System.out.println("MsgInterceptor: FOUND: Checking msg oid='" + msg.getOid() + "' with state='" + msg.getState() + "' against '" + oid + "' '" + state + "'");
300          }
301       }
302       return  (Msg[])ret.toArray(new Msg[ret.size()]);
303    }
304 
305    public Msg[] getMsgs() {
306       return getMsgs(null, null);
307    }
308 
309    /**
310     * Access the updated message filtered by the given oid and state. 
311     * @return null or the message
312     * @exception If more than one message is available
313     */
314    public Msg getMsg(String oid, String state) throws XmlBlasterException {
315       Msg[] msgs = getMsgs(oid, state);
316       //System.out.println("MsgInterceptor: FOUND " + msgs.length + " entries for msg oid='" + oid + "' with state='" + state);
317       if (msgs.length > 1)
318          throw new XmlBlasterException("Msgs", "update(oid=" + oid + ", state=" + state + ") " + msgs.length + " arrived instead of zero or one");
319       if (msgs.length == 0)
320          return null;
321       return msgs[0];
322    }
323 
324    public int count() {
325       return this.updateVec.size();
326    }
327 
328    /**
329     * Compares all messages given by parameter 'expectedArr' and compare
330     * them with the received ones. On failure a junit - assert() is thrown.
331     * <p>
332     * The correct sequence and the message data is checked.
333     * </p>
334     * @param expectedArr The published messages which we expect here as updates
335     * @param secretCbSessionId If not null it is checked as well
336     */
337    public void compareToReceived(MsgUnit[] expectedArr, String secretCbSessionId) {
338       
339       assertEquals("We have received " + count() + " messages only", expectedArr.length, count());
340       
341       for(int i=0; i<expectedArr.length; i++) {
342          MsgUnit expected = expectedArr[i];
343          Msg msg = (Msg)this.updateVec.elementAt(i);
344          if (secretCbSessionId != null) {
345             assertEquals("The secretCbSessionId is wrong", secretCbSessionId, msg.getCbSessionId());
346          }
347          msg.compareMsg(expected);
348       }
349    }
350 
351    /**
352     * Compares all messages given by parameter 'expectedArr' and compare
353     * them with the received ones. On failure a junit - assert() is thrown.
354     * <p>
355     * Especially the sequence and the rcvTimestamp is checked.
356     * </p>
357     * @param expectedArr The published messages which we expect here as updates
358     */
359    public void compareToReceived(PublishReturnQos[] expectedArr) {
360       assertEquals("We have received " + count() + " messages only", expectedArr.length, count());
361 
362       for(int i=0; i<expectedArr.length; i++) {
363          Msg msg = (Msg)this.updateVec.elementAt(i);
364          msg.compareMsg(expectedArr[i]);
365       }
366    }
367    
368    public void setMsgContent(byte[] msgContent) {
369       this.msgContent = msgContent;
370    }
371    
372    public String toString() {
373       StringBuilder sb = new StringBuilder();
374       for (int i=0; i<updateVec.size(); i++) {
375          Msg msg = updateVec.get(i);
376          if (i > 0)
377             sb.append("\n");
378          sb.append("#").append(i);
379          sb.append(": ").append(msg.toString());
380       }
381       return sb.toString();
382    }
383    
384 } // MsgInterceptor


syntax highlighted by Code2HTML, v. 0.9.1