
Re: [xmlblaster-devel] OutOfMemory Exceptions



Joshua Bowman wrote:

Hello,

The saga continues:

My application definitely has a lot of objects with short lifetimes and
almost none with longer ones.  Thus, after reading the Sun docs, I just
increased the size of the eden space using the "NewSize" parameter and
used the default garbage collector.  It seems to work much better this
way.

However, because I'm using Perl, it doesn't appear that I can make use
of the subscribe asynchronous callback.  Using get in a one-to-many
propagation structure obviously has some problems.  How does one make
sure that all clients get the message before the message is removed?

I'm currently kludging my way around this by just running a separate
client that handles all erase-ing.  All get-ing clients check in within
some specified time quantum, so the erase-ing client simply waits until
a message has been around for two full quanta and then assumes that all
clients have checked in and removes it from the bus.

However, this seems to lead to concurrency issues.  If one client is
trying to get while another is erase-ing, it seems to generate an
exception.  Is there a way to pass an option to the bus that will ensure
messages are cleared out only after a certain time interval has
elapsed?  I've tried fiddling with destroyDelay and maxRemainingLife,
but it seems that I still need to invoke the erase command.

Any suggestions?


Hi Joshua,
I am not sure I understand all the details about what you want to do.
I believe that you want a workaround for subscribes, since you don't get asynchronous callbacks in Perl.
If I understood you correctly then you could do the following:


You could connect with the callback dispatcher explicitly deactivated.
You do that in the connect QoS (see the connect requirement):
http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.connect.html



You can specify a fake callback address (does not matter what). Then you can subscribe to the messages you want.
At this point the messages corresponding to your subscription are collected on the callback queue.


From time to time you can now make a "consumable get", as described in
http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.qos.queryspec.QueueQuery.html

This way you are sure not to lose any update, and you don't have to bother about synchronizing the deletion of messages.
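
As a minimal sketch of what the QoS for such a consumable get could look like, the helper below assembles the querySpec from its parameters. The name queue_query_qos is hypothetical (it is not part of xmlBlaster); the parameter names and default values are simply those used in the polling script further down, so check the QueueQuery requirement for their exact meaning.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper, not part of xmlBlaster: builds the QoS string
# for a QueueQuery get. Defaults mirror the values in the polling script.
sub queue_query_qos {
    my (%opt) = @_;
    my %p = (
        maxEntries   => -1,
        maxSize      => -1,
        consumable   => 'true',
        waitingDelay => 0,
        %opt,                     # caller-supplied values override defaults
    );
    # Fixed parameter order so the resulting string is predictable
    my $query = join '&',
        map { "$_=$p{$_}" } qw(maxEntries maxSize consumable waitingDelay);
    return "<qos><querySpec type='QueueQuery'>"
         . "<![CDATA[" . $query . "]]>"
         . "</querySpec></qos>";
}

# Example: fetch at most 10 entries per poll
print queue_query_qos(maxEntries => 10), "\n";
```

The returned string can be passed as the last argument of the 'xmlBlaster.get' call in place of the literal QoS.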
Btw do you really need to publish on different topics ? Couldn't you work on the same topic all the time ?



Here is a functional Perl code snippet; it subscribes to a topic and polls now and again for received messages in its callback queue.

As the session and subscription are persistent, the Perl client
will never lose any message even if it stops for a while or
if the xmlBlaster server is down for a while.

====================================
#!/usr/bin/perl
# Invoke
#   perl subscribePoller.pl http://myHost:8080/
# if xmlBlaster runs on 'myHost'
#
# Workaround, as we don't have a callback server:
# avoids losing messages that arrive asynchronously for us.
#
# Connects with a persistent session 'joe/3',
# subscribes on topic 'myTopic'
# and synchronously polls the callback queue
# with get() for arrived messages.
# The session/subscription is persistent and
# we never lose any message even if the script
# terminates for a while.
#
# Test setup:
#
# Start server
#   java -Dcom.sun.management.jmxremote org.xmlBlaster.Main
# (you can use JDK's 'jconsole' to observe the server status)
#
# Start a publisher to send test messages
#  java javaclients.HelloWorldPublish -oid myTopic -numPublish 20
#
# @author Marcel Ruff

use Frontier::Client;
use MIME::Base64;
$server_url = $ARGV[0];
if ($#ARGV == -1) {
   $host = `uname -n`;
   $host =~ s/^\s*(.*?)\s*$/$1/;
   $server_url = "http://" . $host . ":8080/"; # guess where xmlBlaster is
}
print "\nTrying to connect to xmlBlaster server on $server_url ...\n";


$server = Frontier::Client->new(url => $server_url);
print "Connected to xmlBlaster server on $server_url \n";

# Login and set dispatcherActive='false' as we have no callback server
# Use a fake EMAIL callback protocol to satisfy xmlBlaster
# We are only interested in the callback queue to hold the messages
$sessionId = $server->call('authenticate.login', "ben", "secret",
"<qos>
<session name='joe/3' timeout='-1'/>
<persistent/>
<queue relating='callback' maxEntries='1000'>
<callback type='EMAIL' retries='-1' pingInterval='0' dispatcherActive='false'>
a\@b
</callback>
</queue>
</qos>", "");


print "Login success, got secret sessionId=$sessionId \n";

# Subscribe with persistence flag to survive server restart
# Subscribe once is enough as we have a persistent session 'joe/3'.
# To avoid duplicate subscriptions on restart of this script
# we set multiSubscribe to false
$topicId = 'myTopic';
$returnQos = $server->call('xmlBlaster.subscribe', $sessionId,
"<key oid='" . $topicId . "'/>",
"<qos>
<multiSubscribe>false</multiSubscribe>
<persistent>true</persistent>
</qos>");
print "\nResult for a subscribe(" . $topicId . "):\n------------", $returnQos, "\n------------\n";


# Poll for messages
while (1) {
  $queryKey = "<key oid='__cmd:client/joe/3/?cbQueueEntries'/>";
  # Access the callback queue and consume all messages from there
  @msgUnits = $server->call('xmlBlaster.get', $sessionId, $queryKey,
"<qos>
<querySpec type='QueueQuery'>
<![CDATA[maxEntries=-1&maxSize=-1&consumable=true&waitingDelay=0]]>
</querySpec>
</qos>");


  print "\nResults for a get($queryKey):";
  for $i (0 .. $#msgUnits) {
     for $j (0 .. $#{$msgUnits[$i]}) {
        print "\n-------------#$j-------------------";
        $key = $msgUnits[$i][$j][0];
        $contentBase64AndEncoded = $msgUnits[$i][$j][1];
        $content = decode_base64($contentBase64AndEncoded->value());
        $qos = $msgUnits[$i][$j][2];
        print $key;
        print "\n<content>" . $content . "</content>\n";
        print $qos;  # TODO: re-subscribe if an ERASE arrives
        print "\n-------------#$j-------------------\n";
     }
  }
  sleep(2);  # Poll interval set to 2 seconds
}

# No logout from xmlBlaster to keep the session 'joe/3'
#$server->call('authenticate.logout', $sessionId);

print "\nKeeping session, bye.\n";
===============================================

rgds

Marcel