1 /*------------------------------------------------------------------------------
  2 Name:      StreamingCallback.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 package org.xmlBlaster.client;
  8 
  9 import java.io.ByteArrayInputStream;
 10 import java.io.IOException;
 11 import java.io.OutputStream;
 12 import java.io.PipedInputStream;
 13 import java.io.PipedOutputStream;
 14 import java.util.ArrayList;
 15 import java.util.logging.Level;
 16 import java.util.logging.Logger;
 17 
 18 import org.xmlBlaster.client.key.UpdateKey;
 19 import org.xmlBlaster.client.qos.ConnectQos;
 20 import org.xmlBlaster.client.qos.UpdateQos;
 21 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
 22 import org.xmlBlaster.jms.XBConnectionMetaData;
 23 import org.xmlBlaster.util.Global;
 24 import org.xmlBlaster.util.I_Timeout;
 25 import org.xmlBlaster.util.MsgUnit;
 26 import org.xmlBlaster.util.Timeout;
 27 import org.xmlBlaster.util.Timestamp;
 28 import org.xmlBlaster.util.XmlBlasterException;
 29 import org.xmlBlaster.util.def.Constants;
 30 import org.xmlBlaster.util.def.ErrorCode;
 31 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 32 import org.xmlBlaster.util.key.MsgKeyData;
 33 import org.xmlBlaster.util.qos.ClientProperty;
 34 import org.xmlBlaster.util.qos.MsgQosData;
 35 import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
 36 import org.xmlBlaster.util.queue.I_Queue;
 37 import org.xmlBlaster.util.queue.StorageId;
 38 
 39 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 40 import EDU.oswego.cs.dl.util.concurrent.Mutex;
 41 
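/**
 * Adapter which lets a client consume updates as a stream: it implements the
 * byte[] based I_Callback and delegates to an I_StreamingCallback.
 * A message which arrives as several chunks is written chunk by chunk into a
 * pipe, while the client callback reads the other end of the pipe in a separate
 * thread. A message consisting of one single chunk is passed to the client
 * directly as a stream over its content.
 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 */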
 46 public class StreamingCallback implements I_Callback, I_Timeout, I_ConnectionStateListener {
 47 
 48    /**
 49     * 
 50     * Writer needed since the out stream must be written from a thread which does not
 51     * die before the thread which reads the in counterpart. For some "strange" reason
 52     * the implementation of the Pipe streams makes a check if the thread which has 
 53     * made the last write operation on the out stream still is valid. If not, a Dead
 54     * End IO Exception is thrown when reading.
 55     * 
 56     * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 57     */
 58    class Writer extends Thread {
 59 
 60       class WriterData extends Mutex {
 61          private OutputStream outStrm;
 62          private byte[] data;
 63          private Throwable exception;
 64          
 65          public WriterData(OutputStream out, byte[] data) {
 66             super();
 67             this.outStrm = out;
 68             this.data = data;
 69          }
 70       }
 71 
 72       private LinkedQueue channel;
 73       
 74       public Writer(String name) {
 75          super(name);
 76          this.channel = new LinkedQueue();
 77          setDaemon(true);
 78          start();
 79       }
 80 
 81       public Writer() {
 82          super();
 83          this.channel = new LinkedQueue();
 84          setDaemon(true);
 85          start();
 86       }
 87       
 88       public synchronized void write(OutputStream outStream, byte[] buf) throws InterruptedException, XmlBlasterException {
 89          WriterData data = new WriterData(outStream, buf);
 90          try {
 91             data.acquire();
 92             this.channel.put(data);
 93             data.acquire(); // waits until the other thread is finished
 94             if (data.exception != null)
 95                throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "write: a throwable occured", "", data.exception);
 96          }
 97          finally {
 98             data.release();
 99          }
100       }
101 
102       public synchronized void close(OutputStream outStream) throws InterruptedException, XmlBlasterException {
103          WriterData data = new WriterData(outStream, null);
104          try {
105             data.acquire();
106             this.channel.put(data);
107             data.acquire(); // waits until the other thread is finished
108             if (data.exception != null)
109                throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "close: a throwable occured", "", data.exception);
110          }
111          finally {
112             data.release();
113          }
114       }
115 
116       /**
117        * @see java.lang.Thread#run()
118        */
119       public void run() {
120          while (true) {
121             try {
122                WriterData writerData = (WriterData)this.channel.take();
123                try {
124                   if (writerData.outStrm != null) {
125                      if (writerData.data != null) {
126 
127                         int bytesLeft = writerData.data.length;
128                         int bytesRead = 0;
129                         final int MAX_CHUNK_SIZE = 4096;
130                         while (bytesLeft > 0) {
131                            int toRead = bytesLeft > MAX_CHUNK_SIZE ? MAX_CHUNK_SIZE : bytesLeft;
132                            writerData.outStrm.write(writerData.data, bytesRead, toRead);
133                            writerData.outStrm.flush();
134                            bytesRead += toRead;
135                            bytesLeft -= toRead;
136                         }
137                         // writerData.out.write(0);
138                         // writerData.out.flush();
139                         
140                         // these would block (probably the pipes are not the best in the world
141                         // writerData.out.write(writerData.data);
142                         // writerData.out.flush();
143                      }
144                      else
145                         writerData.outStrm.close();
146                   }
147                }
148                catch (Throwable e) {
149                   writerData.exception = e;
150                }
151                finally {
152                   writerData.release();
153                }
154             }
155             catch (Throwable e) {
156                if (e.getMessage().indexOf("Pipe closed") < 0) {
157                   log.warning("An exception occured when writing to the stream: ' " + e.getMessage());
158                   e.printStackTrace();
159                }
160                else if (log.isLoggable(Level.FINE)) {
161                   log.fine("The pipe was closed, which resulted in an IO Exception. It can happen when the client has returned before reading the complete message");
162                   e.printStackTrace();
163                }
164             }
165          }
166       }
167    }
168    
169    
170    class ExecutionThread extends Thread {
171       
172       private String cbSessionId_;
173       private UpdateKey updateKey_;
174       private byte[] content_;
175       private UpdateQos updateQos_;
176       
177       public ExecutionThread(String cbSessId, UpdateKey updKey, byte[] content, UpdateQos updQos) {
178          this.cbSessionId_ = cbSessId;
179          this.updateKey_ = updKey;
180          this.content_ = content;
181          this.updateQos_ = updQos;
182          
183       }
184       
185       public void run() {
186          try {
187             ret = updateNewMessage(cbSessionId_, updateKey_, content_, updateQos_);
188             clearQueue();
189          }
190          catch (Throwable e) {
191             setException(e);
192             e.printStackTrace();
193          }
194          finally {
195             try {
196                if (in != null)
197                   in.close();
198             }
199             catch (IOException e) {
200                e.printStackTrace();
201             }
202             mutex.release();
203          }
204       }
205    };
206 
   /** Logger used by this class and by its inner helper threads. */
   private static Logger log = Logger.getLogger(StreamingCallback.class.getName());
   /**
    * Name of the client property holding the callback session id in the entries of
    * the client side queue: handled by storeEntry, read back and removed by
    * sendInitialQueueEntries.
    */
   public final static String ENTRY_CB_SESSION_ID = "__entryCbSessionId";
   
   /** The stream based callback of the client to which all updates are delegated. */
   private I_StreamingCallback callback;

   private Global global;
   /** The side of the pipe to which the chunks are written (created on the first chunk). */
   private PipedOutputStream out;
   /** The side of the pipe which is passed to the client callback for reading. */
   private PipedInputStream in;
   /** Set by setException, thrown and cleared by consumeExceptionIfNotNull. */
   private XmlBlasterException ex;
   /** The value returned by the client callback, set by the ExecutionThread. */
   private String ret;
   /** The callback session id of the first chunk of the message in progress. */
   private String cbSessionId;
   /** The writer thread, looked up in (or registered to) the global. */
   private Writer writer;
   /** The time to wait in ms until returning when waiting (if zero or negative infinite) */
   private long waitForChunksTimeout;
   // private long waitForClientReturnTimeout;
   /** Only instantiated if waitForChunksTimeout is positive, otherwise null. */
   private Timeout timer;
   private Timestamp timestamp; // the key for the timeout timer (can be null)
   private I_Queue queue; // optional client side queue
   /** If true the client side queue is instantiated in reachedAlive. */
   private boolean useQueue;
   /** Set at the end of the first reachedAlive, following invocations are ignored. */
   private boolean initialized;
   /** Set to false when updateInternal is entered, to true after a last chunk or an XmlBlasterException. */
   private boolean lastMessageCompleted = true;
   /** Acquired on the first chunk of a chunked message, released by the ExecutionThread when the client callback is done. */
   private final Mutex mutex;
229    
230    private void reset() throws XmlBlasterException {
231       this.out = null;
232       this.in = null;
233       this.ret = null;
234       this.cbSessionId = null;
235    }
236    
237    public StreamingCallback(Global global, I_StreamingCallback callback) throws XmlBlasterException {
238       this(global, callback, 0L, 0L, false);
239    }
240    
241    /**
242     * 
243     * @param callback
244     */
245    public StreamingCallback(Global global, I_StreamingCallback callback, long waitForChunksTimeout, long waitForClientReturnTimeout, boolean useQueue) 
246       throws XmlBlasterException {
247       this.callback = callback;
248       this.global = global;
249       this.mutex = new Mutex();
250       String writerName = StreamingCallback.class.getName() + "-writer";
251       synchronized(this.global) {
252          this.writer = (Writer)this.global.getObjectEntry(writerName);
253          if (this.writer == null) {
254             this.writer = new Writer();
255             this.global.addObjectEntry(writerName, this.writer);
256          }
257       }
258       this.waitForChunksTimeout = waitForChunksTimeout;
259       // this.waitForClientReturnTimeout = waitForClientReturnTimeout;
260       if (this.waitForChunksTimeout > 0L) {
261          String timerName = StreamingCallback.class.getName() + "-timer";
262          synchronized(this.global) {
263             this.timer = (Timeout)this.global.getObjectEntry(timerName);
264             if (this.timer == null) {
265                this.timer = new Timeout(timerName);
266                this.global.addObjectEntry(timerName, this.timer);
267             }
268          }
269       }
270       this.useQueue = useQueue;
271       // TODO latch until connected to avoit early updates
272    }
273    
274    /**
275     * 
276     * @return the number of delivered entries from local client update queue.
277     */
278    public final int sendInitialQueueEntries() throws XmlBlasterException {
279       if (this.queue == null)
280          return 0;
281       ArrayList list = this.queue.peek(-1, -1L);
282       for (int i=0; i < list.size(); i++) {
283          MsgQueuePublishEntry entry = (MsgQueuePublishEntry)list.get(i);
284          MsgKeyData key = entry.getMsgKeyData();
285          MsgQosData qos =(MsgQosData)entry.getMsgUnit().getQosData();
286          byte[] cont = entry.getMsgUnit().getContent();
287          String entryCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
288          qos.getClientProperties().remove(ENTRY_CB_SESSION_ID);
289          final boolean isExternal = false; // we don't want to store these entries since already here
290          updateInternal(entryCbSessionId, new UpdateKey(key), cont, new UpdateQos(this.global, qos), isExternal);
291       }
292       this.queue.clear();
293       return list.size();
294    }
295    
296    private final void storeEntry(String cbSessId, UpdateKey key, byte[] cont, UpdateQos qos) throws XmlBlasterException {
297       if (this.queue == null)
298          return;
299       final boolean ignorePutInterceptor = false;
300       if (cbSessId != null) {
301          String oldCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
302          if (oldCbSessionId != null && !oldCbSessionId.equals(cbSessId)) {
303             log.warning("the client property '" + ENTRY_CB_SESSION_ID + "' is a reserved word, we will overwrite its value='" + oldCbSessionId + "' to be '" + cbSessionId + "'");
304             ClientProperty prop = new ClientProperty(ENTRY_CB_SESSION_ID, null, null, cbSessId);
305             qos.getClientProperties().put(prop.getName(), prop);
306          }
307       }
308       MsgUnit msgUnit = new MsgUnit(key.getData(), cont, qos.getData());
309       MsgQueuePublishEntry entry = new MsgQueuePublishEntry(this.global, msgUnit, this.queue.getStorageId());
310       this.queue.put(entry, ignorePutInterceptor);
311    }
312    
313    /**
314     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
315     */
316    public String updateStraight(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
317       log.fine("cbSessionId='" + cbSessId + "'");
318       ByteArrayInputStream bais = new ByteArrayInputStream(cont);
319       return this.callback.update(cbSessId, updKey, bais, updQos);
320    }
321    
322    /**
323     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
324     */
325    public String updateNewMessage(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
326       log.fine("cbSessionId='" + cbSessId + "'");
327       return this.callback.update(cbSessId, updKey, in, updQos);
328    }
329 
330    private final boolean isFirstChunk(UpdateQos qos) {
331       int seq = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), 0);
332       return seq == 0;
333    }
334    
335    private final boolean isLastChunk(UpdateQos qos) {
336       boolean hasGroupId = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), (String)null) != null;
337       if (!hasGroupId)
338          return true;
339       return qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), false);
340    }
341    
342    private final ClientProperty getProp(String key, UpdateQos qos) {
343       return qos.getClientProperty(Constants.addJmsPrefix(key, log));
344    }
345    
346    /**
347     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
348     */
349    public String update(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException {
350       boolean sendInitial = this.queue != null && this.lastMessageCompleted && this.queue.getNumOfEntries() > 0; 
351       if (sendInitial)
352          sendInitialQueueEntries();
353       
354       final boolean isExternal = true;
355       log.fine("cbSessionId='" + cbSessId + "'");
356       return updateInternal(cbSessId, updKey, cont, updQos, isExternal);
357    }
358    
359    /**
360     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
361     */
362    private final String updateInternal(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos, boolean isExternal) throws XmlBlasterException {
363       this.lastMessageCompleted = false;
364       boolean doStore = isExternal;
365       boolean isLastChunk = false;
366       try {
367          log.fine("entering with cbSessionId='" + cbSessId + "'");
368          if (this.timer != null && this.timestamp != null) { // no need to be threadsafe since update is single thread
369             this.timer.removeTimeoutListener(this.timestamp);
370             this.timestamp = null;
371          }
372          ClientProperty exProp = getProp(XBConnectionMetaData.JMSX_GROUP_EX, updQos);
373          // TODO Check if this exception really should be thrown: I think it shall not be thrown since it is an exception
374          // which occured when publishing and this is the information that the update should return
375          if (exProp != null)
376             throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_INTERNALERROR, "update", "An exception occured on a chunk when updating. " + updQos.toXml());
377          isLastChunk = isLastChunk(updQos);
378          
379          synchronized(this) {
380             consumeExceptionIfNotNull();
381             if (this.ret != null) {
382                clearQueue();
383                return ret;
384             }
385          }
386          
387          if (isLastChunk) { // no need to store the last message since sync return
388             if (isFirstChunk(updQos)) {
389                // TODO a sync to wait until cleared (the updateStraight after the sync, not inside).
390                try {
391                   return updateStraight(cbSessId, updKey, cont, updQos);
392                }
393                catch (IOException e) {
394                   throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: exception occured.", e);
395                }
396                
397             }
398             
399             try {
400                if (cont != null && cont.length > 0) {
401                   this.writer.write(this.out, cont);
402                }
403                
404                this.writer.close(this.out);
405                // wait until the client has returned his method.
406                try {
407                   mutex.acquire();
408                   consumeExceptionIfNotNull();
409                   clearQueue();
410                   return this.ret;
411                }
412                finally {
413                   mutex.release();
414                }
415             }
416             catch (InterruptedException e) {
417                throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
418             }
419             finally {
420                reset();
421             }
422          }
423          else { // it is not the last message
424             if (this.timer != null)
425                this.timestamp = this.timer.addTimeoutListener(this, this.waitForChunksTimeout, null);
426             try {
427                if (isFirstChunk(updQos)) {
428                   this.mutex.acquire();
429                   this.cbSessionId = cbSessId;
430                   this.out = new PipedOutputStream();
431                   this.in = new PipedInputStream(this.out);
432                   ExecutionThread thread = new ExecutionThread(cbSessId, updKey, cont, updQos);
433                   thread.start();
434                }
435                else { // check if the message is complete
436                   /*
437                   if (this.oldGroupId == null) {
438                      try {
439                         mutex.acquire();
440                         throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: The message is not the first of a group but the previous one was already completed.");
441                      }
442                      finally {
443                         mutex.release();
444                      }
445                   }
446                   */
447                }
448                this.writer.write(this.out, cont);
449             }
450             catch (InterruptedException e) {
451                throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
452             }
453             catch (IOException e) {
454                throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
455             }
456             if (doStore)
457                storeEntry(cbSessId, updKey, cont, updQos);
458             // and return a fake positive response.
459             return Constants.RET_OK;
460          }
461          
462       }
463       catch (XmlBlasterException e) {
464          try {
465             this.writer.close(this.out);
466          }
467          catch (InterruptedException e1) {
468             e1.printStackTrace();
469          }
470          this.lastMessageCompleted = true;
471          throw e;
472       }
473       catch (Throwable e) {
474          e.printStackTrace();
475          throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "throwable in updateInternal", "", e);
476       }
477       finally {
478          if (isLastChunk) {
479             this.lastMessageCompleted = true;
480          }
481          log.fine("Leaving method");
482       }
483    }
484 
485    /**
486     * It is used here to inform the user update method that a timeout occured, it will throw
487     * an IOException when reading the in stream of the update method.
488     * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object)
489     */
490    public void timeout(Object userData) {
491       try {
492          this.writer.close(this.out);
493       }
494       catch (Throwable e) {
495          // we can not make it threadsafe so we must protect against possible NPE Exceptions
496          e.printStackTrace();
497       }
498       
499    }
500 
501    private final void clearQueue() {
502       if (queue != null) {
503          log.fine("Clear the queue " + this.queue.getStorageId());
504          queue.clear();
505       }
506    }
507 
508    /**
509     * Always makes a USER_UPDATE_HOLDBACK Exception out of it, no matter what the original exception 
510     * was.
511     * @param ex
512     */
513    private synchronized void setException(Throwable ex) {
514       if (ex instanceof XmlBlasterException) {
515          XmlBlasterException tmp = (XmlBlasterException)ex;
516          if (tmp.getErrorCode().equals(ErrorCode.USER_UPDATE_HOLDBACK))
517             this.ex = tmp;
518          else
519             this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
520       }
521       else {
522          this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
523       }
524    }
525    
526    /**
527     * returns the exception (if any) and resets it.
528     * @return
529     */
530    private synchronized void consumeExceptionIfNotNull() throws XmlBlasterException {
531       XmlBlasterException e = this.ex;
532       if (e != null) {
533          this.ex = null;
534          throw e;
535       }
536    }
537    
538    // implementation of interface I_ConnectionStateListener
539    
540    /**
541     * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
542     */
543    public synchronized void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
544       log.fine("I am alive now");
545       // only used on first connect after it is ignored.
546       if (this.initialized)
547          return;
548      
549       if (this.useQueue) {
550          log.info("going to instance the queue");
551          ConnectQos connectQos = connection.getConnectQos();
552          ClientQueueProperty prop = connectQos.getClientQueueProperty();
553          StorageId storageId = ((XmlBlasterAccess) connection).createStorageId(Constants.RELATING_CLIENT_UPDATE);
554          try {
555             this.queue = this.global.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), storageId,
556                   connectQos.getClientQueueProperty());
557             if (((XmlBlasterAccess)connection).isCallbackDispatcherActive())
558                sendInitialQueueEntries();
559          }
560          catch (XmlBlasterException e) {
561             log.severe("An exception occured when trying to initialize the callback client queue: " + e.getMessage());
562             e.printStackTrace();
563          }
564       }
565       
566       this.initialized = true;
567    }
568 
   /**
    * Nothing is done on this state change (empty implementation).
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      // no action implemented for this state change
      
   }
576 
   /**
    * Nothing is done on this state change (empty implementation).
    * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
    */
   public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
      // no action implemented for this state change
      
   }
584    
585 }


syntax highlighted by Code2HTML, v. 0.9.1