XmlBlaster Logo

REQUIREMENT

engine.message.streaming



Type NEW
Priority HIGH
Status OPEN
Topic XmlBlaster uses streaming to handle large messages
Description

For use cases where large messages are published, subscribed to or retrieved via get, a lot of memory is needed to handle them as complete units. Such cases call for a different approach.
One solution is streaming: the content of a message is supplied as a Stream, and the client library splits the message into chunks in a way that is transparent to the user.
On the subscriber side, in the update method, the user must likewise be able to read the content of the message from a Stream.

A message can be published in one of two ways:

  • Passing an InputStream (the JDBC approach)
       InputStream in = ...; // from any source, e.g. a FileInputStream
       MsgUnit msg = new MsgUnit(key, in, qos);
       PublishReturnQos publishReturnQos = conn.publish(msg); // returns when EOF of the stream is reached
    	     
    The stream must be filled elsewhere: it could be backed by a file or a socket, or it could be a custom implementation. In the latter case it must be filled in a separate thread, because publish() does not return before EOF is reached.
  • Getting an OutputStream to write to (the Servlet approach)
       MsgUnit msg = new MsgUnit(key, qos);
       OutputStream os = msg.getOutputStream(conn);
       for (int i=0; ...) {
          os.write(data);
          os.flush();
       }
       os.close(); 
       PublishReturnQos returnQos = msg.getReturnQos();  // hypothetical operation to retrieve the return qos
    	     
    This alternative hands you an output stream that you fill with whatever content you want. You do not need to know the size of the content beforehand; the end of the message is marked by the close operation. This approach needs a custom OutputStream implementation. A chunk would be published implicitly whenever the stream's internal buffer is full, or when flush() or close() is invoked on the stream.
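To make the Servlet approach more concrete, here is a minimal sketch of the custom OutputStream it requires. It buffers written bytes and hands a chunk to a callback when the buffer reaches chunkSize, on flush(), and on close(), where the final chunk is flagged as the last one. ChunkingOutputStream and ChunkPublisher are hypothetical names, not xmlBlaster API; in a real client library the callback would attach the chunk client properties and invoke publish.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical callback receiving each finished chunk (stands in for the real publish call). */
interface ChunkPublisher {
    void publishChunk(byte[] chunk, boolean last) throws IOException;
}

/**
 * Sketch of an OutputStream that emits a chunk when its buffer reaches chunkSize,
 * when flush() is called, and a final chunk when close() is called.
 */
class ChunkingOutputStream extends OutputStream {
    private final int chunkSize;
    private final ChunkPublisher publisher;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private boolean closed = false;

    ChunkingOutputStream(int chunkSize, ChunkPublisher publisher) {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be > 0");
        this.chunkSize = chunkSize;
        this.publisher = publisher;
    }

    @Override
    public void write(int b) throws IOException {
        if (closed) throw new IOException("stream closed");
        buffer.write(b);
        if (buffer.size() >= chunkSize) {
            emit(false); // buffer full: publish it as an intermediate chunk
        }
    }

    @Override
    public void flush() throws IOException {
        if (closed) throw new IOException("stream closed");
        if (buffer.size() > 0) {
            emit(false); // explicit flush: publish whatever is buffered
        }
    }

    @Override
    public void close() throws IOException {
        if (closed) return;
        closed = true;
        emit(true); // the last chunk marks the end of the message (it may be empty)
    }

    private void emit(boolean last) throws IOException {
        publisher.publishChunk(buffer.toByteArray(), last);
        buffer.reset();
    }
}
```

With a chunkSize of 4, writing six bytes, flushing, writing one more byte and closing produces three chunks of 4, 2 and 1 bytes, with only the last one flagged as final. Note that close() always emits a final chunk, which is empty if the buffer was just drained; avoiding that would mean holding back the previous chunk until it is known whether more data follows.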
Both approaches need a configuration parameter for the chunk size, for example a chunkSize attribute in the publishQos. Later on, other messages could be chunked as well, even when the stream approach is not used.
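For the InputStream (JDBC) approach, the client library itself has to cut the stream into pieces of at most chunkSize bytes. The following is a minimal sketch, assuming Java 11+ (for InputStream.readNBytes(int)) and a hypothetical callback in place of the real publish call; InputStreamChunker and publishChunks are illustrative names. Reading one chunk ahead is what lets it tell which chunk is the last.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BiConsumer;

class InputStreamChunker {
    /**
     * Reads the stream until EOF and passes each chunk (at most chunkSize bytes) to the
     * callback together with a flag telling whether it is the last chunk.
     * Returns the number of chunks. At most two chunks are held in memory at a time.
     */
    static int publishChunks(InputStream in, int chunkSize,
                             BiConsumer<byte[], Boolean> publisher) throws IOException {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be > 0");
        // readNBytes blocks until chunkSize bytes have been read or EOF is reached
        byte[] current = in.readNBytes(chunkSize);
        int count = 0;
        while (true) {
            // A short read means EOF was already reached, so nothing follows this chunk.
            byte[] next = current.length < chunkSize ? new byte[0] : in.readNBytes(chunkSize);
            boolean last = next.length == 0;
            publisher.accept(current, last);
            count++;
            if (last) {
                return count;
            }
            current = next;
        }
    }
}
```

A 10-byte stream with a chunkSize of 4 yields chunks of 4, 4 and 2 bytes; an empty stream yields a single empty chunk flagged as last.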

Implementation Details
Whenever the internal buffer is full, or when explicitly requested by the client, a chunk containing part of the message is published. To allow the complete message to be reconstructed on the destination side (the update or the get method), the following information must be sent with each chunk as client properties:

   __CHUNK_SEQ_NUM     the sequence number of the chunk, starting at 0 and incremented for each chunk. Only set on multi-chunk messages.
   __CHUNK_EOF         only set on the last chunk, and then always set to 'true'. Never set on single-chunk messages.
   __CHUNK_EXCEPTION   contains the exception as a blob if an exception occurred.
   __CHUNK_BELONGS_TO  a unique id identifying the complete message. It must be assigned on the client side.
   __CHUNK_NMAX        the total number of chunks. Not needed initially; only set if the complete size is known.
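The property rules above can be captured in a small helper that computes the client properties for one chunk. This is a sketch with hypothetical names: values are plain strings rather than xmlBlaster's typed client properties, __CHUNK_EXCEPTION is left out, and since the rules do not say whether __CHUNK_BELONGS_TO is sent on single-chunk messages, the sketch assumes that such a message carries no chunk properties at all.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ChunkProperties {
    /**
     * Returns the client properties for the chunk with sequence number seqNum.
     * nmax is the total number of chunks, or null if it is not known in advance.
     * Assumption: a message that fits in one chunk (seqNum 0 and last) gets no chunk properties.
     */
    static Map<String, String> forChunk(String belongsTo, int seqNum, boolean last, Integer nmax) {
        Map<String, String> props = new LinkedHashMap<>();
        if (seqNum == 0 && last) {
            return props; // single-chunk message: neither __CHUNK_SEQ_NUM nor __CHUNK_EOF
        }
        props.put("__CHUNK_BELONGS_TO", belongsTo); // id assigned on the client side
        props.put("__CHUNK_SEQ_NUM", Integer.toString(seqNum));
        if (last) {
            props.put("__CHUNK_EOF", "true"); // only ever present on the last chunk
        }
        if (nmax != null) {
            props.put("__CHUNK_NMAX", Integer.toString(nmax));
        }
        return props;
    }
}
```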
      
One open question concerns the second alternative (the Servlet approach): since publish is invoked implicitly, how is the ReturnQos handled, and what is the meaning of the publish method?

On the Update side
The current I_Callback interface does not fit this purpose, since its update method receives the message content as a byte[]. Two things are needed:

  • a Decorator class for MsgUnit called UpdateMsgUnit
  • a new callback interface I_UpdateListener with the method
       UpdateReturnQos I_UpdateListener.onUpdate(UpdateMsgUnit msg) throws XmlBlasterException
    	      
In I_XmlBlasterAccess the following new methods are needed:
   ConnectReturnQos connect(ConnectQos qos, I_UpdateListener updateListener) throws XmlBlasterException;
   SubscribeReturnQos subscribe(SubscribeKey subscribeKey, SubscribeQos subscribeQos, I_UpdateListener updateListener) 
      throws XmlBlasterException;
	
As for the publisher, there are two ways to read the message content from a stream:
  • OutputStream
    The user passes an OutputStream, which the client library fills:
       UpdateReturnQos I_UpdateListener.onUpdate(UpdateMsgUnit msg) throws XmlBlasterException {
           FileOutputStream os = new FileOutputStream(...);
           msg.msgWrite(os); // blocks until the complete content has been written to os
           return ...;
       }
    	    
  • InputStream (Servlet Approach)
    The client library provides the InputStream:
       UpdateReturnQos I_UpdateListener.onUpdate(UpdateMsgUnit msg) throws XmlBlasterException {
           InputStream in = msg.getInputStream();
           in.read(...);
             ...
           return ...;
       }
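On the receiving side the client library has to put the chunks back together before handing the content to the user, for example through msgWrite(os). Below is a minimal sketch with a hypothetical ChunkReassembler class: it writes each chunk to the target OutputStream as soon as all earlier chunks have been written, buffering only chunks that arrive early (whether chunks can arrive out of order at all is an assumption here), and reports completion once the chunk carrying __CHUNK_EOF has been written. A chunk without __CHUNK_SEQ_NUM is treated as a complete single-chunk message.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

class ChunkReassembler {
    private final OutputStream out;
    private final Map<Integer, byte[]> pending = new HashMap<>(); // chunks that arrived early
    private int nextSeq = 0;  // sequence number of the next chunk to write
    private int eofSeq = -1;  // sequence number of the last chunk, -1 while still unknown

    ChunkReassembler(OutputStream out) {
        this.out = out;
    }

    /** Accepts one chunk; returns true once the complete message has been written to out. */
    boolean onChunk(Map<String, String> props, byte[] content) throws IOException {
        String seq = props.get("__CHUNK_SEQ_NUM");
        int seqNum = (seq == null) ? 0 : Integer.parseInt(seq);
        // No __CHUNK_SEQ_NUM means a single-chunk message, which is also the last chunk.
        if (seq == null || "true".equals(props.get("__CHUNK_EOF"))) {
            eofSeq = seqNum;
        }
        pending.put(seqNum, content);
        // Write every chunk that now continues the sequence written so far.
        while (pending.containsKey(nextSeq)) {
            out.write(pending.remove(nextSeq));
            nextSeq++;
        }
        boolean complete = eofSeq >= 0 && nextSeq > eofSeq;
        if (complete) {
            out.flush();
        }
        return complete;
    }
}
```

For the InputStream variant, one option would be to let the reassembler write into a java.io.PipedOutputStream and hand the connected PipedInputStream to the user; the pipe then requires the chunks to be written from a different thread than the one reading in onUpdate.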
    	    

On the Get Side

Example
Java
Configure

NOTE: Configuration parameters are specified on command line (-someValue 17) or in the xmlBlaster.properties file (someValue=17). See requirement "util.property" for details.
Columns named Impl tell you whether the feature is implemented.
Columns named Hot tell you whether the configuration can be changed in hot operation.

See API org.xmlBlaster.client.qos.PublishQos
See API org.xmlBlaster.client.key.PublishKey
See API org.xmlBlaster.engine.xml2java.PublishQos
See REQ engine.message.lifecycle
See REQ engine.queue
See REQ engine.callback
See REQ util.property

This page is generated from the requirement XML file xmlBlaster/doc/requirements/engine.message.streaming.xml
