1 package org.xmlBlaster.test.cluster;
2
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.logging.Level;
import org.xmlBlaster.util.Global;

// for client connections:
import org.xmlBlaster.util.*;
import org.xmlBlaster.client.I_Callback;
import org.xmlBlaster.client.key.PublishKey;
import org.xmlBlaster.client.key.EraseKey;
import org.xmlBlaster.client.key.GetKey;
import org.xmlBlaster.client.key.SubscribeKey;
import org.xmlBlaster.client.key.UnSubscribeKey;
import org.xmlBlaster.client.key.UpdateKey;
import org.xmlBlaster.client.qos.PublishQos;
import org.xmlBlaster.client.qos.PublishReturnQos;
import org.xmlBlaster.client.qos.UpdateQos;
import org.xmlBlaster.client.qos.SubscribeQos;
import org.xmlBlaster.client.qos.SubscribeReturnQos;
import org.xmlBlaster.client.qos.UnSubscribeQos;
import org.xmlBlaster.client.qos.EraseQos;
import org.xmlBlaster.client.I_XmlBlasterAccess;
import org.xmlBlaster.util.MsgUnit;
import org.xmlBlaster.util.qos.address.Destination;

import junit.framework.*;
29
30 /**
31 * Test publishing a message from bilbo to heron.
32 * <p />
33 * <pre>
34 * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.PtPTest
35 * </pre>
36 * NOTE: asserts() in update() methods are routed back to server and are not handled
37 * by the junit testsuite, so we check double (see code).
38 * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">Cluster requirement</a>
39 */
40 public class PtPTest extends TestCase {
41 private String ME = "PtPTest";
42 private Global glob;
43 private static Logger log = Logger.getLogger(PtPTest.class.getName());
44 private ServerHelper serverHelper;
45
46 private I_XmlBlasterAccess heronCon, avalonCon, golanCon, frodoCon, bilboCon;
47
48 private int updateCounterHeron = 0;
49 private int updateCounterFrodo = 0;
50 private int updateCounterBilbo = 0;
51 private String oid = "PublishToBilbo";
52 private String domain = "RUGBY_NEWS"; // heron is master for RUGBY_NEWS
53 private String contentStr = "We win";
54
55 private String assertInUpdateHeron = null;
56 private String assertInUpdateBilbo = null;
57
58 public PtPTest(String name) {
59 super(name);
60 this.glob = new Global(null, true, false);
61 }
62
63 /**
64 * Initialize the test ...
65 */
66 protected void setUp() {
67
68 log.info("Entering setUp(), test starts");
69
70 serverHelper = new ServerHelper(glob, log, ME);
71
72 // Starts a cluster node
73 serverHelper.startHeron();
74 //serverHelper.startAvalon();
75 //serverHelper.startGolan();
76 serverHelper.startFrodo();
77 serverHelper.startBilbo();
78 }
79
80 /**
81 * cleaning up ...
82 */
83 protected void tearDown() {
84 log.info("Entering tearDown(), test is finished");
85 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
86
87 if (bilboCon != null) { bilboCon.disconnect(null); bilboCon = null; }
88 if (frodoCon != null) { frodoCon.disconnect(null); frodoCon = null; }
89 if (golanCon != null) { golanCon.disconnect(null); golanCon = null; }
90 if (avalonCon != null) { avalonCon.disconnect(null); avalonCon = null; }
91 if (heronCon != null) { heronCon.disconnect(null); heronCon = null; }
92
93 serverHelper.tearDown();
94 }
95
96 /**
97 * We start bilbo, frodo and heron nodes as described in requirement
98 * PtP messages are routed from ClientTo[bilbo] -> bilbo -> frodo -> heron -> ClientTo[heron]
99 * <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/cluster.html" target="others">cluster</a>
100 * publish a message to bilbo which should be routed to client XX which is logged in to heron.
101 */
102 public void testPublishPtP() {
103 System.err.println("***PtPTest: Publish a message to a cluster slave ...");
104 try {
105 log.info("Login to heron and wait for PtP message ...");
106 heronCon = serverHelper.connect(serverHelper.getHeronGlob(), new I_Callback() { // Login to xmlBlaster, register for updates
107 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
108 log.info("Received message '" + updateKey.getOid() + "' state=" +
109 updateQos.getState() + " from '" + updateQos.getSender() + "'");
110 if (!updateQos.getSender().equalsAbsolute(bilboCon.getConnectReturnQos().getSessionName())) {
111 assertInUpdateHeron = serverHelper.getHeronGlob().getId() + ": Did not expect message update in default handler";
112 }
113 updateCounterHeron++;
114 return "";
115 }
116 });
117 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
118 assertTrue(assertInUpdateHeron, assertInUpdateHeron == null);
119 assertInUpdateHeron = null;
120
121 log.info("Login to bilbo to send PtP message ...");
122 bilboCon = serverHelper.connect(serverHelper.getBilboGlob(), new I_Callback() { // Login to xmlBlaster, register for updates
123 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) {
124 assertInUpdateBilbo = serverHelper.getBilboGlob().getId() + ": Should not receive the message '" + updateKey.getOid() + "'";
125 fail(assertInUpdateBilbo); // This is routed to server, not to junit
126 return "";
127 }
128 });
129 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
130 assertTrue(assertInUpdateBilbo, assertInUpdateBilbo == null);
131 assertInUpdateBilbo = null;
132
133 int num = 5;
134 for (int i=0; i<num; i++) {
135 PublishKey pk = new PublishKey(glob, oid, "text/plain", "1.0");
136 pk.setDomain("RUGBY_NEWS"); // heron is master: need for routing as heron is not directly connected to us
137 PublishQos pq = new PublishQos(glob);
138 SessionName sessionName = heronCon.getConnectReturnQos().getSessionName(); // destination client
139 Destination destination = new Destination(sessionName);
140 destination.forceQueuing(true);
141 pq.addDestination(destination);
142 log.info("Sending PtP message '" + oid + "' from bilbo to '" + sessionName + "' :" + pq.toXml());
143 MsgUnit msgUnit = new MsgUnit(pk, (contentStr+"-"+i).getBytes(), pq);
144 PublishReturnQos prq = bilboCon.publish(msgUnit);
145 log.info("Published message to destination='" + sessionName +
146 "' content='" + (contentStr+"-"+i) +
147 "' to xmlBlaster node with IP=" + serverHelper.getBilboGlob().getProperty().get("bootstrapPort",0) +
148 ", the returned QoS is: " + prq.getKeyOid());
149 }
150
151 try { Thread.sleep(1000); } catch( InterruptedException i) {} // Wait some time
152 assertTrue(assertInUpdateHeron, assertInUpdateHeron == null);
153 assertEquals("Heron client did not receive PtP message", num, updateCounterHeron);
154 }
155 catch (XmlBlasterException e) {
156 e.printStackTrace();
157 fail("PublishToBilbo-Exception: " + e.getMessage());
158 }
159
160 System.err.println("***PtPTest: testPublishPtP [SUCCESS]");
161 }
162
163 /**
164 * Invoke:
165 * <pre>
166 * java -Dtrace[cluster]=true -Dcall[cluster]=true -Dcall[core]=true org.xmlBlaster.test.cluster.PtPTest
167 * java -Djava.compiler= junit.textui.TestRunner -noloading org.xmlBlaster.test.cluster.PtPTest
168 * <pre>
169 */
170 public static void main(String args[]) {
171 Global glob = new Global(null, true, false);
172 if (glob.init(args) != 0) {
173 System.exit(0);
174 }
175 PtPTest testSub = new PtPTest("PtPTest");
176 testSub.setUp();
177 testSub.testPublishPtP();
178 testSub.tearDown();
179 }
180 }
// syntax highlighted by Code2HTML, v. 0.9.1