1 /*------------------------------------------------------------------------------
2 Name: ClientDispatchConnection.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.client.dispatch;
7
8 import java.util.logging.Level;
9 import java.util.logging.Logger;
10
11 import org.xmlBlaster.authentication.plugins.CryptDataHolder;
12 import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;
13 import org.xmlBlaster.client.protocol.I_XmlBlasterConnection;
14 import org.xmlBlaster.client.protocol.ProtocolPluginManager;
15 import org.xmlBlaster.client.qos.ConnectReturnQos;
16 import org.xmlBlaster.client.qos.EraseReturnQos;
17 import org.xmlBlaster.client.qos.PublishReturnQos;
18 import org.xmlBlaster.client.qos.SubscribeReturnQos;
19 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
20 import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;
21 import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;
22 import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;
23 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
24 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
25 import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;
26 import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;
27 import org.xmlBlaster.util.Global;
28 import org.xmlBlaster.util.IsoDateParser;
29 import org.xmlBlaster.util.MsgUnit;
30 import org.xmlBlaster.util.MsgUnitRaw;
31 import org.xmlBlaster.util.XmlBlasterException;
32 import org.xmlBlaster.util.checkpoint.I_Checkpoint;
33 import org.xmlBlaster.util.def.Constants;
34 import org.xmlBlaster.util.def.ErrorCode;
35 import org.xmlBlaster.util.def.MethodName;
36 import org.xmlBlaster.util.dispatch.DispatchConnection;
37 import org.xmlBlaster.util.qos.ConnectQosData;
38 import org.xmlBlaster.util.qos.address.Address;
39 import org.xmlBlaster.util.qos.address.AddressBase;
40 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
41 import org.xmlBlaster.util.xbformat.I_ProgressListener;
42
43
44 /**
45 * Holding all necessary infos to establish callback
46 * connections and invoke their update().
47 * @see DispatchConnection
48 * @author xmlBlaster@marcelruff.info
49 */
50 public final class ClientDispatchConnection extends DispatchConnection
51 {
52 private static Logger log = Logger.getLogger(ClientDispatchConnection.class.getName());
53 private final String ME;
54 private I_XmlBlasterConnection driver;
55 private final I_MsgSecurityInterceptor securityInterceptor;
56 private ConnectQosData connectQosData;
57 private ConnectReturnQos connectReturnQos;
58 private String[] checkPointContext;
59 private MsgQueueEntry connectEntry;
60 //private SessionName sessionName;
61
62 /**
63 * @param connectionsHandler The DevliveryConnectionsHandler witch i belong to
64 * @param aAddress The address i shall connect to
65 */
66 public ClientDispatchConnection(Global glob, ClientDispatchConnectionsHandler connectionsHandler, AddressBase address) throws XmlBlasterException {
67 super(glob, connectionsHandler, address);
68 this.ME = "ClientDispatchConnection-" + this.hashCode() + "-" + connectionsHandler.getDispatchManager().getQueue().getStorageId();
69 this.securityInterceptor = connectionsHandler.getDispatchManager().getMsgSecurityInterceptor();
70 }
71
72 public final String getDriverName() {
73 return (this.driver != null) ? this.driver.getProtocol() : "unknown";
74 }
75
76 /**
77 * @return A nice name for logging
78 */
79 public final String getName() {
80 return ME;
81 }
82
83 /**
84 * Load the appropriate protocol driver, e.g the CORBA protocol plugin.
85 * <p>
86 * This method is called by our base class during initialization.
87 * </p>
88 */
89 public final void loadPlugin() throws XmlBlasterException {
90 ProtocolPluginManager loader = glob.getProtocolPluginManager();
91 this.driver = loader.getPlugin(super.address.getType(), super.address.getVersion()); // e.g. CorbaConnection(glob);
92 if (this.driver == null)
93 throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported");
94 }
95
96 /**
97 * @see DispatchConnection#connectLowlevel()
98 */
99 public final void connectLowlevel() throws XmlBlasterException {
100 if (this.driver == null)
101 throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported");
102 this.driver.connectLowlevel((Address)super.address);
103 if (super.address.getPingInterval() > 0) {
104 //spanPingTimer(1, true); // Could deadlock as it uses complete dispatch framework with its synchronized?
105 this.driver.ping("<qos><state info='"+Constants.INFO_INITIAL+"'/></qos>"); // Try a low level ping
106 }
107 if (log.isLoggable(Level.FINE)) log.fine(ME+": Connected low level to " + super.address.toString());
108 }
109
110 /**
111 * Send the messages to xmlBlaster.
112 * @param msgArr The messages to send.
113 * msgArr[i].getReturnVal() will contain the returned QoS object or null for oneway operations
114 * @param isAsyncMode true if coming from queue
115 */
116 public void doSend(MsgQueueEntry[] msgArr_, boolean isAsyncMode) throws XmlBlasterException {
117 if (msgArr_.length < 1) {
118 return;
119 }
120
121 boolean onlyPublish = true;
122 boolean onlyPublishOneway = true;
123 for (int ii=0; ii<msgArr_.length; ii++) {
124 if (MethodName.PUBLISH_ONEWAY != msgArr_[ii].getMethodName())
125 onlyPublishOneway = false;
126 if (MethodName.PUBLISH != msgArr_[ii].getMethodName())
127 onlyPublish = false;
128 }
129 if (onlyPublishOneway || onlyPublish) {
130 publish(msgArr_);
131 return;
132 }
133
134 for (int ii=0; ii<msgArr_.length; ii++) {
135 try {
136 if (MethodName.PUBLISH_ONEWAY == msgArr_[ii].getMethodName()) {
137 MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] };
138 publish(tmp);
139 }
140 else if (MethodName.PUBLISH == msgArr_[ii].getMethodName()) {
141 MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] };
142 publish(tmp);
143 }
144 else if (MethodName.GET == msgArr_[ii].getMethodName()) {
145 get(msgArr_[ii]);
146 }
147 else if (MethodName.SUBSCRIBE == msgArr_[ii].getMethodName()) {
148 subscribe(msgArr_[ii]);
149 }
150 else if (MethodName.UNSUBSCRIBE == msgArr_[ii].getMethodName()) {
151 unSubscribe(msgArr_[ii]);
152 }
153 else if (MethodName.ERASE == msgArr_[ii].getMethodName()) {
154 erase(msgArr_[ii]);
155 }
156 else if (MethodName.CONNECT == msgArr_[ii].getMethodName()) {
157 if (isAsyncMode && isAlive() && this.driver.isLoggedIn()) {
158 return; // Coming from queue but we are alive already
159 }
160 connect(msgArr_[ii]);
161 this.connectEntry = msgArr_[ii]; // remember it
162 }
163 else if (MethodName.DISCONNECT == msgArr_[ii].getMethodName()) {
164 this.connectEntry = null;
165 disconnect(msgArr_[ii]);
166 }
167 else {
168 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Message type '" + msgArr_[ii].getEmbeddedType() + "' is not implemented");
169 }
170 }
171 catch (XmlBlasterException e) {
172 if (this.connectEntry != null && e.isErrorCode(ErrorCode.USER_SECURITY_AUTHENTICATION_ACCESSDENIED)) {
173 // Happens if the client was killed in the server by an admin task
174 // and has tried to reconnect with the old sessionId
175 log.warning(ME+": Server changed sessionId, trying reconnect now: " + e.toString());
176 //reconnect(); // loops?!
177 connect(this.connectEntry);
178 connectionsHandler.getDispatchManager().postSendNotification(this.connectEntry);
179 if (log.isLoggable(Level.FINE)) log.fine(ME+": Server changed sessionId to " + this.connectReturnQos.getServerInstanceId());
180 ii--;
181 }
182 else {
183 throw e;
184 }
185 }
186 }
187 }
188
189 private void setCheckpointContext(ConnectReturnQos qr) {
190 if (qr == null) {
191 this.checkPointContext = null;
192 return;
193 }
194 this.checkPointContext = new String[] {"sessionName", qr.getSessionName().getAbsoluteName()};
195 }
196
197 private void publish(MsgQueueEntry[] msgArr_) throws XmlBlasterException {
198
199 I_Checkpoint cp = glob.getCheckpointPlugin();
200
201 // Convert to PublishEntry
202 MsgUnit[] msgArr = new MsgUnit[msgArr_.length];
203 for (int i=0; i<msgArr.length; i++) {
204 MsgQueuePublishEntry publishEntry = (MsgQueuePublishEntry)msgArr_[i];
205 msgArr[i] = publishEntry.getMsgUnit();
206 }
207
208 MsgUnitRaw[] msgUnitRawArr = new MsgUnitRaw[msgArr.length];
209 // We export/encrypt the message (call the interceptor)
210 if (securityInterceptor != null) {
211 for (int i=0; i<msgArr.length; i++) {
212 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, msgArr[i].getMsgUnitRaw());
213 msgUnitRawArr[i] = securityInterceptor.exportMessage(dataHolder);
214 }
215 if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted " + msgArr.length + " publish messages.");
216 }
217 else {
218 log.warning("No session security context, sending " + msgArr.length + " publish messages without encryption");
219 for (int i=0; i<msgArr.length; i++) {
220 msgUnitRawArr[i] = msgArr[i].getMsgUnitRaw();
221 }
222 }
223
224 if (MethodName.PUBLISH_ONEWAY == msgArr_[0].getMethodName()) {
225 this.driver.publishOneway(msgUnitRawArr);
226 connectionsHandler.getDispatchStatistic().incrNumPublish(msgUnitRawArr.length);
227 if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + msgArr.length + " oneway publish messages.");
228 if (cp != null) {
229 for (int i=0; i<msgArr.length; i++) {
230 cp.passingBy(I_Checkpoint.CP_CONNECTION_PUBLISH_ACK, msgArr[i],
231 null, this.checkPointContext);
232 }
233 }
234 return;
235 }
236
237 if (log.isLoggable(Level.FINE)) log.fine(ME+": Before publish " + msgArr.length + " acknowledged messages ...");
238
239 String[] rawReturnVal = this.driver.publishArr(msgUnitRawArr);
240 if (rawReturnVal == null) {
241 String text = "driver.publishArr len= " + msgUnitRawArr.length + " returned null: " + ((msgUnitRawArr.length>0)?msgUnitRawArr[0].getKey():"");
242 throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION, ME, text);
243 }
244 connectionsHandler.getDispatchStatistic().incrNumPublish(rawReturnVal.length);
245
246 if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + msgArr.length + " acknowledged publish messages, return value #1 is '" + rawReturnVal[0] + "'");
247
248 if (rawReturnVal != null) {
249 for (int i=0; i<rawReturnVal.length; i++) {
250 if (cp != null) {
251 MsgQueuePublishEntry publishEntry = (MsgQueuePublishEntry)msgArr_[i];
252 cp.passingBy(I_Checkpoint.CP_CONNECTION_PUBLISH_ACK, publishEntry.getMsgUnit(),
253 null, this.checkPointContext);
254 }
255
256 if (!msgArr_[i].wantReturnObj())
257 continue;
258
259 if (securityInterceptor != null) {
260 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, new MsgUnitRaw(null, (byte[])null, rawReturnVal[i]));
261 dataHolder.setReturnValue(true);
262 rawReturnVal[i] = securityInterceptor.importMessage(dataHolder).getQos();
263 }
264
265 // create return object
266 try {
267 msgArr_[i].setReturnObj(new PublishReturnQos(glob, rawReturnVal[i]));
268 }
269 catch (Throwable e) {
270 log.warning(ME+": Can't parse publish returned value '" + rawReturnVal[i] + "', setting to default: " + e.toString());
271 //e.printStackTrace();
272 msgArr_[i].setReturnObj(new PublishReturnQos(glob, "<qos/>"));
273 }
274 }
275 if (log.isLoggable(Level.FINE)) log.fine(ME+": Imported/decrypted " + rawReturnVal.length + " publish message return values.");
276 }
277 }
278
279 /**
280 * Encrypt and send a subscribe request, decrypt the returned data
281 */
282 private void subscribe(MsgQueueEntry entry) throws XmlBlasterException {
283 MsgQueueSubscribeEntry subscribeEntry = (MsgQueueSubscribeEntry)entry;
284
285 String key = subscribeEntry.getSubscribeKeyData().toXml();
286 String qos = subscribeEntry.getSubscribeQosData().toXml();
287 if (securityInterceptor != null) { // We export/encrypt the message (call the interceptor)
288 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos));
289 MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
290 key = msgUnitRaw.getKey();
291 qos = msgUnitRaw.getQos();
292 if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted subscribe request.");
293 }
294 else {
295 log.warning(ME+": No session security context, subscribe request is not encrypted");
296 }
297
298 String rawReturnVal = this.driver.subscribe(key, qos); // Invoke remote server
299
300 connectionsHandler.getDispatchStatistic().incrNumSubscribe(1);
301
302 if (subscribeEntry.wantReturnObj()) {
303 if (securityInterceptor != null) { // decrypt return value ...
304 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(null, (byte[])null, rawReturnVal));
305 dataHolder.setReturnValue(true);
306 rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos();
307 }
308 try {
309 subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, rawReturnVal));
310 }
311 catch (Throwable e) {
312 log.warning(ME+": Can't parse returned subscribe value '" + rawReturnVal + "', setting to default: " + e.toString());
313 subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, "<qos/>"));
314 }
315 }
316 }
317
318 /**
319 * Encrypt and send a unSubscribe request, decrypt the returned data
320 */
321 private void unSubscribe(MsgQueueEntry entry) throws XmlBlasterException {
322 MsgQueueUnSubscribeEntry unSubscribeEntry = (MsgQueueUnSubscribeEntry)entry;
323
324 String key = unSubscribeEntry.getUnSubscribeKey().toXml();
325 String qos = unSubscribeEntry.getUnSubscribeQos().toXml();
326 if (securityInterceptor != null) { // We export/encrypt the message (call the interceptor)
327 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UNSUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos));
328 MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
329 key = msgUnitRaw.getKey();
330 qos = msgUnitRaw.getQos();
331 if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted unSubscribe request.");
332 }
333 else {
334 log.warning(ME+": No session security context, unSubscribe request is not encrypted");
335 }
336
337 String[] rawReturnValArr = this.driver.unSubscribe(key, qos); // Invoke remote server
338
339 connectionsHandler.getDispatchStatistic().incrNumUnSubscribe(1);
340
341 if (unSubscribeEntry.wantReturnObj()) {
342 UnSubscribeReturnQos[] retQosArr = new UnSubscribeReturnQos[rawReturnValArr.length];
343 for (int ii=0; ii<rawReturnValArr.length; ii++) {
344 if (securityInterceptor != null) { // decrypt return value ...
345 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UNSUBSCRIBE, new MsgUnitRaw(null, (byte[])null, rawReturnValArr[ii]));
346 dataHolder.setReturnValue(true);
347 String xmlQos = securityInterceptor.importMessage(dataHolder).getQos();
348 retQosArr[ii] = new UnSubscribeReturnQos(glob, xmlQos);
349 }
350 }
351
352 try {
353 unSubscribeEntry.setReturnObj(retQosArr);
354 }
355 catch (Throwable e) {
356 log.warning(ME+": Can't parse returned unSubscribe value setting to default: " + e.toString());
357 for (int ii=0; ii<rawReturnValArr.length; ii++) {
358 retQosArr[ii] = new UnSubscribeReturnQos(glob, "<qos/>");
359 }
360 unSubscribeEntry.setReturnObj(retQosArr);
361 }
362 }
363 }
364
365 /**
366 * Encrypt and send a synchronous get request, decrypt the returned data
367 */
368 private void get(MsgQueueEntry entry) throws XmlBlasterException {
369 MsgQueueGetEntry getEntry = (MsgQueueGetEntry)entry;
370
371 String key = getEntry.getGetKey().toXml();
372 String qos = getEntry.getGetQos().toXml();
373 if (this.securityInterceptor != null) { // We export/encrypt the message (call the interceptor)
374 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.GET, new MsgUnitRaw(key, (byte[])null, qos));
375 MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
376 key = msgUnitRaw.getKey();
377 qos = msgUnitRaw.getQos();
378 if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted get request.");
379 }
380 else {
381 log.warning(ME+": No session security context, get request is not encrypted");
382 }
383
384 MsgUnitRaw[] rawReturnValArr = this.driver.get(key, qos); // Invoke remote server
385
386 connectionsHandler.getDispatchStatistic().incrNumGet(1);
387
388 MsgUnit[] msgUnitArr = new MsgUnit[rawReturnValArr.length];
389 if (getEntry.wantReturnObj()) {
390 for (int ii=0; ii<rawReturnValArr.length; ii++) {
391 if (this.securityInterceptor != null) { // decrypt return value ...
392 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.GET, rawReturnValArr[ii]);
393 dataHolder.setReturnValue(true);
394 rawReturnValArr[ii] = securityInterceptor.importMessage(dataHolder);
395 }
396 // NOTE: We use PUBLISH here instead of GET_RETURN to have the whole MsgUnit stored
397 msgUnitArr[ii] = new MsgUnit(glob, rawReturnValArr[ii], MethodName.PUBLISH);
398 }
399
400 getEntry.setReturnObj(msgUnitArr);
401 }
402 }
403
404 /**
405 * Encrypt and send a erase request, decrypt the returned data
406 */
407 private void erase(MsgQueueEntry entry) throws XmlBlasterException {
408 MsgQueueEraseEntry eraseEntry = (MsgQueueEraseEntry)entry;
409
410 String key = eraseEntry.getEraseKey().toXml();
411 String qos = eraseEntry.getEraseQos().toXml();
412 if (securityInterceptor != null) { // We export/encrypt the message (call the interceptor)
413 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.ERASE, new MsgUnitRaw(key, (byte[])null, qos));
414 MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
415 key = msgUnitRaw.getKey();
416 qos = msgUnitRaw.getQos();
417 if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted erase request.");
418 }
419 else {
420 log.warning(ME+": No session security context, erase request is not encrypted");
421 }
422
423 String[] rawReturnValArr = this.driver.erase(key, qos); // Invoke remote server
424
425 connectionsHandler.getDispatchStatistic().incrNumErase(1);
426
427 if (eraseEntry.wantReturnObj()) {
428 EraseReturnQos[] retQosArr = new EraseReturnQos[rawReturnValArr.length];
429 for (int ii=0; ii<rawReturnValArr.length; ii++) {
430 if (securityInterceptor != null) { // decrypt return value ...
431 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.ERASE, new MsgUnitRaw(null, (byte[])null, rawReturnValArr[ii]));
432 dataHolder.setReturnValue(true);
433 String xmlQos = securityInterceptor.importMessage(dataHolder).getQos();
434 retQosArr[ii] = new EraseReturnQos(glob, xmlQos);
435 }
436 }
437
438 try {
439 eraseEntry.setReturnObj(retQosArr);
440 }
441 catch (Throwable e) {
442 log.warning(ME+": Can't parse returned erase value setting to default: " + e.toString());
443 for (int ii=0; ii<rawReturnValArr.length; ii++) {
444 retQosArr[ii] = new EraseReturnQos(glob, "<qos/>");
445 }
446 eraseEntry.setReturnObj(retQosArr);
447 }
448 }
449 }
450
451 /**
452 * Remember the cqd in this.connectQosData and return the encrypted string.
453 */
454 private String getEncryptedConnectQos(ConnectQosData cqd) throws XmlBlasterException {
455 if (cqd == null) {
456 return null;
457 }
458 cqd.addClientProperty(Constants.CLIENTPROPERTY_UTC, IsoDateParser.getCurrentUTCTimestamp());
459 this.connectQosData = cqd;
460 if (this.securityInterceptor != null) { // We export/encrypt the message (call the interceptor)
461 if (log.isLoggable(Level.FINE)) log.fine(ME+": TODO: Crypting msg with exportMessage() is not supported for connect() as the server currently can't handle encrypted ConnectQos (for SOCKET see HandleClient.java:234)");
462 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.CONNECT, new MsgUnitRaw(null, (byte[])null, cqd.toXml()));
463 String encryptedConnectQos = this.securityInterceptor.exportMessage(dataHolder).getQos();
464 if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted connect request.");
465 return encryptedConnectQos;
466 }
467 else {
468 log.warning(ME+": No session security context, connect request is not encrypted");
469 return cqd.toXml();
470 }
471 }
472
473 /**
474 * Encrypt and send a connect request, decrypt the returned data
475 */
476 private void connect(MsgQueueEntry entry) throws XmlBlasterException {
477 MsgQueueConnectEntry connectEntry = (MsgQueueConnectEntry)entry;
478 //this.sessionName = connectEntry.getConnectQosData().getSessionName();
479
480 String encryptedConnectQos = getEncryptedConnectQos(connectEntry.getConnectQosData());
481
482 // TODO: pass connectEntry.getConnectQosData().getSender().getLoginName(); as this is used by SOCKET:requestId
483 String rawReturnVal = this.driver.connect(encryptedConnectQos); // Invoke remote server
484
485 connectionsHandler.getDispatchStatistic().incrNumConnect(1);
486
487 if (securityInterceptor != null) { // decrypt return value ...
488 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.CONNECT, new MsgUnitRaw(null, (byte[])null, rawReturnVal));
489 dataHolder.setReturnValue(true);
490 rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos();
491 }
492
493 try {
494 this.connectReturnQos = new ConnectReturnQos(glob, rawReturnVal);
495 setCheckpointContext(this.connectReturnQos);
496 }
497 catch (XmlBlasterException e) {
498 log.severe(ME+": Can't parse returned connect QoS value '" + rawReturnVal + "': " + e.getMessage());
499 throw e;
500 }
501
502 if (!connectEntry.getConnectQosData().getSessionName().isSession()) {
503 // We need to remember the server side assigned public session id for reconnect polling
504 // If do we should probably take a clone:
505 //ConnectQos connectQos = new ConnectQos(this.glob, this.connectReturnQos.getData());
506 ConnectQosData connectQos = connectEntry.getConnectQosData();
507 connectQos.setSessionName(this.connectReturnQos.getSessionName());
508 //this.sessionName = this.connectReturnQos.getSessionName();
509 connectQos.getSessionQos().setSecretSessionId(this.connectReturnQos.getSecretSessionId());
510 this.connectQosData = connectQos;
511 }
512
513 if (connectEntry.wantReturnObj()) {
514 connectEntry.setReturnObj(this.connectReturnQos);
515 }
516 this.driver.setConnectReturnQos(this.connectReturnQos);
517 }
518
519 /**
520 * Encrypt and send a disconnect request, decrypt the returned data
521 */
522 private void disconnect(MsgQueueEntry entry) throws XmlBlasterException {
523 MsgQueueDisconnectEntry disconnectEntry = (MsgQueueDisconnectEntry)entry;
524 String qos = disconnectEntry.getDisconnectQos().toXml();
525 if (securityInterceptor != null) { // We export/encrypt the message (call the interceptor)
526 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.DISCONNECT, new MsgUnitRaw(null, (byte[])null, qos));
527 qos = securityInterceptor.exportMessage(dataHolder).getQos();
528 if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted disconnect request.");
529 }
530 else {
531 log.warning(ME+": No session security context, disconnect request is not encrypted");
532 }
533
534 //returns void
535 this.driver.disconnect(qos); // Invoke remote server
536 }
537
538 /**
539 * @see org.xmlBlaster.util.dispatch.DispatchConnection#doPing(String)
540 */
541 public final String doPing(String data) throws XmlBlasterException {
542 String ret = driver.ping(data);
543 return (ret==null) ? "" : ret;
544 }
545
546 /**
547 * Nothing to do here
548 */
549 public final void resetConnection() {
550 if (log.isLoggable(Level.FINE)) log.fine(ME+": resetConnection(): Initializing driver for polling");
551 this.connectReturnQos = null;
552 this.driver.resetConnection();
553 }
554
555 /**
556 * On reconnect polling try to establish the connection.
557 */
558 protected final void reconnect() throws XmlBlasterException {
559 if (this.driver == null) return;
560 if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering reconnect(" + this.driver.getProtocol() + ")");
561
562 if (this.connectReturnQos != null) {
563 // needed to avoid failure
564 this.connectionsHandler.getDispatchStatistic().clearCurrentReads();
565 this.connectionsHandler.getDispatchStatistic().clearCurrentWrites();
566 super.ping("", false);
567 return;
568 }
569
570 if (this.connectQosData == null) {
571 // We never had connected on application layer, so try low level layer only
572 this.driver.connectLowlevel((Address)super.address);
573 return;
574 }
575
576 String encryptedConnectQos = getEncryptedConnectQos(this.connectQosData);
577 // low level connect (e.g. on TCP/IP layer) and remote invoke method connect()
578 String rawReturnVal = this.driver.connect(encryptedConnectQos); // Invoke remote server
579
580 connectionsHandler.getDispatchStatistic().incrNumConnect(1);
581
582 if (securityInterceptor != null) { // decrypt return value ...
583 CryptDataHolder dataHolder = new CryptDataHolder(MethodName.CONNECT, new MsgUnitRaw(null, (byte[])null, rawReturnVal));
584 dataHolder.setReturnValue(true);
585 rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos();
586 }
587
588 this.connectReturnQos = null;
589 try {
590 this.connectReturnQos = new ConnectReturnQos(glob, rawReturnVal);
591 setCheckpointContext(this.connectReturnQos);
592 if (this.connectEntry != null) {
593 if (this.connectEntry.wantReturnObj()) {
594 this.connectEntry.setReturnObj(this.connectReturnQos);
595 }
596 connectionsHandler.getDispatchManager().postSendNotification(this.connectEntry);
597 }
598 }
599 catch (XmlBlasterException e) {
600 log.severe(ME+": reconnect(): Can't parse returned connect QoS value '" + rawReturnVal + "': " + e.getMessage());
601 throw e;
602 }
603 this.driver.setConnectReturnQos(this.connectReturnQos);
604 }
605
606 /**
607 * Stop all callback drivers of this client.
608 */
609 public final void shutdown(boolean delayed) throws XmlBlasterException {
610 super.shutdown(delayed);
611 if (driver != null) {
612 driver.shutdown();
613 }
614 }
615
616 /**
617 * Dump state of this object into a XML ASCII string.
618 * <br>
619 * @param extraOffset indenting of tags for nice output
620 * @return internal state as an XML ASCII string
621 */
622 public final String toXml(String extraOffset) {
623 StringBuffer sb = new StringBuffer(256);
624 if (extraOffset == null) extraOffset = "";
625 String offset = Constants.OFFSET + extraOffset;
626
627 sb.append(offset + "<ClientDispatchConnection>");
628 super.address.toXml(" " + offset);
629 if (driver == null)
630 sb.append(offset).append(" <noProtocolDriver />");
631 else
632 sb.append(offset).append(" <address type='" + driver.getProtocol() + "' state='" + getState() + "'/>");
633 sb.append(offset).append("</ClientDispatchConnection>");
634
635 return sb.toString();
636 }
637
638 public I_ProgressListener registerProgressListener(I_ProgressListener listener) {
639 if (this.driver == null) return null;
640 return this.driver.registerProgressListener(listener);
641 }
642
643 protected boolean forcePingFailure() {
644 return true;
645 }
646
647 }
syntax highlighted by Code2HTML, v. 0.9.1