[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Fwd: Re: [xmlblaster] Question on pausing the update method long enough to process messages...



Kelli,

hmm, this sounds quite interessting. So I'm still not sure if I get all
your requirements right.

What I would see, that your problem has to break apart in small tasks. I
would think about putting single processing needs into single
tasks/Threads and try to uncouple them.

So for example having some updateWorker thread in sys2 which receives all
msg from sys1 and does the processing independant of the subscriber
itself.

Once a subscribe is placed, the update will be invoked, if there are
messages in the queue. The update will be called whenever there are
messages in the queue. Otherwise the update is blocked, once it's called
and not finished.

If you want to know if you received all messages you are waiting for, you
need to publish the number of thme first. Right? Or send some
'end_of_messages' last.

It probably would help to introduce some more publisher or subscribers
(in different threads?).

I'm not sure if this can be solved with xmlBlaster settings itself without
a redesign of the clients. Otherwise I would need to spend more time to
capture the whole problem to find some solution.

regards

Heinrich


On Wed, 27 Nov 2002, Kelli Fuller wrote:

>
> I'm sorry for the confusion. I am a bit confused myself so I know I'm not
> explaining myself well. Let me try again...
>
> I'm working on a project that involves interfacing 2 systems, near real
> time. I'll call those systems, 'Sys1' and 'Sys2' for this example.
>
> We've written 2 Java applications, I'll call those 'Publish' and
> 'Subscribe', that run every 2 minutes.
>
> 'Publish' publishes 'Sys1's' messages to the xmlBlaster queue, and also
> checks for (subscribes to) acknowledgements from 'Subscribe'. We're
> estimating 20 or so messages every 2 minutes to be published.
>
> 'Subscribe' subscribes to those messages, converts them to xml DOM, edits
> them, republishes them, erases the orig messages subscribed to then creates
> xml files containing the republished content and stores them on a local
> directory for 'Sys2' to pick up at some specified time. Right after that,
> 'Subscribe' checks for acknowledgements sent from 'Sys2' and if some are
> found, 'Subscribe' publishes those to the xmlBlaster queue for 'Publish' to
> pick up.
>
> I have put display statements all over the 'Subscribe' code to watch what's
> happening. My problem is that after I call con.subscribe(sk.toXml(),
> sq.toXml()), the update() method is invoked, which is fine, but within that
> update() method, I have all the code that converts the messages to xml DOM,
> edits them, republishes them, erases the orig messages subscribed to then
> creates xml files containing the republished content and stores them on a
> local directory.
>
> Based on my display statements, I have found that the update() method gets
> invoked for each message that the subscribe retrieved, which is exactly
> what I need it to do. However, I have more code following the con.subscribe
> that does other processing and it is getting executed BEFORE the update()
> has been called for all the messages retrieved. This caused problems so to
> get around that, I increased the sleep time. The following code snippet
> demonstrates what I did:
>
> START CODE SNIPPET
> *****************************************************************************************************
> // First see how many messages we are going to retrieve when we
> subscribe...
> String sSubscribeKey = "/xmlBlaster/key/TRANSACTION[ at type='billing' or
>  at type='estimate']";
> GetKeyWrapper gk = new GetKeyWrapper(sSubscribeKey, Constants.XPATH);
> GetQosWrapper gq = new GetQosWrapper();
> MessageUnit[] messages = con.get(gk.toXml(), gq.toXml());
>
> // Get count of messages we're going to subscribe to...
> int updateCnt = messages.length;
>
> // If we have messages to subscribe to, go ahead and subscribe...
> if(updateCnt > 0)
> {
>       SubscribeKeyWrapper sk = new SubscribeKeyWrapper(sSubscribeKey,
> Constants.XPATH);
>       SubscribeQosWrapper sq = new SubscribeQosWrapper();
>       SubscribeRetQos subRet = con.subscribe(sk.toXml(), sq.toXml());
>
>       // Wait 3 secs for each message to ensure the update method has
>       // time to complete...
>       // This is fine for a few messages in the queue, but it will not work
> for large numbers of messages in the queue.
>       for(int ii=0; ii<updateCnt; ii++)
>       {
>             try { Thread.currentThread().sleep(3000); }
>             catch(InterruptedException i) {}
>       }
> }
>
>
> // Once the update() processing has run for all messages, I want to get all
> the xml files currently sitting in the acks folder plus do other things but
> the following
> // code will get executed BEFORE the update() has completed for ALL
> messages if I don't have the 'sleep' code above. This is my problem...
> File tempUploadDir = new File("D:\\xml\\acknowledgements");
> String[] fileNameList = tempUploadDir.list();
> log.info("","****** There are "+fileNameList.length+" ack files to
> process.");
>
> String msg = "";
> String msgKey = "";
> String msgInfo = "";
>
> // For each file in the acks directory, read it in as an xml document,
> // create a string version of the xml doc, and publish message to
> xmlBlaster
> for (int i = 0; i < fileNameList.length; i++)
> {
>
>       File curUploadDir = new File ("D:\\xml\\acknowledgements",
> fileNameList[i]);
>             File f = new File(curUploadDir.toString());
>             Document dcXmlDoc = docbuilder.parse(f);
>
>       ...
>       ...
>       ...
>
>       msgInfo = "<TRANSACTION type='acknowledgement' url='"+sURL+"'/>";
>
>             //Send the message on its way
>       PublishKeyWrapper pk = new PublishKeyWrapper(msgKey, "text/xml");
>
>       pk.wrap(msgInfo);
>       PublishQosWrapper pq = new PublishQosWrapper();
>       MessageUnit msgUnit = new MessageUnit(pk.toXml(), msg.getBytes(),
> pq.toXml());
>
>       log.info("","****** PUBLISH: xmlString contains: " + xmlString);
>
>       con.publish(msgUnit);
>
> }
>
> //Disconnect from xmlBlaster
> DisconnectQos dq = new DisconnectQos();
> con.disconnect(dq);
> END CODE SNIPPET
> *****************************************************************************************************
>
> So what I am trying to figure out is, how do I know when the update handler
> is complete for ALL messages subscribed to? I am aware of the command line
> arg, burstMode.collectTime 400, but this did not solve my problem. I've
> also tried using the code:
>
> **************************************************
> ConnectQos qos = new ConnectQos(glob, name, passwd);
> CallbackAddress cbAddress = new CallbackAddress(glob);
> cbAddress.setCollectTime(2000);      // sets burst mode to 2000
> milliseconds
> qos.addCallbackAddress(cbAddress);
> ConnectReturnQos connectReturnQos = con.connect(qos, this);
> **************************************************
>
> however this seemed to stall prior to even calling the update. What I need
> is some way to stall so that once the update is invoked, everything gets
> done before moving on.
>
> Are there other settings in the qos that I should try? Or is the qos not
> where I need to focus?
>
> I hope this helps to clarify what I'm asking. If not, please let me know
> and I will try again.
>
> Thank you very much,
> Kelli
>
>
>
>
>                       "Kelli Fuller"
>                       <kellibfuller at hot        To:       kfuller at caci.com
>                       mail.com>                cc:
>                                                Subject:  Fwd: Re: [xmlblaster] Question on pausing the update method long enough to
>                       11/27/2002 12:06          process messages...
>                       PM
>
>
>
>
>
>
>
>
>
>
>
>
>
> >From: Heinrich Götzger <Heinrich.Goetzger at exploding-systems.de>
> >Reply-To: xmlblaster at server.xmlBlaster.org
> >To: xmlBlaster List <xmlblaster at server.xmlblaster.org>
> >Subject: Re: [xmlblaster] Question on pausing the update method long
> enough
> >to process messages...
> >Date: Wed, 27 Nov 2002 16:54:53 +0100 (CET)
> >
> >Kelli,
> >
> >I'm not sure if I got your problem right.
> >
> >Do you want some store and forward queue which will be filled by the
> >update method?
> >
> >
> >Heinrich
> >
> >
> >On Wed, 27 Nov 2002, Kelli Fuller wrote:
> >
> > > I am trying to learn how to use xmlBlaster. With that, I have written a
> > > pretty basic class to subscribe to and publish messages. In this class,
>
> >I
> > > only subscribe to messages once and I only have one update() method,
> > > however in that method I take the content of the message, covert it to
> a
> > > xml DOM, edit some node values then I republish the message with a new
> >key
> > > and erase the original message then finally create a xml file to store
> >in a
> > > directory for another system to pick up. This class is on a scheduler
> >that
> > > runs every 2 mintues.
> > >
> > > Currently, I am forcing a sleep period right after the code that
> >subscribes
> > > of 3 secs per message, like this:
> > >
> > > if(updateCnt > 0)
> > > {
> > >       SubscribeKeyWrapper sk = new SubscribeKeyWrapper(sSubscribeKey,
> > > Constants.XPATH);
> > >       SubscribeQosWrapper sq = new SubscribeQosWrapper();
> > >       SubscribeRetQos subRet = con.subscribe(sk.toXml(), sq.toXml());
> > >
> > >       // Wait 3 secs for each message to ensure the update method has
> >time
> > >       // to complete...
> > >       for(int ii=0; ii<updateCnt; ii++)
> > >       {
> > >             try { Thread.currentThread().sleep(3000); }
> > >             catch(InterruptedException i) {}
> > >       }
> > > }
> > >
> > > I have to do this to prevent the code below this from being executed.
> >This
> > > is not a viable solution though because of the number of messages that
> > > could be in the queue at any given time.
> > >
> > > How can I ensure that all messages I've subscribed to run through the
> > > update method before continuring with any other processing? I've read
> >about
> > > setting the burstMode.collectTime as an argument, which I've tried, but
> > > this didn't seem to solve my problem.
> > >
> > > I also need to ensure that for the current call to my class, that the
> > > update method doesn't get called by xmlBlaster when new messages are
> > > published. I want the next call to my class to get any new published
> > > messages and so on. So in other words, I want to subscribe to one set
> of
> > > messages, call update for only that set of messages given enough time
> to
> > > process as described above and stop and wait 2 minutes for this class
> to
> >be
> > > called again and repeat.
> > >
> > > How can I control this?
> > >
> > > Thanks so much,
> > >
> > > Kelli
> > >