1 /*------------------------------------------------------------------------------
  2 Name:      ClientDispatchConnection.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.client.dispatch;
  7 
  8 import java.util.logging.Level;
  9 import java.util.logging.Logger;
 10 
 11 import org.xmlBlaster.authentication.plugins.CryptDataHolder;
 12 import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;
 13 import org.xmlBlaster.client.protocol.I_XmlBlasterConnection;
 14 import org.xmlBlaster.client.protocol.ProtocolPluginManager;
 15 import org.xmlBlaster.client.qos.ConnectReturnQos;
 16 import org.xmlBlaster.client.qos.EraseReturnQos;
 17 import org.xmlBlaster.client.qos.PublishReturnQos;
 18 import org.xmlBlaster.client.qos.SubscribeReturnQos;
 19 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
 20 import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;
 21 import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;
 22 import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;
 23 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
 24 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
 25 import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;
 26 import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;
 27 import org.xmlBlaster.util.Global;
 28 import org.xmlBlaster.util.IsoDateParser;
 29 import org.xmlBlaster.util.MsgUnit;
 30 import org.xmlBlaster.util.MsgUnitRaw;
 31 import org.xmlBlaster.util.XmlBlasterException;
 32 import org.xmlBlaster.util.checkpoint.I_Checkpoint;
 33 import org.xmlBlaster.util.def.Constants;
 34 import org.xmlBlaster.util.def.ErrorCode;
 35 import org.xmlBlaster.util.def.MethodName;
 36 import org.xmlBlaster.util.dispatch.DispatchConnection;
 37 import org.xmlBlaster.util.qos.ConnectQosData;
 38 import org.xmlBlaster.util.qos.address.Address;
 39 import org.xmlBlaster.util.qos.address.AddressBase;
 40 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
 41 import org.xmlBlaster.util.xbformat.I_ProgressListener;
 42 
 43 
 44 /**
 * Holds everything needed to establish a client side connection to xmlBlaster
 * and sends the queued requests (connect, publish, subscribe, get, erase, ...)
 * through the loaded protocol driver.
 47  * @see DispatchConnection
 48  * @author xmlBlaster@marcelruff.info
 49  */
 50 public final class ClientDispatchConnection extends DispatchConnection
 51 {
 52    private static Logger log = Logger.getLogger(ClientDispatchConnection.class.getName());
 53    private final String ME;
 54    private I_XmlBlasterConnection driver;
 55    private final I_MsgSecurityInterceptor securityInterceptor;
 56    private ConnectQosData connectQosData;
 57    private ConnectReturnQos connectReturnQos;
 58    private String[] checkPointContext;
 59    private MsgQueueEntry connectEntry;
 60    //private SessionName sessionName;
 61 
 62    /**
 63     * @param connectionsHandler The DevliveryConnectionsHandler witch i belong to
 64     * @param aAddress The address i shall connect to
 65     */
 66    public ClientDispatchConnection(Global glob, ClientDispatchConnectionsHandler connectionsHandler, AddressBase address) throws XmlBlasterException {
 67       super(glob, connectionsHandler, address);
 68       this.ME = "ClientDispatchConnection-" + this.hashCode() + "-" + connectionsHandler.getDispatchManager().getQueue().getStorageId();
 69       this.securityInterceptor = connectionsHandler.getDispatchManager().getMsgSecurityInterceptor();
 70    }
 71 
 72    public final String getDriverName() {
 73       return (this.driver != null) ? this.driver.getProtocol() : "unknown";
 74    }
 75 
 76    /**
 77     * @return A nice name for logging
 78     */
 79    public final String getName() {
 80       return ME;
 81    }
 82 
 83    /**
 84     * Load the appropriate protocol driver, e.g the CORBA protocol plugin. 
 85     * <p>
 86     * This method is called by our base class during initialization.
 87     * </p>
 88     */
 89    public final void loadPlugin() throws XmlBlasterException {
 90       ProtocolPluginManager loader = glob.getProtocolPluginManager();
 91       this.driver = loader.getPlugin(super.address.getType(), super.address.getVersion()); // e.g. CorbaConnection(glob);
 92       if (this.driver == null)
 93          throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported");
 94    }
 95 
 96    /**
 97     * @see DispatchConnection#connectLowlevel()
 98     */
 99    public final void connectLowlevel() throws XmlBlasterException {
100       if (this.driver == null)
101          throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, "Sorry, protocol type='" + super.address.getType() + "' is not supported");
102       this.driver.connectLowlevel((Address)super.address);
103       if (super.address.getPingInterval() > 0) {
104          //spanPingTimer(1, true); // Could deadlock as it uses complete dispatch framework with its synchronized?
105          this.driver.ping("<qos><state info='"+Constants.INFO_INITIAL+"'/></qos>");  // Try a low level ping
106       }
107       if (log.isLoggable(Level.FINE)) log.fine(ME+": Connected low level to " + super.address.toString());
108    }
109 
110    /**
111     * Send the messages to xmlBlaster. 
112     * @param msgArr The messages to send.
113     *  msgArr[i].getReturnVal() will contain the returned QoS object or null for oneway operations
114     * @param isAsyncMode true if coming from queue
115     */
116    public void doSend(MsgQueueEntry[] msgArr_, boolean isAsyncMode) throws XmlBlasterException {
117       if (msgArr_.length < 1) {
118          return;
119       }
120 
121       boolean onlyPublish = true;
122       boolean onlyPublishOneway = true;
123       for (int ii=0; ii<msgArr_.length; ii++) {
124          if (MethodName.PUBLISH_ONEWAY != msgArr_[ii].getMethodName())
125             onlyPublishOneway = false;
126          if (MethodName.PUBLISH != msgArr_[ii].getMethodName())
127             onlyPublish = false;
128       }
129       if (onlyPublishOneway || onlyPublish) {
130          publish(msgArr_);
131          return;
132       }
133       
134       for (int ii=0; ii<msgArr_.length; ii++) {
135          try {
136             if (MethodName.PUBLISH_ONEWAY == msgArr_[ii].getMethodName()) {
137                MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] };
138                publish(tmp);
139             }
140             else if (MethodName.PUBLISH == msgArr_[ii].getMethodName()) {
141                MsgQueueEntry[] tmp = new MsgQueueEntry[] { msgArr_[ii] };
142                publish(tmp);
143             }
144             else if (MethodName.GET == msgArr_[ii].getMethodName()) {
145                get(msgArr_[ii]);
146             }
147             else if (MethodName.SUBSCRIBE == msgArr_[ii].getMethodName()) {
148                subscribe(msgArr_[ii]);
149             }
150             else if (MethodName.UNSUBSCRIBE == msgArr_[ii].getMethodName()) {
151                unSubscribe(msgArr_[ii]);
152             }
153             else if (MethodName.ERASE == msgArr_[ii].getMethodName()) {
154                erase(msgArr_[ii]);
155             }
156             else if (MethodName.CONNECT == msgArr_[ii].getMethodName()) {
157                if (isAsyncMode && isAlive() && this.driver.isLoggedIn()) {
158                   return; // Coming from queue but we are alive already
159                }
160                connect(msgArr_[ii]);
161                this.connectEntry = msgArr_[ii]; // remember it
162             }
163             else if (MethodName.DISCONNECT == msgArr_[ii].getMethodName()) {
164                this.connectEntry = null;
165                disconnect(msgArr_[ii]);
166             }
167             else {
168                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Message type '" + msgArr_[ii].getEmbeddedType() + "' is not implemented");
169             }
170          }
171          catch (XmlBlasterException e) {
172             if (this.connectEntry != null && e.isErrorCode(ErrorCode.USER_SECURITY_AUTHENTICATION_ACCESSDENIED)) {
173                // Happens if the client was killed in the server by an admin task
174                // and has tried to reconnect with the old sessionId
175                log.warning(ME+": Server changed sessionId, trying reconnect now: " + e.toString());
176                //reconnect();   // loops?!
177                connect(this.connectEntry);
178                connectionsHandler.getDispatchManager().postSendNotification(this.connectEntry);
179                if (log.isLoggable(Level.FINE)) log.fine(ME+": Server changed sessionId to " + this.connectReturnQos.getServerInstanceId());
180                ii--;
181             }
182             else {
183                throw e;
184             }
185          }
186       }
187    }
188    
189    private void setCheckpointContext(ConnectReturnQos qr) {
190       if (qr == null) {
191          this.checkPointContext = null;
192          return;
193       }
194       this.checkPointContext = new String[] {"sessionName", qr.getSessionName().getAbsoluteName()};
195    }
196 
197    private void publish(MsgQueueEntry[] msgArr_) throws XmlBlasterException {
198       
199       I_Checkpoint cp = glob.getCheckpointPlugin();
200 
201       // Convert to PublishEntry
202       MsgUnit[] msgArr = new MsgUnit[msgArr_.length];
203       for (int i=0; i<msgArr.length; i++) {
204          MsgQueuePublishEntry publishEntry = (MsgQueuePublishEntry)msgArr_[i];
205          msgArr[i] = publishEntry.getMsgUnit();
206       }
207 
208       MsgUnitRaw[] msgUnitRawArr = new MsgUnitRaw[msgArr.length];
209       // We export/encrypt the message (call the interceptor)
210       if (securityInterceptor != null) {
211          for (int i=0; i<msgArr.length; i++) {
212             CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, msgArr[i].getMsgUnitRaw());
213             msgUnitRawArr[i] = securityInterceptor.exportMessage(dataHolder);
214          }
215          if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted " + msgArr.length + " publish messages.");
216       }
217       else {
218          log.warning("No session security context, sending " + msgArr.length + " publish messages without encryption");
219          for (int i=0; i<msgArr.length; i++) {
220             msgUnitRawArr[i] = msgArr[i].getMsgUnitRaw();
221          }
222       }
223 
224       if (MethodName.PUBLISH_ONEWAY == msgArr_[0].getMethodName()) {
225          this.driver.publishOneway(msgUnitRawArr);
226          connectionsHandler.getDispatchStatistic().incrNumPublish(msgUnitRawArr.length);
227          if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + msgArr.length + " oneway publish messages.");
228          if (cp != null) {
229             for (int i=0; i<msgArr.length; i++) {
230                cp.passingBy(I_Checkpoint.CP_CONNECTION_PUBLISH_ACK, msgArr[i],
231                      null, this.checkPointContext);
232             }
233          }
234          return;
235       }
236 
237       if (log.isLoggable(Level.FINE)) log.fine(ME+": Before publish " + msgArr.length + " acknowledged messages ...");
238 
239       String[] rawReturnVal = this.driver.publishArr(msgUnitRawArr);
240       if (rawReturnVal == null) {
241          String text = "driver.publishArr len= " + msgUnitRawArr.length + " returned null: " + ((msgUnitRawArr.length>0)?msgUnitRawArr[0].getKey():"");
242          throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION, ME, text);
243       }
244       connectionsHandler.getDispatchStatistic().incrNumPublish(rawReturnVal.length);
245 
246       if (log.isLoggable(Level.FINE)) log.fine(ME+": Success, sent " + msgArr.length + " acknowledged publish messages, return value #1 is '" + rawReturnVal[0] + "'");
247 
248       if (rawReturnVal != null) {
249          for (int i=0; i<rawReturnVal.length; i++) {
250             if (cp != null) {
251                MsgQueuePublishEntry publishEntry = (MsgQueuePublishEntry)msgArr_[i];
252                cp.passingBy(I_Checkpoint.CP_CONNECTION_PUBLISH_ACK, publishEntry.getMsgUnit(),
253                         null, this.checkPointContext);
254             }
255             
256             if (!msgArr_[i].wantReturnObj())
257                continue;
258 
259             if (securityInterceptor != null) {
260                CryptDataHolder dataHolder = new CryptDataHolder(MethodName.PUBLISH, new MsgUnitRaw(null, (byte[])null, rawReturnVal[i]));
261                dataHolder.setReturnValue(true);
262                rawReturnVal[i] = securityInterceptor.importMessage(dataHolder).getQos();
263             }
264 
265             // create return object
266             try {
267                msgArr_[i].setReturnObj(new PublishReturnQos(glob, rawReturnVal[i]));
268             }
269             catch (Throwable e) {
270                log.warning(ME+": Can't parse publish returned value '" + rawReturnVal[i] + "', setting to default: " + e.toString());
271                //e.printStackTrace();
272                msgArr_[i].setReturnObj(new PublishReturnQos(glob, "<qos/>"));
273             }
274          }
275          if (log.isLoggable(Level.FINE)) log.fine(ME+": Imported/decrypted " + rawReturnVal.length + " publish message return values.");
276       }
277    }
278 
279    /**
280     * Encrypt and send a subscribe request, decrypt the returned data
281     */
282    private void subscribe(MsgQueueEntry entry) throws XmlBlasterException {
283       MsgQueueSubscribeEntry subscribeEntry = (MsgQueueSubscribeEntry)entry;
284 
285       String key = subscribeEntry.getSubscribeKeyData().toXml();
286       String qos = subscribeEntry.getSubscribeQosData().toXml();
287       if (securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)
288          CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos));
289          MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
290          key = msgUnitRaw.getKey();
291          qos = msgUnitRaw.getQos();
292          if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted subscribe request.");
293       }
294       else {
295          log.warning(ME+": No session security context, subscribe request is not encrypted");
296       }
297 
298       String rawReturnVal = this.driver.subscribe(key, qos); // Invoke remote server
299 
300       connectionsHandler.getDispatchStatistic().incrNumSubscribe(1);
301       
302       if (subscribeEntry.wantReturnObj()) {
303          if (securityInterceptor != null) { // decrypt return value ...
304             CryptDataHolder dataHolder = new CryptDataHolder(MethodName.SUBSCRIBE, new MsgUnitRaw(null, (byte[])null, rawReturnVal));
305             dataHolder.setReturnValue(true);
306             rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos();
307          }
308          try {
309             subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, rawReturnVal));
310          }
311          catch (Throwable e) {
312             log.warning(ME+": Can't parse returned subscribe value '" + rawReturnVal + "', setting to default: " + e.toString());
313             subscribeEntry.setReturnObj(new SubscribeReturnQos(glob, "<qos/>"));
314          }
315       }
316    }
317 
318    /**
319     * Encrypt and send a unSubscribe request, decrypt the returned data
320     */
321    private void unSubscribe(MsgQueueEntry entry) throws XmlBlasterException {
322       MsgQueueUnSubscribeEntry unSubscribeEntry = (MsgQueueUnSubscribeEntry)entry;
323 
324       String key = unSubscribeEntry.getUnSubscribeKey().toXml();
325       String qos = unSubscribeEntry.getUnSubscribeQos().toXml();
326       if (securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)
327          CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UNSUBSCRIBE, new MsgUnitRaw(key, (byte[])null, qos));
328          MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
329          key = msgUnitRaw.getKey();
330          qos = msgUnitRaw.getQos();
331          if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted unSubscribe request.");
332       }
333       else {
334          log.warning(ME+": No session security context, unSubscribe request is not encrypted");
335       }
336 
337       String[] rawReturnValArr = this.driver.unSubscribe(key, qos); // Invoke remote server
338 
339       connectionsHandler.getDispatchStatistic().incrNumUnSubscribe(1);
340       
341       if (unSubscribeEntry.wantReturnObj()) {
342          UnSubscribeReturnQos[] retQosArr = new UnSubscribeReturnQos[rawReturnValArr.length];
343          for (int ii=0; ii<rawReturnValArr.length; ii++) {
344             if (securityInterceptor != null) { // decrypt return value ...
345                CryptDataHolder dataHolder = new CryptDataHolder(MethodName.UNSUBSCRIBE, new MsgUnitRaw(null, (byte[])null, rawReturnValArr[ii]));
346                dataHolder.setReturnValue(true);
347                String xmlQos = securityInterceptor.importMessage(dataHolder).getQos();
348                retQosArr[ii] = new UnSubscribeReturnQos(glob, xmlQos);
349             }
350          }
351 
352          try {
353             unSubscribeEntry.setReturnObj(retQosArr);
354          }
355          catch (Throwable e) {
356             log.warning(ME+": Can't parse returned unSubscribe value setting to default: " + e.toString());
357             for (int ii=0; ii<rawReturnValArr.length; ii++) {
358                retQosArr[ii] = new UnSubscribeReturnQos(glob, "<qos/>");
359             }
360             unSubscribeEntry.setReturnObj(retQosArr);
361          }
362       }
363    }
364 
365    /**
366     * Encrypt and send a synchronous get request, decrypt the returned data
367     */
368    private void get(MsgQueueEntry entry) throws XmlBlasterException {
369       MsgQueueGetEntry getEntry = (MsgQueueGetEntry)entry;
370 
371       String key = getEntry.getGetKey().toXml();
372       String qos = getEntry.getGetQos().toXml();
373       if (this.securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)
374          CryptDataHolder dataHolder = new CryptDataHolder(MethodName.GET, new MsgUnitRaw(key, (byte[])null, qos));
375          MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
376          key = msgUnitRaw.getKey();
377          qos = msgUnitRaw.getQos();
378          if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted get request.");
379       }
380       else {
381          log.warning(ME+": No session security context, get request is not encrypted");
382       }
383 
384       MsgUnitRaw[] rawReturnValArr = this.driver.get(key, qos); // Invoke remote server
385 
386       connectionsHandler.getDispatchStatistic().incrNumGet(1);
387       
388       MsgUnit[] msgUnitArr = new MsgUnit[rawReturnValArr.length];
389       if (getEntry.wantReturnObj()) {
390          for (int ii=0; ii<rawReturnValArr.length; ii++) {
391             if (this.securityInterceptor != null) { // decrypt return value ...
392                CryptDataHolder dataHolder = new CryptDataHolder(MethodName.GET, rawReturnValArr[ii]);
393                dataHolder.setReturnValue(true);
394                rawReturnValArr[ii] = securityInterceptor.importMessage(dataHolder);
395             }
396             // NOTE: We use PUBLISH here instead of GET_RETURN to have the whole MsgUnit stored
397             msgUnitArr[ii] = new MsgUnit(glob, rawReturnValArr[ii], MethodName.PUBLISH);
398          }
399 
400          getEntry.setReturnObj(msgUnitArr);
401       }
402    }
403 
404    /**
405     * Encrypt and send a erase request, decrypt the returned data
406     */
407    private void erase(MsgQueueEntry entry) throws XmlBlasterException {
408       MsgQueueEraseEntry eraseEntry = (MsgQueueEraseEntry)entry;
409 
410       String key = eraseEntry.getEraseKey().toXml();
411       String qos = eraseEntry.getEraseQos().toXml();
412       if (securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)
413          CryptDataHolder dataHolder = new CryptDataHolder(MethodName.ERASE, new MsgUnitRaw(key, (byte[])null, qos));
414          MsgUnitRaw msgUnitRaw = securityInterceptor.exportMessage(dataHolder);
415          key = msgUnitRaw.getKey();
416          qos = msgUnitRaw.getQos();
417          if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted erase request.");
418       }
419       else {
420          log.warning(ME+": No session security context, erase request is not encrypted");
421       }
422 
423       String[] rawReturnValArr = this.driver.erase(key, qos); // Invoke remote server
424 
425       connectionsHandler.getDispatchStatistic().incrNumErase(1);
426       
427       if (eraseEntry.wantReturnObj()) {
428          EraseReturnQos[] retQosArr = new EraseReturnQos[rawReturnValArr.length];
429          for (int ii=0; ii<rawReturnValArr.length; ii++) {
430             if (securityInterceptor != null) { // decrypt return value ...
431                CryptDataHolder dataHolder = new CryptDataHolder(MethodName.ERASE, new MsgUnitRaw(null, (byte[])null, rawReturnValArr[ii]));
432                dataHolder.setReturnValue(true);
433                String xmlQos = securityInterceptor.importMessage(dataHolder).getQos();
434                retQosArr[ii] = new EraseReturnQos(glob, xmlQos);
435             }
436          }
437 
438          try {
439             eraseEntry.setReturnObj(retQosArr);
440          }
441          catch (Throwable e) {
442             log.warning(ME+": Can't parse returned erase value setting to default: " + e.toString());
443             for (int ii=0; ii<rawReturnValArr.length; ii++) {
444                retQosArr[ii] = new EraseReturnQos(glob, "<qos/>");
445             }
446             eraseEntry.setReturnObj(retQosArr);
447          }
448       }
449    }
450 
451    /**
452     * Remember the cqd in this.connectQosData and return the encrypted string. 
453     */   
454    private String getEncryptedConnectQos(ConnectQosData cqd) throws XmlBlasterException {
455       if (cqd == null) {
456          return null;
457       }
458       cqd.addClientProperty(Constants.CLIENTPROPERTY_UTC, IsoDateParser.getCurrentUTCTimestamp());
459       this.connectQosData = cqd;
460       if (this.securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)
461           if (log.isLoggable(Level.FINE)) log.fine(ME+": TODO: Crypting msg with exportMessage() is not supported for connect() as the server currently can't handle encrypted ConnectQos (for SOCKET see HandleClient.java:234)");
462           CryptDataHolder dataHolder = new CryptDataHolder(MethodName.CONNECT, new MsgUnitRaw(null, (byte[])null, cqd.toXml()));
463           String encryptedConnectQos = this.securityInterceptor.exportMessage(dataHolder).getQos();
464           if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted connect request.");
465           return encryptedConnectQos;
466       }
467       else {
468           log.warning(ME+": No session security context, connect request is not encrypted");
469           return cqd.toXml();
470       }
471    }
472 
473    /**
474     * Encrypt and send a connect request, decrypt the returned data
475     */
476    private void connect(MsgQueueEntry entry) throws XmlBlasterException {
477       MsgQueueConnectEntry connectEntry = (MsgQueueConnectEntry)entry;
478       //this.sessionName = connectEntry.getConnectQosData().getSessionName();
479 
480       String encryptedConnectQos = getEncryptedConnectQos(connectEntry.getConnectQosData());
481 
482       // TODO: pass connectEntry.getConnectQosData().getSender().getLoginName(); as this is used by SOCKET:requestId
483       String rawReturnVal = this.driver.connect(encryptedConnectQos); // Invoke remote server
484 
485       connectionsHandler.getDispatchStatistic().incrNumConnect(1);
486       
487       if (securityInterceptor != null) { // decrypt return value ...
488          CryptDataHolder dataHolder = new CryptDataHolder(MethodName.CONNECT, new MsgUnitRaw(null, (byte[])null, rawReturnVal));
489          dataHolder.setReturnValue(true);
490          rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos();
491       }
492 
493       try {
494          this.connectReturnQos = new ConnectReturnQos(glob, rawReturnVal);
495          setCheckpointContext(this.connectReturnQos);
496       }
497       catch (XmlBlasterException e) {
498          log.severe(ME+": Can't parse returned connect QoS value '" + rawReturnVal + "': " + e.getMessage());
499          throw e;
500       }
501 
502       if (!connectEntry.getConnectQosData().getSessionName().isSession()) {
503          // We need to remember the server side assigned public session id for reconnect polling
504          // If do we should probably take a clone:
505          //ConnectQos connectQos = new ConnectQos(this.glob, this.connectReturnQos.getData());
506          ConnectQosData connectQos = connectEntry.getConnectQosData();
507          connectQos.setSessionName(this.connectReturnQos.getSessionName());
508          //this.sessionName = this.connectReturnQos.getSessionName();
509          connectQos.getSessionQos().setSecretSessionId(this.connectReturnQos.getSecretSessionId());
510          this.connectQosData = connectQos;
511       }
512 
513       if (connectEntry.wantReturnObj()) {
514          connectEntry.setReturnObj(this.connectReturnQos);
515       }
516       this.driver.setConnectReturnQos(this.connectReturnQos);
517    }
518 
519    /**
520     * Encrypt and send a disconnect request, decrypt the returned data
521     */
522    private void disconnect(MsgQueueEntry entry) throws XmlBlasterException {
523       MsgQueueDisconnectEntry disconnectEntry = (MsgQueueDisconnectEntry)entry;
524       String qos = disconnectEntry.getDisconnectQos().toXml();
525       if (securityInterceptor != null) {  // We export/encrypt the message (call the interceptor)
526          CryptDataHolder dataHolder = new CryptDataHolder(MethodName.DISCONNECT, new MsgUnitRaw(null, (byte[])null, qos));
527          qos = securityInterceptor.exportMessage(dataHolder).getQos();
528          if (log.isLoggable(Level.FINE)) log.fine(ME+": Exported/encrypted disconnect request.");
529       }
530       else {
531          log.warning(ME+": No session security context, disconnect request is not encrypted");
532       }
533 
534       //returns void
535       this.driver.disconnect(qos); // Invoke remote server
536    }
537 
538    /**
539     * @see org.xmlBlaster.util.dispatch.DispatchConnection#doPing(String)
540     */
541    public final String doPing(String data) throws XmlBlasterException {
542       String ret = driver.ping(data);
543       return (ret==null) ? "" : ret;
544    }
545 
546    /**
547     * Nothing to do here
548     */
549    public final void resetConnection() {
550       if (log.isLoggable(Level.FINE)) log.fine(ME+": resetConnection(): Initializing driver for polling");
551       this.connectReturnQos = null;
552       this.driver.resetConnection();
553    }
554 
555    /**
556     * On reconnect polling try to establish the connection. 
557     */
558    protected final void reconnect() throws XmlBlasterException {
559       if (this.driver == null) return;
560       if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering reconnect(" + this.driver.getProtocol() + ")");
561       
562       if (this.connectReturnQos != null) {
563          // needed to avoid failure
564          this.connectionsHandler.getDispatchStatistic().clearCurrentReads();
565          this.connectionsHandler.getDispatchStatistic().clearCurrentWrites();
566          super.ping("", false);
567          return;
568       }
569 
570       if (this.connectQosData == null) {
571          // We never had connected on application layer, so try low level layer only
572          this.driver.connectLowlevel((Address)super.address);
573          return;
574       }
575 
576       String encryptedConnectQos = getEncryptedConnectQos(this.connectQosData);
577       // low level connect (e.g. on TCP/IP layer) and remote invoke method connect()
578       String rawReturnVal = this.driver.connect(encryptedConnectQos); // Invoke remote server
579 
580       connectionsHandler.getDispatchStatistic().incrNumConnect(1);
581       
582       if (securityInterceptor != null) { // decrypt return value ...
583          CryptDataHolder dataHolder = new CryptDataHolder(MethodName.CONNECT, new MsgUnitRaw(null, (byte[])null, rawReturnVal));
584          dataHolder.setReturnValue(true);
585          rawReturnVal = securityInterceptor.importMessage(dataHolder).getQos();
586       }
587 
588       this.connectReturnQos = null;
589       try {
590          this.connectReturnQos = new ConnectReturnQos(glob, rawReturnVal);
591          setCheckpointContext(this.connectReturnQos);
592          if (this.connectEntry != null) {
593             if (this.connectEntry.wantReturnObj()) {
594                this.connectEntry.setReturnObj(this.connectReturnQos);
595             }
596             connectionsHandler.getDispatchManager().postSendNotification(this.connectEntry);
597          }
598       }
599       catch (XmlBlasterException e) {
600          log.severe(ME+": reconnect(): Can't parse returned connect QoS value '" + rawReturnVal + "': " + e.getMessage());
601          throw e;
602       }
603       this.driver.setConnectReturnQos(this.connectReturnQos);
604    }
605 
606    /**
607     * Stop all callback drivers of this client.
608     */
609    public final void shutdown(boolean delayed) throws XmlBlasterException {
610       super.shutdown(delayed);
611       if (driver != null) {
612          driver.shutdown();
613       }
614    }
615 
616    /**
617     * Dump state of this object into a XML ASCII string.
618     * <br>
619     * @param extraOffset indenting of tags for nice output
620     * @return internal state as an XML ASCII string
621     */
622    public final String toXml(String extraOffset) {
623       StringBuffer sb = new StringBuffer(256);
624       if (extraOffset == null) extraOffset = "";
625       String offset = Constants.OFFSET + extraOffset;
626 
627       sb.append(offset + "<ClientDispatchConnection>");
628       super.address.toXml(" " + offset);
629       if (driver == null)
630          sb.append(offset).append(" <noProtocolDriver />");
631       else
632          sb.append(offset).append(" <address type='" + driver.getProtocol() + "' state='" + getState() + "'/>");
633       sb.append(offset).append("</ClientDispatchConnection>");
634 
635       return sb.toString();
636    }
637    
638    public I_ProgressListener registerProgressListener(I_ProgressListener listener) {
639       if (this.driver == null) return null;
640       return this.driver.registerProgressListener(listener);
641    }
642 
   /**
    * Always returns true for the client side connection.
    * NOTE(review): presumably evaluated by the base class DispatchConnection
    * when a ping fails - confirm the exact semantics there.
    * @return true
    */
   protected boolean forcePingFailure() {
      return true;
   }
646    
647 }


syntax highlighted by Code2HTML, v. 0.9.1