1 #!/usr/bin/perl
  2 # Invoke
  3 #   perl subscribePoller.pl http://myHost:8080/
  4 # if xmlBlaster runs on 'myHost'
  5 #
  6 # Work around as we don't have a callback server
  7 # to not loose asynchronously arriving messages for us.
  8 #
  9 # Connects with a persistent session 'joe/1',
 10 # subscribes on topic 'myTopic'
 11 # and synchronously polls the callback queue
 12 # with get() for arrived messages.
 13 # The session/subscription is persistent and
 14 # we never loose any message even if the script
 15 # terminates for a while.
 16 #
 17 # Test setup:
 18 #
 19 # Start server
 20 #   java -Dcom.sun.management.jmxremote org.xmlBlaster.Main
 21 # (you can use JDK's 'jconsole' to observe the server status)
 22 #
 23 # Start a publisher to send test messages
 24 #  java javaclients.HelloWorldPublish -oid myTopic -numPublish 20
 25 #
 26 # @author Marcel Ruff
 27 
 28 use Frontier::Client;
 29 use MIME::Base64;
 30     
 31 $server_url = @ARGV[0];
 32 if ($#ARGV == -1) {
 33    $host = `uname -n`;
 34    $host =~ s/^\s*(.*?)\s*$/$1/;
 35    $server_url = "http://" . $host . ":8080/";  # guess where xmlBlaster is
 36 }
 37 print "\nTrying to connect to xmlBlaster server on $server_url ...\n";
 38 
 39 $server = Frontier::Client->new(url => $server_url);
 40 print "Connected to xmlBlaster server on $server_url \n";
 41 
 42 # Login and set dispatcherActive='false' as we have no callback server
 43 # Use a fake EMAIL callback protocol to satisfy xmlBlaster
 44 # We are only interested in the callback queue to hold the messages
 45 $sessionId = $server->call('authenticate.login', "ben", "secret",
 46     "<qos>
 47       <session name='joe/3' timeout='-1'/>
 48       <persistent/>
 49       <queue relating='callback' maxEntries='1000'>
 50         <callback type='EMAIL' retries='-1' pingInterval='0' dispatcherActive='false'>
 51           a@b
 52         </callback>
 53       </queue>
 54     </qos>", "");
 55 
 56 print "Login success, got secret sessionId=$sessionId \n";
 57 
 58 # Subscribe with persistence flag to survive server restart
 59 # Subscribe once is enough as we have a persistent session 'joe/3'.
 60 # To avoid duplicate subscriptions on restart of this script
 61 # we set multiSubscribe to false
 62 $topicId = 'myTopic';
 63 $returnQos = $server->call('xmlBlaster.subscribe', $sessionId,
 64        "<key oid='" . $topicId . "'/>",
 65        "<qos>
 66           <multiSubscribe>false</multiSubscribe>
 67           <persistent>true</persistent>
 68         </qos>");
 69 print "\nResult for a subscribe(" . $topicId . "):\n------------", $returnQos, "\n------------\n";
 70 
 71 # Poll for messages
 72 while (true) {
 73    $queryKey = "<key oid='__cmd:client/joe/3/?cbQueueEntries'/>";
 74    # Access the callback queue and consume all messages from there
 75    @msgUnits = $server->call('xmlBlaster.get', $sessionId, $queryKey, 
 76       "<qos>
 77         <querySpec type='QueueQuery'>
 78           <![CDATA[maxEntries=-1&maxSize=-1&consumable=true&waitingDelay=0]]>
 79         </querySpec>
 80       </qos>");
 81 
 82    print "\nResults for a get($queryKey):";
 83    for $i (0 .. $#msgUnits) {
 84       for $j (0 .. $#{$msgUnits[$i]}) {
 85          print "\n-------------#$j-------------------";
 86          $key = $msgUnits[$i][$j][0];
 87          $contentBase64AndEncoded = $msgUnits[$i][$j][1];
 88          $content = decode_base64($contentBase64AndEncoded->value());
 89          $qos = $msgUnits[$i][$j][2];
 90          print $key;
 91          print "\n<content>" . $content . "</content>\n";
 92          print $qos;  # TODO: re-subscribe if an ERASE arrives
 93          print "\n-------------#$j-------------------\n";
 94       }
 95    }
 96    sleep(2);  # Poll invterval set to 2 seconds
 97 }
 98 
 99 # No logout from xmlBlaster to keep the session 'joe/3'
100 #$server->call('authenticate.logout', $sessionId);
101 
102 print "\nKeeping session, bye.\n";


syntax highlighted by Code2HTML, v. 0.9.1