1 """
  2 __version__ = '$Revision: 1.7 $'
  3 __date__    = '$Date: 2004-11-24 20:15:11 +0000 (Wed, 24 Nov 2004) $'
  4 __author__  = 'spex66@gmx.net'
  5 __license__ = 'pyBlaster is under LGPL, see http://www.xmlBlaster.org/license.html'
  6 
  7 last change by $Author: ruff $ 
  8 
  9 """
 10 
 11 # Copyright (c) 2003 Peter Arwanitis
 12 # mailto:spex66 @ gmx . net
 13 # (=PA=)
 14 
 15 
 16 """
 17 
 18  8888888b.           888888b.   888                   888
 19  888   Y88b          888  '88b  888                   888
 20  888    888          888  .88P  888                   888
 21  888   d88P 888  888 8888888K.  888  8888b.  .d8888b  888888 .d88b.  888d888
 22  8888888P'  888  888 888  'Y88b 888     '88b 88K      888   d8P  Y8b 888P'
 23  888        888  888 888    888 888 .d888888 'Y8888b. 888   88888888 888
 24  888        Y88b 888 888   d88P 888 888  888      X88 Y88b. Y8b.     888
 25  888         'Y88888 8888888P'  888 'Y888888  88888P'  'Y888 'Y8888  888
 26                  888
 27             Y8b d88P
 28              'Y88P'
 29 
 30 
 31 =======================================================
 32 THE ABSTRACT
 33              
 34 pyBlaster
 35     The Python way ("The first steps" :-)) to use www.XMLBLASTER.org
 36     
 37     A Python module that provides the complete XMLBLASTER interface for XMLRPC 
 38     This means for asynchronous updates (callbacks), too!
 39     
 40     Fredrik Lundh has provided the excellent XMLRPC library for Python.
 41        http://www.pythonware.com/products/xmlrpc/
 42 
 43 
 44 
 45 Have fun and thanks to the XMLBLASTER-team!
 46         http://www.xmlblaster.org
 47 
 48         
 49 Peter Arwanitis
 50 spex66 @ gmx . net
 51 (=PA=)
 52 
 53 =======================================================
 54 THE DETAILS
 55 
 56 Core file
 57     pyBlaster.py
 58 
 59         My 1st Step:
 60             class XmlBlasterClient
 61                 Implementation of the complete(?) XMLRPC client interface
 62                 With just a little beautifying of the method-signatures
 63 
 64         My 2nd Step:        
 65             class XmlBlasterCallbackClient
 66                 Specialisation of XmlBlasterClient with additional 
 67                 threaded XMLRPC server implementation
 68         
 69 
 70 Based on (if you have an uptodate installation, delete the provided files)
 71     xmlrpclib.py / SimpleXMLRPCServer.py ( Version 1.0.1 )
 72     
 73 Additional core files
 74     BaseService.py             class to comfortable handle threads
 75                                found in the Narval project from LOGILAB
 76                                http://www.logilab.org
 77     ThreadedXMLRPCServer.py    mixin class SimpleXMLRPCServer & BaseService
 78                                to build an threaded XMLRPCServer
 79                                
 80                                ResponsiveThreadedXMLRPCServer thanks to
 81                                Robin Munn, I've figured out a XMLRPCServer
 82                                cooperative with threading, look at the
 83                                comments in new ResponsiveThreadingTCPServer.py
 84                             
 85 Optional files
 86     ShellService.py            mixin class BaseService & InteractiveConsole
 87                                to serve an interactive Python prompt (shell) 
 88                                for debugging and testing 
 89 
 90 
 91 =======================================================
 92 THE INSTALLATION
 93 
 94 Put the pyBlaster directory (its an python package) into your 
 95 python/Lib/site-packages (or use it direct from the directory)
 96 
 97 
 98 =======================================================
 99 THE USAGE 
100 
101 Read the XMLBLASTER documentation / requirements, especially for the
102 "quality of service QoS" options.
103 
104 In your python project:
105 
106 # import
107 from pyBlaster import pyBlaster
108 
109 # build an instance
110 xb = pyBlaster.XmlBlasterCallbackClient()
111 
112 # start server / use client
113 # its up to you, thats all!
114 
115 =======================================================
116 THE TEST
117 
118 Developed under Python 2.2.2 with the XMLRPC update from pythonware
119 
120 Success stories from Jython and other CPython version are appreciated!
121 
122 Test (batteries included):
123     start pyBlaster.py in a shell and have a look at the help text
124     start pyBlaster.py in more than one shell and experiment interactive
125           with publish / subcribe / get <-- this is the python way :)
126     
127 
128 
129 (=PA=)        
130 """
131 
132 
133 from ThreadedXMLRPCServer import ResponsiveThreadedXMLRPCServer
134 import xmlrpclib
135 import sys
136 from ShellService import ShellService
137 from socket import gethostname
138 
139 true, false = 1, 0 # Bool init
140 
141 class XmlBlasterClient:
142     """Implementation of a client interface for XMLBLASTER 
143        (docstrings copied from the java version)
144     """
145     def __init__(self, xmlblaster_url=None):
146         "Optional xmlblaster_url for direct connection"
147         self.xmlblaster_url = xmlblaster_url
148         self.proxy          = None
149         self.sessionId      = None
150                 
151         if xmlblaster_url:
152             self.connect(xmlblaster_url)
153             
154     # CLIENT Interface #################################################        
155             
156     def connect(self, xmlblaster_url):
157         self.proxy = xmlrpclib.ServerProxy(xmlblaster_url)
158         print "\n==> ::CONNECT to XmlBlaster:: <=="
159         print '    Sucessful Server connect on ', xmlblaster_url
160         #self.proxy = xmlrpclib.Server(xmlblaster_url) # for old xmlrpclib versions
161         
162     # XMLBLASTER
163 
164     def login(self, username='guest', password='guest', callback_url=None, additionalConnectQos=''):
165         """
166         Do login to xmlBlaster.
167         @param additionalConnectQos For example "<session timeout='3600000' maxSessions='10'/>"
168         @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.connect.html
169         @deprecated Use connect() instead
170         @return The secret sessionId as a raw string
171         """
172         
173         if callback_url:
174             _cb = "<callback type='XMLRPC'>%s</callback>" % callback_url
175         else:
176             _cb = ""
177         
178         qos = "<qos>" + _cb + additionalConnectQos + "</qos>"
179         
180         # remember the return secret value for further usage
181         self.sessionId = self.proxy.authenticate.login(username, password, qos, "")
182         
183         print "==> ::LOGIN:: <=="
184         print '      Success with sessionId= ', self.sessionId
185         
186     def logout(self):
187     
188         print "==> ::LOGOUT:: <=="
189         self.proxy.authenticate.logout(self.sessionId)
190     
191     def publish(self, xmlKey, content, qos):
192         """
193         Publish messages.
194 
195         This variant allows to pass an array of MsgUnitRaw object, for performance reasons and
196         probably in future as an entity for transactions.
197 
198         @see org.xmlBlaster.engine.RequestBroker
199         @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html">The interface.publish requirement</a>
200         """
201         
202         print "==> ::PUBLISH:: <=="
203         return self.proxy.xmlBlaster.publish(self.sessionId, xmlKey, content, qos)
204     
205     def publishOneway(self, msgUnitArr):
206         """
207         Publish an array of messages.
208 
209         The oneway variant may be used for better performance,
210         it is not returning a value (no application level ACK)
211         and there are no exceptions supported over the connection to the client.
212 
213         @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html">The interface.publish requirement</a>
214         """
215         print "==> ::PUBLISHONEWAY:: <=="
216         """
217             Hack: I don't think the oneway is supported by XmlRpc because http requests always
218             return something, for now we fake it using a publishArr() and ignore the returned
219             value (Marcel 2004-08-12)
220         """
221         #self.proxy.xmlBlaster.publishOneway(self.sessionId, msgUnitArr)
222         ignoreReturn = self.proxy.xmlBlaster.publishArr(self.sessionId, msgUnitArr)
223 
224     def publishArr(self, msgUnitArr):
225         """
226         Publish an array of messages.
227 
228         This variant may be used for better performance.
229 
230         @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html">The interface.publish requirement</a>
231         """
232         print "==> ::PUBLISHARR:: <=="
233         return self.proxy.xmlBlaster.publishArr(self.sessionId, msgUnitArr)
234     
235     def subscribe(self, xmlKey, qos):
236         """
237         Subscribe to messages.
238 
239         @param xmlKey_literal Depending on the security plugin this key is encrypted
240         @param subscribeQoS_literal Depending on the security plugin this qos is encrypted
241         @see org.xmlBlaster.engine.RequestBroker
242         @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.subscribe.html">The interface.subscribe requirement</a>
243         """
244 
245         print "==> ::SUBSCRIBE:: <=="
246         return self.proxy.xmlBlaster.subscribe(self.sessionId, xmlKey, qos)
247         
248     def unSubscribe(self, xmlKey, qos):
249         """
250         unSubscribe  from messages.
251 
252         To pass the raw xml ASCII strings, use this method.
253 
254         @param xmlKey_literal Depending on the security plugin this key is encrypted
255         @param unSubscribe QoS_literal Depending on the security plugin this qos is encrypted
256         @see org.xmlBlaster.engine.RequestBroker
257         """
258         print "==> ::unSubscribe :: <=="
259         return self.proxy.xmlBlaster.unSubscribe (self.sessionId, xmlKey, qos)
260             
261     def get(self, xmlKey, qos):
262         """
263         Synchronous access a message.
264 
265         @see org.xmlBlaster.engine.RequestBroker
266         @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.get.html">The interface.get requirement</a>
267         """
268         print "==> ::GET:: <=="
269         return self.proxy.xmlBlaster.get(self.sessionId, xmlKey, qos)
270 
271     def erase(self, xmlKey, qos):
272         """   
273         Delete messages.
274         @see org.xmlBlaster.engine.RequestBroker
275         @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.erase.html">The interface.erase requirement</a>
276         """    
277         
278         print "==> ::ERASE:: <=="
279         return self.proxy.xmlBlaster.erase(self.sessionId, xmlKey, qos)
280         
281     def printMessages(self, messages):
282         print "   Received ", len(messages), " messages:"
283         for msg in messages:
284            key = msg[0]
285            content = msg[1]     # content is of type xmlrpclib.Binary
286            qos = msg[2]
287            print "      key=", key
288            print "      content=", content.data, " bytes"
289            print "      qos=", qos
290         
291 
292 class XmlBlasterCallbackClient(XmlBlasterClient):
293     """Specialication of the client class with the additional 
294        implementation of the server interface
295        
296        To use asynchronous update() or updateOneway() you can
297        subtype this class and override the methods with your own
298        project specific dispatchers. 
299        
300        Hint:
301          Remember that the calls occur in a seperate thread, 
302          so usage of a threadsafe queue is always a good idea
303          
304          Look for python cookbook hints on threaded programming
305        
306        The XMLRPC callback server runs on an own port as a seperate thread
307        (docstrings copied from the java version)
308     """
309 
310     def __init__(self, xmlblaster_url=None):
311     
312         # INIT client part
313         XmlBlasterClient.__init__(self, xmlblaster_url)
314         
315         self.callback_url    = None
316         self.callback_server = None
317         self.shell           = None
318         
319     # Dispatcher Class (XMLRPC Server Interface) ###############################
320 
321     class XB_CallbackDispatcher:
322         def __init__(self, xb_CallbackInstance):
323             self.xb_CallbackInstance=xb_CallbackInstance
324         def update(self, *attrs):
325             return self.xb_CallbackInstance.update(*attrs)
326         def updateOneway(self, *attrs):
327             return self.xb_CallbackInstance.updateOneway(*attrs)
328         def ping(self, *attrs):
329             return self.xb_CallbackInstance.ping(*attrs)
330 
331     # Start / Stop Server ######################################################
332 
333     def startCallbackServer(self, port=0) :
334         # Automatic port is allocated if port is 0
335 
336         self.callback_server = ResponsiveThreadedXMLRPCServer(port, 
337                                     dispatcherClass=self.XB_CallbackDispatcher, 
338                                     callbackInstance=self
339                                     )
340         
341         # thanks to Doug Palmer
342         allocated_port = self.callback_server.getConnectedPort()
343         
344         #print 'autoport acquired:: ', allocated_port
345         
346         self.callback_url = 'http://%s:%i/RPC2' % (gethostname(), allocated_port)   
347         
348         print "\n==> ::STARTCALLBACKSERVER:: <=="
349         print '      Success with callback_url= ', self.callback_url
350         
351         self.callback_server.start()
352 
353     def stopCallbackServer(self) :
354         print "\n==> ::STOPCALLBACKSERVER:: <=="
355         print "      I'm dying... "
356         self.callback_server.stop()
357         
358         print 'CBServer is alive? ', self.callback_server.isAlive()
359         print "      ...good bye!"
360         
361 
362     # Start / Stop SHELL Service ##############################################
363 
364     def startShellService(self):
365         if not self.shell:
366             print "\n==> ::STARTSHELLSERVICE:: <=="
367             self.shell = ShellService(engine=self, name='ShellService')
368             self.shell.start()
369 
370     def stopShellService(self):
371         if self.shell:
372             print "\n==> ::STOPSHELLSERVICE:: <=="
373             self.shell.stop()
374             
375     # Total Shutdown ##############################################
376     def shutdown(self):
377         "Closes all servers (joining all threads) and connections"
378         
379         # XXX 080403 PA ? thread stopping isn't workink smooth yet... help appreciated!
380         # XXX 080503 PA ? callbackserver joins now, but not perfectly smooth
381         #                 with stopping the shellservice :-/
382         
383         #                 from time to time shellservice ends automagically ?!?
384         
385         print "\n==> ::SHUTDOWN Initiated:: <=="
386         if self.sessionId: self.logout()
387         if self.callback_server: self.stopCallbackServer()
388         if self.shell: self.stopShellService()
389         print "\n==> ::SHUTDOWN Completed:: <=="
390         #sys.exit(1)
391         
392             
393 
394 
395     # SERVER Interface ##########################################################    
396 
397     def update(self, sessionId, key, content, qos):
398         """ This is the callback method invoked from the server
399             informing the client in an asynchronous mode about new messages
400             
401             You have to override this method in a specialication, to establish
402             your own logic.
403         """
404     
405         print "==> ::UPDATE:: <=="
406         print "   SessionId::    ", sessionId
407         print "         Key::    ", key
408         print "     Content::    ", content.data
409         print "         QoS::    ", qos
410         
411         return ""
412         
413     
414     def updateOneway(self, sessionId, key, content, qos):
415         """ This oneway method does not return something, it is high performing but
416             you loose the application level hand shake.
417             @see <a href="http://www.xmlBlaster.org/xmlBlaster/src/java/org/xmlBlaster/protocol/corba/xmlBlaster.idl" target="others">CORBA xmlBlaster.idl</a>
418             
419             You have to override this method in a specialication, to establish
420             your own logic.
421         """
422         print "==> ::UPDATEONEWAY:: <=="
423         print "   SessionId::    ", sessionId
424         print "         Key::    ", key
425         print "     Content::    ", content.data
426         print "         QoS::    ", qos
427 
428     def ping(self, qos):
429         """ Ping to check if the callback server is alive.
430             @param qos ""
431             @return ""
432         """
433         #print "==>::PING:: ", qos
434         
435         return ""
436 
437 # TEST with interaction #####################################################
438 
439 __usage__ = """Usage from the shell:
440         python pyBlaster.py [xmlblaster_url, 
441                              callbackport, 
442                              username, password, 
443                              XPath subscription testphrase
444                              ]
445         
446     Example:
447         java -jar lib/xmlBlaster.jar
448             <-- starts XmlBlaster
449         
450         pyBlaster.py http://<the-xb-machine>:8080 8081 me too first
451             <-- starts your first client with callbacks
452 
453         pyBlaster.py http://<the-xb-machine>:8080 8082 you too second
454             <-- starts another one in another shell (on another computer)
455 
456         XPath subscription testphrase: 'first' , 'second' or 'third'
457         look at the result of the five publish() calls, copy them for 
458         further testing
459         
460         try to publish something on your own :-) 
461         the python interactive shell is your friend
462         
463     Pythonshell:
464             _ is the instance of your XmlBlasterCallbackClient, 
465             >>> dir(_) 
466             shows the interface
467             
468             >>> print _.publish.__doc__ 
469             gives a interactive look at the docstring 
470             (it's just an example :-))
471             
472             cause callbacks are mixed on the same output, 
473             it's a bit messy sometimes :-)
474             
475             <ctrl><pause> is killing the program, without ending 
476             each thread and connection by hand
477             
478     Hint:
479             Situation:
480                 A client got a message with an oid (i.e. '3') 
481                 cause it's fitting an appropriate subsribe/XPath
482             Effect:    
483                 If the message with exactly _this_ oid is changed/altered, 
484                 the client recieves an update() on this, ignoring the 
485                 subscribe/XPath!
486                 
487                 If you have an OID once, you have an subscription to all 
488                 changes, nice!
489         
490         """
491 
492 if __name__ == '__main__':
493     # ok, it is a raw usage of sys.argv, but there is no confusion 
494     # through all the perfect modules to parse the parameters :-)
495     #
496     # for the beauty of option-parsing look elsewhere, thanks
497     try:
498         xmlblaster_url = sys.argv[1]
499         callbackport = int(sys.argv[2])
500         user = sys.argv[3]
501         passwd = sys.argv[4]
502         phrase = sys.argv[5]
503     except:
504         print __usage__
505         sys.exit()
506     
507     xb = XmlBlasterCallbackClient(xmlblaster_url)
508     xb.startCallbackServer(callbackport)
509     xb.startShellService()
510     
511     
512     print """    _.login('%s', '%s', '%s')""" % (user, passwd, xb.callback_url)
513     additionalConnectQos = "<session timeout='3600000' maxSessions='10'/>"
514     xb.login(user, passwd, xb.callback_url, additionalConnectQos)
515     
516     print """    _.subscribe("<key oid='' queryType='XPATH'>//%s</key>", "<qos/>")"""  % phrase
517     xb.subscribe("<key oid='' queryType='XPATH'>//%s</key>" % phrase, "<qos/>")
518     
519 
520     print """    _.publish("<key oid='1'><first/></key>", 'First Type Message', "<qos></qos>")"""
521     publishRetQos = xb.publish("<key oid='1'><first/></key>", 'First Type Message', "<qos></qos>")
522     #print publishRetQos
523 
524     print """    _.publish("<key oid='2'><second/></key>", 'Second Type Message', "<qos></qos>")"""
525     xb.publish("<key oid='2'><second/></key>", 'Second Type Message', "<qos></qos>")
526 
527     print """    _.publish("<key oid='3'><first/></key>", 'First Type Message', "<qos></qos>")"""
528     xb.publish("<key oid='3'><first/></key>", 'First Type Message', "<qos></qos>")
529 
530     print """    _.publish("<key oid='4'><third/></key>", 'Third Type Message', "<qos></qos>")"""
531     xb.publish("<key oid='4'><third/></key>", 'Third Type Message', "<qos></qos>")
532 
533     print """    _.publish("<key oid='5'><third/></key>", 'Third Type Message', "<qos></qos>")"""
534     xb.publish("<key oid='5'><third/></key>", 'Third Type Message', "<qos></qos>")
535     
536     #print """    _.publishArr([["<key oid='6'><first/></key>", '', "<qos/>"]]) ..."""
537     #publishArrRetQos = xb.publishArr([["<key oid='6'><first/></key>", '', "<qos/>"]])
538     #print publishArrRetQos
539 
540     print """\n\nNow it's on you, the python-prompt is yours! Yes it's an python prompt !-)
541     
542     _    <-- is the running instance
543     
544     maybe a first attempt is to copy the printed calls and alter them...
545     """ 
546 
547     


syntax highlighted by Code2HTML, v. 0.9.1