[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [xmlblaster] Missing messages



Amendment on my fix:
TopicHandler: moved destroyTimer check point from "private ArrayList timeout()" to "public final void timeout(Object userData)" in order to enable the immediate destruction when destroyDelay = 0.


My approach still leaves some gap in order to avoid dead lock:
After a publishing thread unset the destroyTimer and just began to do actual publishing, another publishing thread just finished its job and could reset the destroyTimer.
In my testing, the chance didn't occur, but it's possible theoretically. So my approach only reduced the chance but not solved the problem completely.



Eugene Wu wrote:

Hi, there:

Figured out one cause for the issue. It could happen when a pulishing thread and a timeout thread work on a same TopicHandler instance.

For example, after thread A (publishing) retrieved a topicHandler from topicHandlerMap in line 1695 of RoquestBroker:
Object obj = topicHandlerMap.get(msgUnit.getKeyOid());


the topicHandler's publish method would be invoked. As a synchronized lock on the topicHandler can not be applyed on the whole pubish() method (dead-lock concerns), another thread B (timeout) could get a chance to invoke timeout(Object userData) method of the topicHandler before thread A sets the topicHandler's state to ALIVE. Therefore, the topicHandler could be publishing a message and, at the same time, also could be destroyed, which caused unpredictable results.

In my fix (attached), both publishing thread and timeout thread have a check point, in which three things are done to let a slightly ahead thread be able to prevent a slightly behind competing thread from running:

1) Get the lock on the topicHandler (synchronized lock)
2) Check if the topicHandler is working with a competing thread. If yes, stop the current thread for timeout or stop using the handler for publishing. Otherwise, goto step 3.
3) Setup a check mark to prevent its competing thread running. The mark must be cleared by the framework in a proper time later on.


Here are more detailed scenarios:

a) Publishing thread ahead
After thread A got the topicHandler, its state will be checked. If it is dead (and should be destroyed now because of a synchronized lock in timeout method), a new TopicHandler will be created. If the retrieved topicHandle is not dead, its destroyTimer will be removed. Later on, thread B will be stoped at the point where it check for the destroyTimer's null value.


The destroyTimer will be reset by the framework when the topicHandler is turned to UNREFERENCED.

b) Timeout thread ahead
After thread B got the topicHandler, it will check for the existence of destroyTimer. If not existed, stop the timeout event. Otherwise, the thread will continue. Because of a synchronized lock, it will set the topicHandler as DEAD eventually, which will prevent thread A from using the obsolete handler.


   A dead topicHandle will be garbage collected. (need to confirm this!)


Regards, Eugene

Index: org/xmlBlaster/engine/TopicHandler.java
===================================================================
--- org/xmlBlaster/engine/TopicHandler.java	(revision 15178)
+++ org/xmlBlaster/engine/TopicHandler.java	(working copy)
 at  at  -1946,6 +1946,14  at  at 
       return notifyList;
    }
 
+   void unsetDestroyTimer() {
+      if (this.timerKey != null) {
+         log.finer("Unset destroyTimer for TopicHandler: "+this.getUniqueKey());
+         this.destroyTimer.removeTimeoutListener(this.timerKey);
+         this.timerKey = null;
+      }
+   }
+   
    /**
     * Merge the message DOM tree into the big xmlBlaster DOM tree
     */
 at  at  -2196,8 +2204,16  at  at 
     * This timeout occurs after a configured delay (destroyDelay) in UNREFERENCED state
     */
    public final void timeout(Object userData) {
-      if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic now ...");
-      ArrayList notifyList = timeout();
+      if (log.isLoggable(Level.FINER)) log.finer("Timeout after destroy delay occurred - destroying topic (oid="+getUniqueKey()+") now ...");
+      ArrayList notifyList = null;
+      synchronized (this) {
+         if (timerKey==null) {
+            log.finer("Ignored timeout event for the timer was unset by a publishing thread");
+            return;
+         }
+         notifyList = timeout();
+      }
+      
       if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize
    }
 
Index: org/xmlBlaster/engine/RequestBroker.java
===================================================================
--- org/xmlBlaster/engine/RequestBroker.java	(revision 15178)
+++ org/xmlBlaster/engine/RequestBroker.java	(working copy)
 at  at  -1686,6 +1686,7  at  at 
          // Handle local message
 
          // Find or create the topic
+         boolean newlycreated = false;
          TopicHandler topicHandler = null;
          synchronized(this.topicHandlerMap) {
             if (!msgKeyData.getOid().equals(msgUnit.getKeyOid())) {
 at  at  -1695,11 +1696,25  at  at 
             Object obj = topicHandlerMap.get(msgUnit.getKeyOid());
             if (obj == null) {
                topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid()); // adds itself to topicHandlerMap
+               newlycreated = true;
             }
             else {
                topicHandler = (TopicHandler)obj;
             }
          }
+         
+         if (!newlycreated && !topicHandler.isAlive()) {
+            log.finer("The topic is not ALIVE, need to check if it is dying or not");
+            synchronized(topicHandler) { // wiating if the topicHandler is dying.
+               if (topicHandler.isDead()) {
+                  log.finer("The topic is DEAD, need to create a new one");
+                  topicHandler = new TopicHandler(this, sessionInfo, msgUnit.getKeyOid());
+               }
+               else {
+                  topicHandler.unsetDestroyTimer(); // to provent the topicHandler from being timed out
+               }
+            }
+         }
 
          // Process the message
          publishReturnQos = topicHandler.publish(sessionInfo, msgUnit, publishQos);