
Re: Fwd: Re: [xmlblaster] Question on pausing the update method long enough to process messages...



I'm sorry for the confusion. I am a bit confused myself so I know I'm not
explaining myself well. Let me try again...

I'm working on a project that involves interfacing 2 systems, near real
time. I'll call those systems, 'Sys1' and 'Sys2' for this example.

We've written 2 Java applications, I'll call those 'Publish' and
'Subscribe', that run every 2 minutes.

'Publish' publishes 'Sys1's' messages to the xmlBlaster queue, and also
checks for (subscribes to) acknowledgements from 'Subscribe'. We're
estimating 20 or so messages every 2 minutes to be published.

'Subscribe' subscribes to those messages, converts them to XML DOM, edits
them, republishes them, erases the original messages, then creates XML
files containing the republished content and stores them in a local
directory for 'Sys2' to pick up at some specified time. Right after that,
'Subscribe' checks for acknowledgements sent from 'Sys2' and, if any are
found, publishes them to the xmlBlaster queue for 'Publish' to pick up.

I have put display statements all over the 'Subscribe' code to watch what's
happening. After I call con.subscribe(sk.toXml(), sq.toXml()), the update()
method is invoked, which is fine. Within that update() method, I have all
the code that converts the messages to XML DOM, edits them, republishes
them, erases the original messages, then creates XML files containing the
republished content and stores them in a local directory.

Based on my display statements, I have found that the update() method gets
invoked for each message that the subscribe retrieved, which is exactly
what I need it to do. However, I have more code following the con.subscribe
that does other processing and it is getting executed BEFORE the update()
has been called for all the messages retrieved. This caused problems so to
get around that, I increased the sleep time. The following code snippet
demonstrates what I did:

START CODE SNIPPET
*****************************************************************************************************
// First see how many messages we are going to retrieve when we subscribe...
String sSubscribeKey = "/xmlBlaster/key/TRANSACTION[@type='billing' or @type='estimate']";
GetKeyWrapper gk = new GetKeyWrapper(sSubscribeKey, Constants.XPATH);
GetQosWrapper gq = new GetQosWrapper();
MessageUnit[] messages = con.get(gk.toXml(), gq.toXml());

// Get count of messages we're going to subscribe to...
int updateCnt = messages.length;

// If we have messages to subscribe to, go ahead and subscribe...
if(updateCnt > 0)
{
      SubscribeKeyWrapper sk = new SubscribeKeyWrapper(sSubscribeKey, Constants.XPATH);
      SubscribeQosWrapper sq = new SubscribeQosWrapper();
      SubscribeRetQos subRet = con.subscribe(sk.toXml(), sq.toXml());

      // Wait 3 secs for each message to ensure the update method has
      // time to complete...
      // This is fine for a few messages in the queue, but it will not work
      // for large numbers of messages in the queue.
      for(int ii=0; ii<updateCnt; ii++)
      {
            try { Thread.currentThread().sleep(3000); }
            catch(InterruptedException i) {}
      }
}


// Once the update() processing has run for all messages, I want to get all
// the xml files currently sitting in the acks folder plus do other things,
// but the following code will get executed BEFORE the update() has completed
// for ALL messages if I don't have the 'sleep' code above. This is my problem...
File tempUploadDir = new File("D:\\xml\\acknowledgements");
String[] fileNameList = tempUploadDir.list();
log.info("","****** There are "+fileNameList.length+" ack files to process.");

String msg = "";
String msgKey = "";
String msgInfo = "";

// For each file in the acks directory, read it in as an xml document,
// create a string version of the xml doc, and publish message to xmlBlaster
for (int i = 0; i < fileNameList.length; i++)
{

      File curUploadDir = new File("D:\\xml\\acknowledgements", fileNameList[i]);
      File f = new File(curUploadDir.toString());
      Document dcXmlDoc = docbuilder.parse(f);

      ...
      ...
      ...

      msgInfo = "<TRANSACTION type='acknowledgement' url='"+sURL+"'/>";

            //Send the message on its way
      PublishKeyWrapper pk = new PublishKeyWrapper(msgKey, "text/xml");

      pk.wrap(msgInfo);
      PublishQosWrapper pq = new PublishQosWrapper();
      MessageUnit msgUnit = new MessageUnit(pk.toXml(), msg.getBytes(), pq.toXml());

      log.info("","****** PUBLISH: xmlString contains: " + xmlString);

      con.publish(msgUnit);

}

//Disconnect from xmlBlaster
DisconnectQos dq = new DisconnectQos();
con.disconnect(dq);
END CODE SNIPPET
*****************************************************************************************************

So what I am trying to figure out is, how do I know when the update handler
is complete for ALL messages subscribed to? I am aware of the command line
arg, burstMode.collectTime 400, but this did not solve my problem. I've
also tried using the code:

**************************************************
ConnectQos qos = new ConnectQos(glob, name, passwd);
CallbackAddress cbAddress = new CallbackAddress(glob);
cbAddress.setCollectTime(2000);      // sets burst mode to 2000 milliseconds
qos.addCallbackAddress(cbAddress);
ConnectReturnQos connectReturnQos = con.connect(qos, this);
**************************************************

however this seemed to stall prior to even calling the update. What I need
is some way to stall so that once the update is invoked, everything gets
done before moving on.
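
For illustration only, here is a minimal sketch of one way the fixed sleep
could be replaced: the main thread waits on a java.util.concurrent
CountDownLatch (Java 5 and later) that the update handler counts down once
per message. It deliberately uses no xmlBlaster classes. BatchHandler,
onUpdate() and runBatch() are hypothetical names, the executor only
simulates a callback thread, and the upper-casing is a placeholder for the
real DOM edit, republish, erase and file-write steps. It assumes the count
obtained before subscribing matches the number of update() calls that
actually arrive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class UpdateLatchSketch {

    // Hypothetical stand-in for the object whose update() the broker calls.
    static class BatchHandler {
        private final CountDownLatch remaining;
        private final List<String> processed = new CopyOnWriteArrayList<>();

        BatchHandler(int expectedCount) {
            this.remaining = new CountDownLatch(expectedCount);
        }

        // Would be invoked from update(); counts down even if processing throws.
        void onUpdate(String content) {
            try {
                // Placeholder for: parse to DOM, edit, republish, erase, write file.
                processed.add(content.toUpperCase());
            } finally {
                remaining.countDown();
            }
        }

        // Blocks until every expected update has been handled, or the timeout expires.
        boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
            return remaining.await(timeout, unit);
        }

        List<String> processed() {
            return processed;
        }
    }

    // Simulates a callback thread delivering messages while the caller waits.
    static List<String> runBatch(List<String> messages) throws InterruptedException {
        BatchHandler handler = new BatchHandler(messages.size());
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        try {
            for (String m : messages) {
                callbackThread.submit(() -> handler.onUpdate(m));
            }
            // Replaces the "sleep 3 seconds per message" loop.
            if (!handler.awaitAll(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for updates");
            }
            return handler.processed();
        } finally {
            callbackThread.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> result = runBatch(Arrays.asList("billing-1", "estimate-2", "billing-3"));
        System.out.println("Processed " + result.size() + " messages: " + result);
    }
}
```

The timeout on awaitAll() is there so that a miscounted batch (for example,
fewer updates delivered than expected) fails loudly instead of hanging the
scheduled run. Messages published after the count was taken would still
need separate handling.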

Are there other settings in the qos that I should try? Or is the qos not
where I need to focus?

I hope this helps to clarify what I'm asking. If not, please let me know
and I will try again.

Thank you very much,
Kelli



                                                                                                                                       
"Kelli Fuller" <kellibfuller at hotmail.com>
11/27/2002 12:06 PM

To:       kfuller at caci.com
cc:
Subject:  Fwd: Re: [xmlblaster] Question on pausing the update method long enough to process messages...

>From: Heinrich Götzger <Heinrich.Goetzger at exploding-systems.de>
>Reply-To: xmlblaster at server.xmlBlaster.org
>To: xmlBlaster List <xmlblaster at server.xmlblaster.org>
>Subject: Re: [xmlblaster] Question on pausing the update method long enough
>to process messages...
>Date: Wed, 27 Nov 2002 16:54:53 +0100 (CET)
>
>Kelli,
>
>I'm not sure if I got your problem right.
>
>Do you want some store and forward queue which will be filled by the
>update method?
>
>
>Heinrich
>
>
>On Wed, 27 Nov 2002, Kelli Fuller wrote:
>
> > I am trying to learn how to use xmlBlaster. With that, I have written a
> > pretty basic class to subscribe to and publish messages. In this class, I
> > only subscribe to messages once and I only have one update() method,
> > however in that method I take the content of the message, convert it to an
> > XML DOM, edit some node values, then I republish the message with a new key
> > and erase the original message, then finally create an XML file to store in
> > a directory for another system to pick up. This class is on a scheduler
> > that runs every 2 minutes.
> >
> > Currently, I am forcing a sleep period right after the code that subscribes
> > of 3 secs per message, like this:
> >
> > if(updateCnt > 0)
> > {
> >       SubscribeKeyWrapper sk = new SubscribeKeyWrapper(sSubscribeKey,
> > Constants.XPATH);
> >       SubscribeQosWrapper sq = new SubscribeQosWrapper();
> >       SubscribeRetQos subRet = con.subscribe(sk.toXml(), sq.toXml());
> >
> >       // Wait 3 secs for each message to ensure the update method has time
> >       // to complete...
> >       for(int ii=0; ii<updateCnt; ii++)
> >       {
> >             try { Thread.currentThread().sleep(3000); }
> >             catch(InterruptedException i) {}
> >       }
> > }
> >
> > I have to do this to prevent the code below this from being executed too
> > soon. This is not a viable solution though because of the number of
> > messages that could be in the queue at any given time.
> >
> > How can I ensure that all messages I've subscribed to run through the
> > update method before continuing with any other processing? I've read about
> > setting the burstMode.collectTime as an argument, which I've tried, but
> > this didn't seem to solve my problem.
> >
> > I also need to ensure that for the current call to my class, that the
> > update method doesn't get called by xmlBlaster when new messages are
> > published. I want the next call to my class to get any new published
> > messages and so on. So in other words, I want to subscribe to one set of
> > messages, call update for only that set of messages given enough time to
> > process as described above and stop and wait 2 minutes for this class to be
> > called again and repeat.
> >
> > How can I control this?
> >
> > Thanks so much,
> >
> > Kelli
> >

