1 package org.xmlBlaster.test.memoryleak;
  2 import org.xmlBlaster.util.ThreadLister;
  3 import org.xmlBlaster.util.Global;
  4 import org.xmlBlaster.util.XmlBlasterException;
  5 import org.xmlBlaster.client.qos.ConnectQos;
  6 import org.xmlBlaster.client.qos.DisconnectQos;
  7 import org.xmlBlaster.client.qos.ConnectReturnQos;
  8 import org.xmlBlaster.util.qos.SessionQos;
  9 import org.xmlBlaster.client.I_XmlBlasterAccess;
 10 import org.xmlBlaster.client.I_Callback;
 11 import org.xmlBlaster.client.key.UpdateKey;
 12 import org.xmlBlaster.client.qos.UpdateQos;
 13 import org.xmlBlaster.util.MsgUnit;
 14 import org.xmlBlaster.test.Util;
 15 
 16 import junit.framework.*;
 17 
 18 import java.util.logging.Logger;
 19 import java.util.logging.Level;
 20 import java.util.ArrayList;
 21 import java.util.Iterator;
 22 import java.io.File;
 23 import java.io.FileReader;
 24 import java.io.BufferedReader;
 25 
 26 /**
 * This is a twofold test: it repeatedly creates a batch of connections,
 * an almost infinite number of times. It tests two things:
 29  * 1. if the client side is leaking threads.
 30  * 2. if Jacorb contains a locking bug.
 31  *
 32  * @author <a href="mailto:pra@tim.se">Peter Antman</a>
 33  * @version $Revision: 1.4 $
 34  */
 35 
 36 public class TestThreadLeak extends TestCase implements I_Callback {
 37    private static String ME = "TestThreadLeak";
 38    private final Global glob;
 39    private static Logger log = Logger.getLogger(TestThreadLeak.class.getName());
 40    private String fileName;
 41    private int noConnections = 10;
 42    private boolean noError = true;
 43    private ArrayList connections = new ArrayList();
 44    private int maxThreadDiff = 500;
 45    private String pid;
 46    private String osName;
 47    /** Time a connection should live, before beeing taken down */
 48    private long cttl = 5000;
 49    
 50    public TestThreadLeak (Global glob, String testName) throws Exception{
 51       super(testName);
 52       this.glob = glob;
 53 
 54       fileName = glob.getProperty().get("pidFileName", (String)null);
 55    }
 56 
 57    /**
 58     * Sets up the fixture. 
 59    */
 60    protected void setUp() throws Exception 
 61    {
 62       String[] args = {
 63          "-protocol", 
 64          "SOCKET", //"SOCKET",
 65          "-session.maxSessions",
 66          "20"
 67       };
 68       glob.init(args);
 69 
 70       // if we have a filename where a pid is, wait until that file has shown up
 71       // But now more that 5 times
 72       if ( fileName != null) {
 73          int i = 0;
 74          File file = new File(fileName);
 75          while (!file.exists()) {
 76          i++;
 77          if ( i > 4) {
 78             Assert.fail("We where given a pid filename " + file + " but could not find it, giving up");
 79          } // end of if ()
 80          
 81          Thread.sleep(2000);
 82          } // end of while ()
 83          BufferedReader r = new BufferedReader(new FileReader(file));
 84          pid = r.readLine();
 85       } // end of if ()
 86 
 87       osName = System.getProperty("os.name");
 88 
 89    }
 90    
 91 
 92    void dumpThreadStack() throws Exception {
 93       if ( pid != null && !osName.startsWith("Window")) {
 94          Runtime runtime = Runtime.getRuntime();
 95          Process p = runtime.exec("kill -3 " + pid);
 96          p.waitFor();
 97       } else {
 98          log.info("Could not dump stack pid="+pid+" os="+osName);
 99       } // end of else
100       
101 
102    }
103 
104    void handleLock(ConnectorWorker wr) throws Exception {
105       dumpThreadStack();
106       //Assert.fail("Thread lock in connector worker "+wr + " giving up");
107    }
108 
109    public void testThreadLeakage() throws Exception {
110       int startNoThreads = -1;
111       int lastNoThreads = -1;
112       int round = 0;
113       while ( noError ) {
114          round++;
115          log.info("Doing a new connection round no " + round);
116          for ( int i = 0; i< noConnections;i++) {
117             ConnectorWorker conn = new ConnectorWorker(glob, cttl);
118             connections.add(conn);
119          } // end of for ()
120          
121          // Wait a while
122          System.gc();
123          Thread.sleep(1000);
124          System.gc();
125          Thread.sleep(1000);
126          
127          // Count threads, if more than maxThreadDiff has been created since
128          // the first round: fail
129          int noThreads = ThreadLister.countThreads();
130          if (startNoThreads != -1 ) {
131             startNoThreads = noThreads;
132             lastNoThreads = noThreads;
133          } else {
134             // Check how many since first round
135             int firstDiff = noThreads - startNoThreads;
136             int lastDiff = noThreads - lastNoThreads;
137             log.info("No of thread created since start:"+firstDiff+"; number of threads created since last round: " + lastDiff);
138             lastNoThreads = noThreads;
139             if ( firstDiff > maxThreadDiff) {
140                ThreadLister.listAllThreads(System.out);
141                Assert.fail("Max number of new threads reached " +firstDiff +
142                            " number of threads created since first round: XmlBlaster is leaking huge numbers of threads. Happened in round " + round);
143             } // end of if ()
144             
145          } // end of else
146          
147          // Wait a  while for connections to stop
148          Thread.sleep( noConnections*1000 );
149          
150          // Check that all connections are finished.
151          Iterator c = connections.iterator();
152          while ( c.hasNext() ) {
153             ConnectorWorker w = (ConnectorWorker)c.next();
154 
155             // Check that its NOT alive
156             if ( w.isAlive() ) {
157                // Opps, do we have a lock here
158 
159                // We give it five rounds if its still in Connecting state we abort
160                int j = 0;
161                while (w.isAlive() && j < 4) {
162                   log.warning("Possible lock of connection " + w + " detected, waiting 30 s round "+j);
163                   j++;
164                   Thread.sleep(30*1000);
165                   if ( j > 3) {
166                      log.severe("Possible lock of connection " + w + " detected, aborting");
167                      noError = false;
168                      handleLock(w);
169                   } // end of if ()
170                   
171                } // end of while ()
172                
173             } // end of if ()
174             Throwable t = w.getException();
175             if ( t != null) {
176                log.severe("Connection had exception, giving up : "+t);
177                t.printStackTrace();
178                Assert.fail("Connection had exception, giving up : "+t);
179             } // end of if ()
180             
181          } // end of while ()
182          connections.clear();
183 
184 
185 
186       } // end of while ()
187       
188 
189 
190    }
191 
192    /**
193     * This is the callback method invoked from xmlBlaster
194     * delivering us a new asynchronous message. 
195     * @see org.xmlBlaster.client.I_Callback#update(String, UpdateKey,
196 byte[], UpdateQos)
197     */
198    public String update(String cbSessionId, UpdateKey updateKey, byte[]
199 content, UpdateQos updateQos)
200    {
201       log.info("Receiving update of a message " + updateKey.getOid() + " for subId: " + updateQos.getSubscriptionId() );
202       log.fine("Got message " + new String(content));
203       return "";
204    }
205    /**
206     * Method is used by TestRunner to load these tests
207     */
208    public static Test suite() throws Exception 
209    {
210       
211       TestSuite suite= new TestSuite();
212       suite.addTest(new TestThreadLeak(new Global(),
213 "testThreadLeakage"));
214       return suite;
215    }
216 
217    /**
218     * Invoke: 
219     * <pre>
220     *   java org.xmlBlaster.test.mime.TestXPathSubscribeFilter
221     *   java -Djava.compiler= junit.textui.TestRunner -noloading
222 org.xmlBlaster.test.mime.TestXPathSubscribeFilter
223     * <pre>
224     */
225    public static void main(String args[]) throws Exception 
226    {
227       try {
228          Global glob = new Global();
229          if (glob.init(args) != 0) {
230             System.err.println(ME + ": Init failed");
231             System.exit(1);
232          }
233          TestThreadLeak testSub = new TestThreadLeak(glob, "testThreadLeak");
234          testSub.setUp();
235          testSub.testThreadLeakage();
236          testSub.tearDown();
237       } catch (Throwable e) {
238          e.printStackTrace();
239          System.exit(0);
240       } // end of try-catch
241 
242    }
243    
244 
245    /**
246     * Connect to XmlBlaster and disconnect after timeout milis.
247     */
248    class ConnectorWorker implements Runnable {
249       Global glob;
250       long timeout;
251       private I_XmlBlasterAccess con = null;
252       Thread internalThread;
253       Throwable ie;
254       ConnectReturnQos retQos;
255       volatile String state = "CREATED";
256       volatile long started;
257       public ConnectorWorker(Global glob, long timeout) {
258          this.glob = glob.getClone(null);
259          this.timeout = timeout;
260          internalThread = new Thread(this);
261          internalThread.start();
262       }
263 
264       public void run() {
265          started = System.currentTimeMillis();
266          state = "RUNNING";
267          if ( Thread.currentThread() != internalThread ) {
268             ie = new RuntimeException("Only internal thread allowed");
269             throw (RuntimeException)ie;
270          } // end of if ()
271          try {
272             // Connect
273             state = "CONNECTING";
274             con = glob.getXmlBlasterAccess();
275             ConnectQos qos = new ConnectQos(glob, "test", "dummy");
276 
277             retQos = con.connect(qos, TestThreadLeak.this); // Login to xmlBlaster
278             log.info("Connected "+ this);
279             state = "CONNECTED";
280             Thread.sleep(timeout);
281 
282             // Disconnect
283             state = "DISCONNECTING";
284             con.disconnect(null);
285             state = "DISCONNECTED";
286             con=null;
287             glob.shutdown();
288             glob = null;
289          } catch (Throwable e) {
290             ie = e;
291             log.severe("Giving up " + e);
292          } // end of try-catch
293          
294          
295       }
296       public long getAgeSeconds() {
297          return (System.currentTimeMillis()-started)/1000;
298          
299       }
300 
301       public String getState() {
302          return state;
303       }
304 
305       public String toString() {
306          String s = super.toString();
307          String q = "";
308          if ( retQos != null) {
309             SessionQos ses = retQos.getSessionQos() ;
310             q =  "secretId="+ses.getSecretSessionId() + " publicId="+ses.getPublicSessionId();
311          } // end of if ()
312          
313          return s+":"+state+" age="+getAgeSeconds()+" seconds old, (thread="+internalThread.getName()+") "+q;
314       }
315       
316       public Throwable getException() {
317          return ie;
318       }
319 
320       public boolean isAlive() {
321          return internalThread.isAlive();
322       }
323       
324    }
325 } // TestThreadLeak


syntax highlighted by Code2HTML, v. 0.9.1