1 /*------------------------------------------------------------------------------
  2 Name:      StreamingCallback.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 
  7 package org.xmlBlaster.client;
  8 
  9 import java.io.ByteArrayInputStream;
 10 import java.io.IOException;
 11 import java.io.OutputStream;
 12 import java.io.PipedInputStream;
 13 import java.io.PipedOutputStream;
 14 import java.util.List;
 15 import java.util.concurrent.LinkedBlockingQueue;
 16 import java.util.concurrent.locks.ReentrantLock;
 17 import java.util.logging.Level;
 18 import java.util.logging.Logger;
 19 
 20 import org.xmlBlaster.client.key.UpdateKey;
 21 import org.xmlBlaster.client.qos.ConnectQos;
 22 import org.xmlBlaster.client.qos.UpdateQos;
 23 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
 24 import org.xmlBlaster.jms.XBConnectionMetaData;
 25 import org.xmlBlaster.util.Global;
 26 import org.xmlBlaster.util.I_Timeout;
 27 import org.xmlBlaster.util.MsgUnit;
 28 import org.xmlBlaster.util.Timeout;
 29 import org.xmlBlaster.util.Timestamp;
 30 import org.xmlBlaster.util.XmlBlasterException;
 31 import org.xmlBlaster.util.def.Constants;
 32 import org.xmlBlaster.util.def.ErrorCode;
 33 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 34 import org.xmlBlaster.util.key.MsgKeyData;
 35 import org.xmlBlaster.util.qos.ClientProperty;
 36 import org.xmlBlaster.util.qos.MsgQosData;
 37 import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
 38 import org.xmlBlaster.util.queue.I_Entry;
 39 import org.xmlBlaster.util.queue.I_Queue;
 40 import org.xmlBlaster.util.queue.StorageId;
 41 
 42 //import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 43 //import EDU.oswego.cs.dl.util.concurrent.Mutex;
 44 
 45 /**
 46  * StreamingCallback
 47  * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 48  */
 49 public class StreamingCallback implements I_Callback, I_Timeout, I_ConnectionStateListener {
 50 
 51    /**
 52     * 
 53     * Writer needed since the out stream must be written from a thread which does not
 54     * die before the thread which reads the in counterpart. For some "strange" reason
 55     * the implementation of the Pipe streams makes a check if the thread which has 
 56     * made the last write operation on the out stream still is valid. If not, a Dead
 57     * End IO Exception is thrown when reading.
 58     * 
 59     * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 60     */
 61    class Writer extends Thread {
 62 
 63       class WriterData extends ReentrantLock {
 64       private static final long serialVersionUID = -8014185442026247255L;
 65       private OutputStream outStrm;
 66          private byte[] data;
 67          private Throwable exception;
 68          
 69          public WriterData(OutputStream out, byte[] data) {
 70             super();
 71             this.outStrm = out;
 72             this.data = data;
 73          }
 74       }
 75 
 76       private LinkedBlockingQueue<WriterData> channel;
 77       
 78       public Writer(String name) {
 79          super(name);
 80          this.channel = new LinkedBlockingQueue<WriterData>();
 81          setDaemon(true);
 82          start();
 83       }
 84 
 85       public Writer() {
 86          super();
 87          this.channel = new LinkedBlockingQueue<WriterData>();
 88          setDaemon(true);
 89          start();
 90       }
 91       
 92       public synchronized void write(OutputStream outStream, byte[] buf) throws InterruptedException, XmlBlasterException {
 93          WriterData data = new WriterData(outStream, buf);
 94          try {
 95             data.lockInterruptibly();
 96             this.channel.put(data);
 97             data.lockInterruptibly(); // waits until the other thread is finished
 98             if (data.exception != null)
 99                throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "write: a throwable occured", "", data.exception);
100          }
101          finally {
102             data.unlock();
103          }
104       }
105 
106       public synchronized void close(OutputStream outStream) throws InterruptedException, XmlBlasterException {
107          WriterData data = new WriterData(outStream, null);
108          try {
109             data.lockInterruptibly();
110             this.channel.put(data);
111             data.lockInterruptibly(); // waits until the other thread is finished
112             if (data.exception != null)
113                throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "close: a throwable occured", "", data.exception);
114          }
115          finally {
116             data.unlock();
117          }
118       }
119 
120       /**
121        * @see java.lang.Thread#run()
122        */
123       public void run() {
124          while (true) {
125             try {
126                WriterData writerData = (WriterData)this.channel.take();
127                try {
128                   if (writerData.outStrm != null) {
129                      if (writerData.data != null) {
130 
131                         int bytesLeft = writerData.data.length;
132                         int bytesRead = 0;
133                         final int MAX_CHUNK_SIZE = 4096;
134                         while (bytesLeft > 0) {
135                            int toRead = bytesLeft > MAX_CHUNK_SIZE ? MAX_CHUNK_SIZE : bytesLeft;
136                            writerData.outStrm.write(writerData.data, bytesRead, toRead);
137                            writerData.outStrm.flush();
138                            bytesRead += toRead;
139                            bytesLeft -= toRead;
140                         }
141                         // writerData.out.write(0);
142                         // writerData.out.flush();
143                         
144                         // these would block (probably the pipes are not the best in the world
145                         // writerData.out.write(writerData.data);
146                         // writerData.out.flush();
147                      }
148                      else
149                         writerData.outStrm.close();
150                   }
151                }
152                catch (Throwable e) {
153                   writerData.exception = e;
154                }
155                finally {
156                   writerData.unlock();
157                }
158             }
159             catch (Throwable e) {
160                if (e.getMessage().indexOf("Pipe closed") < 0) {
161                   log.warning("An exception occured when writing to the stream: ' " + e.getMessage());
162                   e.printStackTrace();
163                }
164                else if (log.isLoggable(Level.FINE)) {
165                   log.fine("The pipe was closed, which resulted in an IO Exception. It can happen when the client has returned before reading the complete message");
166                   e.printStackTrace();
167                }
168             }
169          }
170       }
171    }
172    
173    
174    class ExecutionThread extends Thread {
175       
176       private String cbSessionId_;
177       private UpdateKey updateKey_;
178       private byte[] content_;
179       private UpdateQos updateQos_;
180       
181       public ExecutionThread(String cbSessId, UpdateKey updKey, byte[] content, UpdateQos updQos) {
182          this.cbSessionId_ = cbSessId;
183          this.updateKey_ = updKey;
184          this.content_ = content;
185          this.updateQos_ = updQos;
186          
187       }
188       
189       public void run() {
190          try {
191             ret = updateNewMessage(cbSessionId_, updateKey_, content_, updateQos_);
192             clearQueue();
193          }
194          catch (Throwable e) {
195             setException(e);
196             e.printStackTrace();
197          }
198          finally {
199             try {
200                if (in != null)
201                   in.close();
202             }
203             catch (IOException e) {
204                e.printStackTrace();
205             }
206             mutex.unlock();
207          }
208       }
209    };
210 
211    private static Logger log = Logger.getLogger(StreamingCallback.class.getName());
212    public final static String ENTRY_CB_SESSION_ID = "__entryCbSessionId";
213    
214    private I_StreamingCallback callback;
215 
216    private Global global;
217    private PipedOutputStream out;
218    private PipedInputStream in;
219    private XmlBlasterException ex;
220    private String ret;
221    private String cbSessionId;
222    private Writer writer;
223    /** The time to wait in ms until returning when waiting (if zero or negative inifinite) */
224    private long waitForChunksTimeout;
225    // private long waitForClientReturnTimeout;
226    private Timeout timer;
227    private Timestamp timestamp; // the key for the timeout timer (can be null)
228    private I_Queue queue; // optional client side queue
229    private boolean useQueue;
230    private boolean initialized;
231    private boolean lastMessageCompleted = true;
232    private final ReentrantLock mutex;
233    
234    private void reset() throws XmlBlasterException {
235       this.out = null;
236       this.in = null;
237       this.ret = null;
238       this.cbSessionId = null;
239    }
240    
241    public StreamingCallback(Global global, I_StreamingCallback callback) throws XmlBlasterException {
242       this(global, callback, 0L, 0L, false);
243    }
244    
245    /**
246     * 
247     * @param callback
248     */
249    public StreamingCallback(Global global, I_StreamingCallback callback, long waitForChunksTimeout, long waitForClientReturnTimeout, boolean useQueue) 
250       throws XmlBlasterException {
251       this.callback = callback;
252       this.global = global;
253       this.mutex = new ReentrantLock();
254       String writerName = StreamingCallback.class.getName() + "-writer";
255       synchronized(this.global) {
256          this.writer = (Writer)this.global.getObjectEntry(writerName);
257          if (this.writer == null) {
258             this.writer = new Writer();
259             this.global.addObjectEntry(writerName, this.writer);
260          }
261       }
262       this.waitForChunksTimeout = waitForChunksTimeout;
263       // this.waitForClientReturnTimeout = waitForClientReturnTimeout;
264       if (this.waitForChunksTimeout > 0L) {
265          String timerName = StreamingCallback.class.getName() + "-timer";
266          synchronized(this.global) {
267             this.timer = (Timeout)this.global.getObjectEntry(timerName);
268             if (this.timer == null) {
269                this.timer = new Timeout(timerName);
270                this.global.addObjectEntry(timerName, this.timer);
271             }
272          }
273       }
274       this.useQueue = useQueue;
275       // TODO latch until connected to avoit early updates
276    }
277    
278    /**
279     * 
280     * @return the number of delivered entries from local client update queue.
281     */
282    public final int sendInitialQueueEntries() throws XmlBlasterException {
283       if (this.queue == null)
284          return 0;
285       List<I_Entry> list = this.queue.peek(-1, -1L);
286       for (int i=0; i < list.size(); i++) {
287          MsgQueuePublishEntry entry = (MsgQueuePublishEntry)list.get(i);
288          MsgKeyData key = entry.getMsgKeyData();
289          MsgQosData qos =(MsgQosData)entry.getMsgUnit().getQosData();
290          byte[] cont = entry.getMsgUnit().getContent();
291          String entryCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
292          qos.getClientProperties().remove(ENTRY_CB_SESSION_ID);
293          final boolean isExternal = false; // we don't want to store these entries since already here
294          updateInternal(entryCbSessionId, new UpdateKey(key), cont, new UpdateQos(this.global, qos), isExternal);
295       }
296       this.queue.clear();
297       return list.size();
298    }
299    
300    private final void storeEntry(String cbSessId, UpdateKey key, byte[] cont, UpdateQos qos) throws XmlBlasterException {
301       if (this.queue == null)
302          return;
303       final boolean ignorePutInterceptor = false;
304       if (cbSessId != null) {
305          String oldCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
306          if (oldCbSessionId != null && !oldCbSessionId.equals(cbSessId)) {
307             log.warning("the client property '" + ENTRY_CB_SESSION_ID + "' is a reserved word, we will overwrite its value='" + oldCbSessionId + "' to be '" + cbSessionId + "'");
308             ClientProperty prop = new ClientProperty(ENTRY_CB_SESSION_ID, null, null, cbSessId);
309             qos.getClientProperties().put(prop.getName(), prop);
310          }
311       }
312       MsgUnit msgUnit = new MsgUnit(key.getData(), cont, qos.getData());
313       MsgQueuePublishEntry entry = new MsgQueuePublishEntry(this.global, msgUnit, this.queue.getStorageId());
314       this.queue.put(entry, ignorePutInterceptor);
315    }
316    
317    /**
318     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
319     */
320    public String updateStraight(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
321       log.fine("cbSessionId='" + cbSessId + "'");
322       ByteArrayInputStream bais = new ByteArrayInputStream(cont);
323       return this.callback.update(cbSessId, updKey, bais, updQos);
324    }
325    
326    /**
327     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
328     */
329    public String updateNewMessage(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
330       log.fine("cbSessionId='" + cbSessId + "'");
331       return this.callback.update(cbSessId, updKey, in, updQos);
332    }
333 
334    private final boolean isFirstChunk(UpdateQos qos) {
335       int seq = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), 0);
336       return seq == 0;
337    }
338    
339    private final boolean isLastChunk(UpdateQos qos) {
340       boolean hasGroupId = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), (String)null) != null;
341       if (!hasGroupId)
342          return true;
343       return qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), false);
344    }
345    
346    private final ClientProperty getProp(String key, UpdateQos qos) {
347       return qos.getClientProperty(Constants.addJmsPrefix(key, log));
348    }
349    
350    /**
351     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
352     */
353    public String update(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException {
354       boolean sendInitial = this.queue != null && this.lastMessageCompleted && this.queue.getNumOfEntries() > 0; 
355       if (sendInitial)
356          sendInitialQueueEntries();
357       
358       final boolean isExternal = true;
359       log.fine("cbSessionId='" + cbSessId + "'");
360       return updateInternal(cbSessId, updKey, cont, updQos, isExternal);
361    }
362    
363    /**
364     * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
365     */
366    private final String updateInternal(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos, boolean isExternal) throws XmlBlasterException {
367       this.lastMessageCompleted = false;
368       boolean doStore = isExternal;
369       boolean isLastChunk = false;
370       try {
371          log.fine("entering with cbSessionId='" + cbSessId + "'");
372          if (this.timer != null && this.timestamp != null) { // no need to be threadsafe since update is single thread
373             this.timer.removeTimeoutListener(this.timestamp);
374             this.timestamp = null;
375          }
376          ClientProperty exProp = getProp(XBConnectionMetaData.JMSX_GROUP_EX, updQos);
377          // TODO Check if this exception really should be thrown: I think it shall not be thrown since it is an exception
378          // which occured when publishing and this is the information that the update should return
379          if (exProp != null)
380             throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_INTERNALERROR, "update", "An exception occured on a chunk when updating. " + updQos.toXml());
381          isLastChunk = isLastChunk(updQos);
382          
383          synchronized(this) {
384             consumeExceptionIfNotNull();
385             if (this.ret != null) {
386                clearQueue();
387                return ret;
388             }
389          }
390          
391          if (isLastChunk) { // no need to store the last message since sync return
392             if (isFirstChunk(updQos)) {
393                // TODO a sync to wait until cleared (the updateStraight after the sync, not inside).
394                try {
395                   return updateStraight(cbSessId, updKey, cont, updQos);
396                }
397                catch (IOException e) {
398                   throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: exception occured.", e);
399                }
400                
401             }
402             
403             try {
404                if (cont != null && cont.length > 0) {
405                   this.writer.write(this.out, cont);
406                }
407                
408                this.writer.close(this.out);
409                // wait until the client has returned his method.
410                try {
411                   mutex.lockInterruptibly();
412                   consumeExceptionIfNotNull();
413                   clearQueue();
414                   return this.ret;
415                }
416                finally {
417                   mutex.unlock();
418                }
419             }
420             catch (InterruptedException e) {
421                throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
422             }
423             finally {
424                reset();
425             }
426          }
427          else { // it is not the last message
428             if (this.timer != null)
429                this.timestamp = this.timer.addTimeoutListener(this, this.waitForChunksTimeout, null);
430             try {
431                if (isFirstChunk(updQos)) {
432                   this.mutex.lockInterruptibly();
433                   this.cbSessionId = cbSessId;
434                   this.out = new PipedOutputStream();
435                   this.in = new PipedInputStream(this.out);
436                   ExecutionThread thread = new ExecutionThread(cbSessId, updKey, cont, updQos);
437                   thread.start();
438                }
439                else { // check if the message is complete
440                   /*
441                   if (this.oldGroupId == null) {
442                      try {
443                         mutex.lockInterruptibly();
444                         throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: The message is not the first of a group but the previous one was already completed.");
445                      }
446                      finally {
447                         mutex.unlock();
448                      }
449                   }
450                   */
451                }
452                this.writer.write(this.out, cont);
453             }
454             catch (InterruptedException e) {
455                throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
456             }
457             catch (IOException e) {
458                throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
459             }
460             if (doStore)
461                storeEntry(cbSessId, updKey, cont, updQos);
462             // and return a fake positive response.
463             return Constants.RET_OK;
464          }
465          
466       }
467       catch (XmlBlasterException e) {
468          try {
469             this.writer.close(this.out);
470          }
471          catch (InterruptedException e1) {
472             e1.printStackTrace();
473          }
474          this.lastMessageCompleted = true;
475          throw e;
476       }
477       catch (Throwable e) {
478          e.printStackTrace();
479          throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "throwable in updateInternal", "", e);
480       }
481       finally {
482          if (isLastChunk) {
483             this.lastMessageCompleted = true;
484          }
485          log.fine("Leaving method");
486       }
487    }
488 
489    /**
490     * It is used here to inform the user update method that a timeout occured, it will throw
491     * an IOException when reading the in stream of the update method.
492     * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object)
493     */
494    public void timeout(Object userData) {
495       try {
496          this.writer.close(this.out);
497       }
498       catch (Throwable e) {
499          // we can not make it threadsafe so we must protect against possible NPE Exceptions
500          e.printStackTrace();
501       }
502       
503    }
504 
505    private final void clearQueue() {
506       if (queue != null) {
507          log.fine("Clear the queue " + this.queue.getStorageId());
508          queue.clear();
509       }
510    }
511 
512    /**
513     * Always makes a USER_UPDATE_HOLDBACK Exception out of it, no matter what the original exception 
514     * was.
515     * @param ex
516     */
517    private synchronized void setException(Throwable ex) {
518       if (ex instanceof XmlBlasterException) {
519          XmlBlasterException tmp = (XmlBlasterException)ex;
520          if (tmp.getErrorCode().equals(ErrorCode.USER_UPDATE_HOLDBACK))
521             this.ex = tmp;
522          else
523             this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
524       }
525       else {
526          this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
527       }
528    }
529    
530    /**
531     * returns the exception (if any) and resets it.
532     * @return
533     */
534    private synchronized void consumeExceptionIfNotNull() throws XmlBlasterException {
535       XmlBlasterException e = this.ex;
536       if (e != null) {
537          this.ex = null;
538          throw e;
539       }
540    }
541    
542    // implementation of interface I_ConnectionStateListener
543    
544    /**
545     * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
546     */
547    public synchronized void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
548       log.fine("I am alive now");
549       // only used on first connect after it is ignored.
550       if (this.initialized)
551          return;
552      
553       if (this.useQueue) {
554          log.info("going to instance the queue");
555          ConnectQos connectQos = connection.getConnectQos();
556          ClientQueueProperty prop = connectQos.getClientQueueProperty();
557          StorageId storageId = ((XmlBlasterAccess) connection).createStorageId(Constants.RELATING_CLIENT_UPDATE);
558          try {
559             this.queue = this.global.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), storageId,
560                   connectQos.getClientQueueProperty());
561             if (((XmlBlasterAccess)connection).isCallbackDispatcherActive())
562                sendInitialQueueEntries();
563          }
564          catch (XmlBlasterException e) {
565             log.severe("An exception occured when trying to initialize the callback client queue: " + e.getMessage());
566             e.printStackTrace();
567          }
568       }
569       
570       this.initialized = true;
571    }
572 
573    /**
574     * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
575     */
576    public synchronized void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
577       log.fine("I am alive and sync now");
578    }
579 
580    /* (non-Javadoc)
581     * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
582     */
583    public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
584       // TODO Auto-generated method stub
585       
586    }
587 
588    /* (non-Javadoc)
589     * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
590     */
591    public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
592       // TODO Auto-generated method stub
593       
594    }
595    
596 }


syntax highlighted by Code2HTML, v. 0.9.1