1 package org.xmlBlaster.protocol.stomp;
  2 
  3 import java.io.UnsupportedEncodingException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 import java.util.concurrent.ConcurrentHashMap;
  7 import java.util.logging.Level;
  8 import java.util.logging.Logger;
  9 
 10 import org.codehaus.stomp.ProtocolException;
 11 import org.codehaus.stomp.Stomp;
 12 import org.codehaus.stomp.StompFrame;
 13 import org.codehaus.stomp.StompHandler;
 14 import org.xmlBlaster.authentication.plugins.demo.SecurityQos;
 15 import org.xmlBlaster.engine.qos.ConnectQosServer;
 16 import org.xmlBlaster.engine.qos.ConnectReturnQosServer;
 17 import org.xmlBlaster.protocol.I_Authenticate;
 18 import org.xmlBlaster.protocol.I_CallbackDriver;
 19 import org.xmlBlaster.util.Global;
 20 import org.xmlBlaster.util.MsgUnit;
 21 import org.xmlBlaster.util.MsgUnitRaw;
 22 import org.xmlBlaster.util.ReplaceVariable;
 23 import org.xmlBlaster.util.SessionName;
 24 import org.xmlBlaster.util.Timestamp;
 25 import org.xmlBlaster.util.XmlBlasterException;
 26 import org.xmlBlaster.util.def.Constants;
 27 import org.xmlBlaster.util.def.ErrorCode;
 28 import org.xmlBlaster.util.def.MethodName;
 29 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 30 import org.xmlBlaster.util.key.KeyData;
 31 import org.xmlBlaster.util.plugin.PluginInfo;
 32 import org.xmlBlaster.util.qos.address.CallbackAddress;
 33 import org.xmlBlaster.util.xbformat.I_ProgressListener;
 34 
 35 /**
 36  * Protocol bridge, to bridge between xmlBlaster and STOMP protocol. implements
 37  * StompHandler and I_CallbackDriver for incoming STOMP messages and outgoing
 38  * XmlBlaster Messages.
 39  * <p />
 40  * One instance per client connect
 41  * 
 42  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.stomp.html">Protocol integration</a>
 43  * @see <a href="http://stomp.codehaus.org/">Website</a>
 44  * @see <a href="http://stomp.codehaus.org/Protocol">Protocol describtion</a>
 45  * @author Dieter Saken (Marcel Ruff)
 46  */
 47 public class XbStompInOutBridge implements StompHandler, I_CallbackDriver {
 48    private static Logger log = Logger.getLogger(XbStompInOutBridge.class
 49          .getName());
 50    public static final String XB_SERVER_COMMAND_PING = "PING";
 51    public static final String XB_SERVER_HEADER_KEY = "key";
 52    public static final String XB_SERVER_HEADER_QOS = "qos";
 53    public String ME = "XbStompInOutBridge";
 54    public static final String PROTOCOL_NAME = "STOMP";
 55    private final StompHandler outputHandler;
 56    private final XbStompDriver driver;
 57    private final Global glob;
 58 
 59    private final ConcurrentHashMap<String, RequestHolder> framesToAck = new ConcurrentHashMap<String, RequestHolder>();
 60    private String secretSessionId;
 61    private I_Authenticate authenticate;
 62    private org.xmlBlaster.protocol.I_XmlBlaster xb;
 63    private boolean stompOpened;
 64 
 65    /** How long to block on remote call waiting on ping responses */
 66    protected long pingResponseTimeout;
 67    /** How long to block on remote call waiting on update responses */
 68    protected long updateResponseTimeout;
 69    
 70    private ConnectQosServer connectQos;
 71    private String remoteAddress = "";
 72 
 73    public XbStompInOutBridge(Global glob, XbStompDriver driver,
 74          StompHandler outputHandler) {
 75       this.glob = glob;
 76       this.driver = driver;
 77       this.outputHandler = outputHandler; // is  instanceof TcpTransport
 78       this.stompOpened = true;
 79       try {
 80          // "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
 81          this.remoteAddress = outputHandler.toString();
 82       }
 83       catch (Throwable e) {
 84          e.printStackTrace();
 85       }
 86    }
 87 
 88    /**
 89     * All stomp threads contain the remote address alreay, but not our internal threads
 90     */
 91    public String getExtendedLogId() {
 92       return ME + " " + this.remoteAddress;
 93    }
 94 
 95    /**
 96     * Called from xmlBlaster core on disconnect
 97     * Called on exception or driver deactivate
 98     */
 99    public void shutdown() {
100       log.info(getExtendedLogId() + " will shutdown");
101         _destroy();
102    }
103 
104    /**
105     * Callback from #StompHandler
106     */
107    public void close() {
108       // The stomp tread name contains the remote IP
109       log.info(ME + " close from StompHandler");
110       _destroy();
111    }
112    
113    private void _destroy() {
114         driver.removeClient(this);
115       try {
116          synchronized (this) {
117             if (this.stompOpened) {
118                this.stompOpened = false;
119                // somebody is trying to close me all the time :-(
120                I_Authenticate auth = this.authenticate;
121                if (auth != null) {
122                   // From the point of view of the incoming client connection we
123                   // are
124                   // dead
125                   // The callback dispatch framework may have another point of
126                   // view
127                   // (which is not of interest here)
128                   try {
129                      auth.connectionState(this.secretSessionId,
130                            ConnectionStateEnum.DEAD);
131                   }
132                   catch (Throwable e) {
133                      e.printStackTrace();
134                   }
135                }
136                this.secretSessionId = null;
137                this.authenticate = null;
138                this.xb = null;
139                log.info(getExtendedLogId() + " gets closed");
140                try {
141                   outputHandler.close();
142                } catch (Throwable e) {
143                   e.printStackTrace();
144                }
145                notifyAllFrameAcks();
146             }
147          }
148       } catch (Throwable e) {
149          e.printStackTrace();
150          log.severe(e.toString());
151       }
152    }
153 
154    public int notifyAllFrameAcks() {
155       RequestHolder[] arr = getFramesToAck();
156       log.info(ME + ". Close called with " + arr.length + " waiting frames, notify them now");
157       this.framesToAck.clear();
158       for (RequestHolder requestHolder : arr) {
159          requestHolder.shutdown = true;
160          notifyFrameAck(requestHolder);
161       }
162       return arr.length;
163    }
164 
165    public RequestHolder[] getFramesToAck() {
166       synchronized (this.framesToAck) {
167          return (RequestHolder[])this.framesToAck.values().toArray(new RequestHolder[this.framesToAck.size()]);
168       }
169    }
170 
171    /*
172     * This Code Area handles the incoming Stomp messages by implementing the
173     * StompHandler Interface
174     */
175 
176    private boolean checkStompConnected() {
177       return this.stompOpened;
178    }
179 
180    private boolean checkXbConnected() {
181       return this.stompOpened && this.xb != null;
182    }
183 
184    /**
185     * Callback from Stomp
186     */
187    public void onException(Exception e) {
188       log.warning(ME + " onException from Stomp: " + e.toString());
189       _destroy();
190    }
191    
192    //@SuppressWarnings("unknown")
193    private String dump(StompFrame frame) {
194       StringBuffer buf = new StringBuffer();
195       try {
196          if (frame == null) {
197             log.severe("Can't dump null frame");
198             return "Can't dump null frame";
199          }
200          buf.append("action=").append(frame.getAction()).append(",");
201          Map<String, Object> map = frame.getHeaders();
202          java.util.Iterator<String> it = map.keySet().iterator();
203          while (it.hasNext()) {
204             String key = it.next();
205             buf.append(key).append("=").append(map.get(key)).append(",");
206          }
207          buf.append("content=").append(new String(frame.getContent(), "UTF-8"));
208       }
209       catch (Throwable e) {
210          buf.append("Can't dump frame: " + e.toString());
211          log.severe("Can't dump frame: " + e.toString());
212       }
213       return buf.toString();
214    }
215 
216    /**
217     * Callback from stomp layer. 
218     */
219    public void onStompFrame(StompFrame frame) throws Exception {
220       try {
221          String action = frame.getAction();
222          if (action == null) {
223             log.warning(ME + " Ignoring null stomp action: " + dump(frame));
224             return;
225          }
226          if (log.isLoggable(Level.FINE))
227             log.fine(getExtendedLogId() + " onStompFrame " + dump(frame));
228          action = action.trim();
229          if (action.startsWith(Stomp.Commands.CONNECT)) {
230             onStompConnect(frame);
231          } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
232             onStompSubscribe(frame);
233          } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
234             onStompUnsubscribe(frame);
235          } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
236             onStompDisconnect(frame);
237          } else if (action.startsWith(Stomp.Commands.SEND)) {
238             onStompSend(frame);
239          } else if (action.startsWith(Stomp.Commands.ACK)) {
240             onStompAck(frame);
241          } else if (action.startsWith(Stomp.Commands.ABORT) || action.startsWith("NAK")) {
242             onStompNak(frame);
243          } else {
244             throw new ProtocolException("STOMP action: " + action
245                   + "not supported or unknown: ");
246          }
247       }
248       catch (ProtocolException e) {
249          log.severe(getExtendedLogId() + e.toString() + ": " + dump(frame));
250          throw e; // re-throw
251       }
252       catch (Exception e) {
253          e.printStackTrace();
254          log.warning(getExtendedLogId() + " onStompFrame failed: " + e.toString() + ": " + dump(frame));
255          throw e;
256       }
257       catch (Throwable e) {
258          e.printStackTrace();
259          log.severe(getExtendedLogId() + " onStompFrame failed: " + e.toString() + ": " + dump(frame));
260          _destroy();
261       }
262    }
263 
264    @SuppressWarnings("unchecked")
265    protected void onStompConnect(StompFrame command) {
266       try {
267          final Map headers = command.getHeaders();
268          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
269          String login = (String) headers.get(Stomp.Headers.Connect.LOGIN);
270          org.xmlBlaster.engine.ServerScope engineGlob = (org.xmlBlaster.engine.ServerScope) glob
271                .getObjectEntry(Constants.OBJECT_ENTRY_ServerScope);
272          if (engineGlob == null)
273             throw new XmlBlasterException(this.glob,
274                   ErrorCode.INTERNAL_UNKNOWN, ME + ".init",
275                   "could not retreive the ServerNodeScope. Am I really on the server side ?");
276          this.authenticate = engineGlob.getAuthenticate();
277          xb = this.authenticate.getXmlBlaster();
278          if (this.authenticate == null) {
279             throw new XmlBlasterException(this.glob,
280                   ErrorCode.INTERNAL_UNKNOWN, ME + ".init",
281                   "authenticate object is null");
282          }
283          if (login != null) {
284             ConnectQosServer conQos = new ConnectQosServer(glob, qos);
285             
286             if (conQos.getData().getClientProperty("deviceGuid", (String)null) != null) {
287                StringBuffer buf = new StringBuffer();
288                buf.append("loginName=").append(conQos.getUserId()).append(",");
289                buf.append("version=").append(conQos.getData().getClientProperty("version", "")).append(",");
290                buf.append("deviceName=").append(conQos.getData().getClientProperty("deviceName", "")).append(",");
291                buf.append("platform.model=").append(conQos.getData().getClientProperty("platform.model", "")).append(",");
292                buf.append("platform.name=").append(conQos.getData().getClientProperty("platform.name", "")).append(",");
293                buf.append("batterylevel=").append(conQos.getData().getClientProperty("batterylevel", "")).append(",");
294                buf.append("version=").append(conQos.getData().getClientProperty("version", "")).append(",");
295                buf.append("version.OS=").append(conQos.getData().getClientProperty("version.OS", "")).append(",");
296                buf.append("deviceGuid=").append(conQos.getData().getClientProperty("deviceGuid", ""));
297                log.info(ME + "Connecting device: " + buf.toString());
298             }
299             
300             if (conQos.getSecurityQos() == null) {
301                String clientId = (String) headers
302                      .get(Stomp.Headers.Connect.CLIENT_ID);
303                String passcode = (String) headers
304                      .get(Stomp.Headers.Connect.PASSCODE);
305                if (clientId != null || passcode != null) {
306                   SecurityQos securityQos = new SecurityQos(glob);
307                   securityQos.setUserId(clientId);
308                   securityQos.setCredential(passcode);
309                   conQos.getData().setSecurityQos(securityQos);
310                   if (conQos.getSessionName() == null
311                         || conQos.getSessionName().getLoginName() == null) {
312                      conQos.setSessionName(new SessionName(engineGlob,
313                            clientId));
314                   }
315                } else {
316                   throw new XmlBlasterException(
317                         glob,
318                         ErrorCode.USER_SECURITY_AUTHENTICATION_ILLEGALARGUMENT,
319                         ME, "connect() without securityQos");
320                }
321             }
322             // conQos.getSecurityQos().setClientIp(outputHandler.getIp());
323             // conQos.setAddressServer(driver.getTcpServer().getBindLocation());
324             // setLoginName(conQos.getSessionName().getRelativeName());
325 
326             CallbackAddress[] cbArr = conQos.getSessionCbQueueProperty()
327                   .getCallbackAddresses();
328             for (int ii = 0; cbArr != null && ii < cbArr.length; ii++) {
329                cbArr[ii].setRawAddress(this.remoteAddress);//driver.getRawAddress());
330                try {
331                   cbArr[ii].setCallbackDriver(this);
332                } catch (Exception e) {
333                   e.printStackTrace();
334                   log.severe(ME + " Internal error during setCallbackDriver: " + e.toString() + ": " + dump(command));
335                }
336             }
337             ConnectReturnQosServer retQos = authenticate.connect(conQos);
338             this.connectQos = conQos;
339             this.secretSessionId = retQos.getSecretSessionId();
340             ME = retQos.getSessionName().getRelativeName();
341 
342             Map<String, String> responseHeaders = new HashMap<String, String>();
343             responseHeaders.put(Stomp.Headers.Connected.SESSION,
344                   this.secretSessionId);
345             String requestId = (String) headers
346                   .get(Stomp.Headers.Connect.REQUEST_ID);
347             if (requestId == null) {
348                requestId = (String) headers
349                      .get(Stomp.Headers.RECEIPT_REQUESTED);
350             }
351             if (requestId != null) {
352                responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID,
353                      requestId);
354                responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID,
355                      requestId);
356             }
357             StompFrame sc = new StompFrame();
358             sc.setAction(Stomp.Responses.CONNECTED);
359             sc.setHeaders((Map)responseHeaders);
360             sendFrameNoWait(sc);
361             log.info(ME + " ConnectReturn Qos is send");
362          }
363       } catch (XmlBlasterException e) {
364          this.connectQos = null;
365          log.warning(ME + " Connect failed: " + e.toString() + ": " + dump(command));
366          sendExeption(command, e);
367       }
368    }
369    
370    public String getLoginName() {
371       ConnectQosServer cs = this.connectQos;
372       if (cs != null) {
373          return cs.getSessionName().getLoginName();
374       }
375       return null;
376    }
377 
378    protected void onStompDisconnect(StompFrame command) {
379       if (!checkXbConnected()) {
380          sendExeption(command,
381                new XmlBlasterException(glob,
382                      ErrorCode.USER_WRONG_API_USAGE,
383                      "onStompDisconnect: Please call connect first: " + dump(command)));
384          return;
385       }
386       try {
387          @SuppressWarnings("unchecked")
388          final Map headers = command.getHeaders();
389          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
390          authenticate.disconnect(null, secretSessionId, qos);
391       } catch (XmlBlasterException e) {
392          sendExeption(command, e);
393       }
394       _destroy();
395    }
396 
397    protected void onStompSend(StompFrame command) {
398       try {
399          if (!checkXbConnected()) {
400             sendExeption(command, new XmlBlasterException(glob,
401                   ErrorCode.USER_WRONG_API_USAGE,
402                   "onStompSend: Please call connect first: " + dump(command)));
403             return;
404          }
405          @SuppressWarnings("unchecked")
406          Map headers = command.getHeaders();
407          String key = (String) headers.get(XB_SERVER_HEADER_KEY);
408          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
409          MsgUnitRaw msg = new MsgUnitRaw(key, command.getContent(), qos);
410          MsgUnitRaw[] msgArr = new MsgUnitRaw[1];
411          msgArr[0] = msg;
412          xb.publishArr(null, secretSessionId, msgArr);
413          sendResponse(command);
414       } catch (XmlBlasterException e) {
415          sendExeption(command, e);
416       }
417    }
418 
419    protected void onStompSubscribe(StompFrame command) throws Exception {
420       try {
421          if (!checkXbConnected()) {
422             sendExeption(command, new XmlBlasterException(glob,
423                   ErrorCode.USER_WRONG_API_USAGE,
424                   "onStompSubscribe: Please call connect first: " + dump(command)));
425             return;
426          }
427          @SuppressWarnings("unchecked")
428          final Map headers = command.getHeaders();
429          String key = (String) headers.get(XB_SERVER_HEADER_KEY);
430          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
431          xb.subscribe(null, secretSessionId, key, qos);
432 
433          sendResponse(command);
434       } catch (XmlBlasterException e) {
435          sendExeption(command, e);
436       }
437    }
438 
439    protected void onStompUnsubscribe(StompFrame command) throws Exception {
440       try {
441          if (!checkXbConnected()) {
442             sendExeption(command, new XmlBlasterException(glob,
443                   ErrorCode.USER_WRONG_API_USAGE,
444                   "onStompUnsubscribe: Please call connect first: " + dump(command)));
445             return;
446          }
447          @SuppressWarnings("unchecked")
448          final Map headers = command.getHeaders();
449          String key = (String) headers.get(XB_SERVER_HEADER_KEY);
450          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
451          xb.unSubscribe(null, secretSessionId, key, qos);
452          sendResponse(command);
453       } catch (XmlBlasterException e) {
454          sendExeption(command, e);
455       }
456    }
457 
458    protected void onStompAck(StompFrame command) throws Exception {
459       if (!checkStompConnected())
460          return;
461       @SuppressWarnings("unchecked")
462       Map headers = command.getHeaders();
463       String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
464       if (messageId == null) {
465          log.warning(ME + " ACK API error: missing messageId: " + dump(command));
466          throw new ProtocolException(
467                ME + " ACK received without a message-id to acknowledge!");
468       }
469 
470       RequestHolder requestHolder = framesToAck.get(messageId);
471       if (requestHolder == null) {
472          // Happens on multiple Ack or on wrong messageId
473          log.severe(getExtendedLogId() + " Internal ACK API error: messageId=" + messageId + " not found in framesToAck hashtable: " + dump(command));
474          return;
475       }
476       
477       requestHolder.returnQos = (String) headers.get(XB_SERVER_HEADER_QOS);
478       if (log.isLoggable(Level.FINE)) log.fine(ME + " " + requestHolder.toString() + " ACK release and notify ...");
479       
480       removeFrameForMessageId(messageId);
481       notifyFrameAck(requestHolder);
482    }
483 
484    protected void onStompNak(StompFrame command) throws Exception {
485       if (!checkStompConnected())
486          return;
487       @SuppressWarnings("unchecked")
488       Map headers = command.getHeaders();
489       String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
490       if (messageId == null) {
491          log.warning(ME + " NAK API error: missing messageId: " + dump(command));
492          throw new ProtocolException(
493                ME + " NAK received without a message-id to acknowledge!");
494       }
495       String errorCode = (String) headers.get("errorCode");
496       ErrorCode errorCodeEnum = ErrorCode.toErrorCode(errorCode, ErrorCode.USER_CLIENTCODE);
497       String message = (String) headers.get(Stomp.Headers.Error.MESSAGE);
498       if (message == null)
499          message = "UPDATE failed, client has rejected message";
500 
501       RequestHolder requestHolder = framesToAck.get(messageId);
502       if (requestHolder == null) {
503          // Happens on multiple Ack or on wrong messageId
504          log.warning(getExtendedLogId() + " Internal NAK API error: messageId=" + messageId + " not found in framesToAck hashtable: " + dump(command));
505       }
506       requestHolder.xmlBlasterException = new XmlBlasterException(glob, errorCodeEnum, message);
507       
508       log.info(ME + " NAK release and notify ... " + errorCode);
509       removeFrameForMessageId(messageId);
510       notifyFrameAck(requestHolder);
511    }
512 
513    private boolean notifyFrameAck(RequestHolder requestHolder) {
514       if (requestHolder != null && requestHolder.stompFrame != null) {
515          synchronized (requestHolder.stompFrame) {
516             try {
517                requestHolder.stompFrame.notify();
518                return true;
519             }
520             catch (Throwable e) {
521                e.printStackTrace();
522                return false;
523             }
524          }
525       }
526       return false;
527    }
528 
529    // ===================== I_CallbackDriver ==========================
530 
531    /*
532     * This Code Area handles the outgoing xmlBlaster messages by implementing
533     * the I_CallbackDRiver Interface
534     */
535 
536    //@Override()
537    public String getName() {
538       return ME;
539    }
540 
541    //@Override()
542    public String getProtocolId() {
543       return PROTOCOL_NAME;
544    }
545 
546    //@Override()
547    public String getRawAddress() {
548       return outputHandler.toString();
549    }
550 
551    /**
552     * How long to block on remote call waiting on a ping response. The default
553     * is to block for one minute This method can be overwritten by
554     * implementations like EMAIL
555     */
556    public long getDefaultPingResponseTimeout() {
557       return Constants.MINUTE_IN_MILLIS;
558    }
559 
560    /**
561     * How long to block on remote call waiting on a update() response. The
562     * default is to block forever This method can be overwritten by
563     * implementations like EMAIL
564     */
565    public long getDefaultUpdateResponseTimeout() {
566       return Integer.MAX_VALUE;
567    }
568 
569    /**
570     * Set the given millis to protect against blocking client for ping
571     * invocations.
572     * 
573     * @param millis
574     *            If <= 0 it is set to the default (one minute). An argument
575     *            less than or equal to zero means not to wait at all and is not
576     *            supported
577     */
578    public final void setPingResponseTimeout(long millis) {
579       if (millis <= 0L) {
580          log.warning(ME + " pingResponseTimeout=" + millis
581                + " is invalid, setting it to "
582                + getDefaultPingResponseTimeout() + " millis");
583          this.pingResponseTimeout = getDefaultPingResponseTimeout();
584       } else
585          this.pingResponseTimeout = millis;
586    }
587 
588    /**
589     * Set the given millis to protect against blocking client for update()
590     * invocations.
591     * 
592     * @param millis
593     *            If <= 0 it is set to the default (one minute). An argument
594     *            less than or equal to zero means not to wait at all and is not
595     *            supported
596     */
597    public final void setUpdateResponseTimeout(long millis) {
598       if (millis <= 0L) {
599          log.warning(ME + " updateResponseTimeout=" + millis
600                + " is invalid, setting it to "
601                + getDefaultUpdateResponseTimeout() + " millis");
602          this.updateResponseTimeout = getDefaultUpdateResponseTimeout();
603       } else
604          this.updateResponseTimeout = millis;
605    }
606 
607    /**
608     * @return Returns the responseTimeout.
609     */
610    public long getResponseTimeout(MethodName methodName) {
611       if (MethodName.PING.equals(methodName)) {
612          return this.pingResponseTimeout;
613       } else if (MethodName.UPDATE.equals(methodName)) {
614          return this.updateResponseTimeout;
615       }
616       return this.updateResponseTimeout;
617       // return this.responseTimeout;
618    }
619 
620    //@Override()
621    public void init(Global glob, CallbackAddress addressConfig)
622          throws XmlBlasterException {
623 
624       setPingResponseTimeout(addressConfig.getEnv("pingResponseTimeout",
625             getDefaultPingResponseTimeout()).getValue());
626       if (log.isLoggable(Level.FINE))
627          log.fine(addressConfig.getEnvLookupKey("pingResponseTimeout") + "="
628                + this.pingResponseTimeout);
629 
630       setUpdateResponseTimeout(addressConfig.getEnv("updateResponseTimeout",
631             getDefaultUpdateResponseTimeout()).getValue());
632       if (log.isLoggable(Level.FINE))
633          log.fine(addressConfig.getEnvLookupKey("updateResponseTimeout")
634                + "=" + this.updateResponseTimeout);
635    }
636 
637    //@Override()
638    public boolean isAlive() {
639       return this.stompOpened;
640    }
641 
642    //@Override()
643    public String ping(String qos) throws XmlBlasterException {
644       // never ping client without session
645       // <qos><state info='INITIAL'/></qos>
646       if (log.isLoggable(Level.FINE)) log.fine("Ping again");
647       if (qos != null && qos.indexOf("INITIAL") != -1)
648          return "<qos/>";
649       if (!checkStompConnected())
650          throw new XmlBlasterException(glob,
651                ErrorCode.COMMUNICATION_NOCONNECTION, ME,
652                "Stomp callback ping failed");
653       StompFrame frame = new StompFrame();
654       frame.setAction(XB_SERVER_COMMAND_PING);
655       frame.getHeaders().put(XB_SERVER_HEADER_QOS, qos);
656       String returnValue = sendFrameAndWait(frame, MethodName.PING);
657       return returnValue;
658    }
659 
660    public I_ProgressListener registerProgressListener(
661          I_ProgressListener listener) {
662       return null;
663    }
664    
665    /**
666     * HTTP header key/value should not contain new line. 
667     * @param str
668     * @return
669     */
670    private String cleanNewlines(String str) {
671       return ReplaceVariable.replaceAll(str, "\n", " ");
672    }
673    
674    private KeyData getKeyData(MsgUnitRaw msgUnitRaw) {
675       try {
676          if (msgUnitRaw == null)
677             return null;
678          MsgUnit msgUnit = (MsgUnit)msgUnitRaw.getMsgUnit();
679          if (msgUnit == null)
680             return null;
681          return msgUnit.getKeyData();
682       }
683       catch(Throwable e) {
684          e.printStackTrace();
685          return null;
686       }
687    }
688    
689    private String getContentType(MsgUnitRaw msgUnitRaw) {
690       KeyData keyData = getKeyData(msgUnitRaw);
691       if (keyData != null) {
692          String contentType = keyData.getContentMime();
693          if (contentType != null && contentType.length() > 0)
694             return contentType;
695       }
696       return "text/xml";
697    }
698 
699    public String[] sendUpdate(MsgUnitRaw[] msgArr) throws XmlBlasterException {
700       String[] ret = new String[msgArr.length];
701       int i = 0;
702       for (MsgUnitRaw msgUnitRaw : msgArr) {
703          StompFrame frame = new StompFrame();
704          // MsgUnit msg = (MsgUnit) msgUnit.getMsgUnit();
705          // String senderLoginName =
706          // msg.getQosData().getSender().getAbsoluteName();
707          MsgUnit msg = (MsgUnit) msgUnitRaw.getMsgUnit();
708          String topicId = msg.getKeyOid();
709          frame.setAction(Stomp.Responses.MESSAGE);
710          frame.getHeaders().put(Stomp.Headers.Message.DESTINATION, topicId);
711          frame.getHeaders().put("methodName", MethodName.UPDATE.toString());
712          String contentType = getContentType(msgUnitRaw);
713          frame.getHeaders().put("content-type", contentType); // "text/xml"
714          frame.getHeaders().put(XB_SERVER_HEADER_KEY, cleanNewlines(msgUnitRaw.getKey()));
715          frame.getHeaders().put(XB_SERVER_HEADER_QOS, cleanNewlines(msgUnitRaw.getQos()));
716          byte[] content = msgUnitRaw.getContent();
717          frame.getHeaders().put(Stomp.Headers.CONTENT_LENGTH, content.length);
718          frame.setContent(content);
719          if (log.isLoggable(Level.FINER)) log.finer(ME + " UPDATE Sending now ... " + msgUnitRaw.getKey().trim());
720          ret[i] = sendFrameAndWait(frame, MethodName.UPDATE);
721          if (log.isLoggable(Level.FINER)) log.finer(ME + " UPDATE Done " + msgUnitRaw.getKey().trim() + ": " + ret[i]);
722          i++;
723       }
724       return ret;
725    }
726 
727    public void sendUpdateOneway(MsgUnitRaw[] msgArr)
728          throws XmlBlasterException {
729       for (MsgUnitRaw msgUnitRaw : msgArr) {
730          try {
731             StompFrame frame = new StompFrame();
732             MsgUnit msg = (MsgUnit) msgUnitRaw.getMsgUnit();
733             String topicId = msg.getKeyOid();
734             frame.setAction(Stomp.Responses.MESSAGE);
735             frame.getHeaders().put(Stomp.Headers.Message.DESTINATION,
736                   topicId);
737             frame.getHeaders().put("methodName", MethodName.UPDATE_ONEWAY);
738             frame.getHeaders().put("content-type", getContentType(msgUnitRaw)); // "text/xml"
739             frame.getHeaders().put(XB_SERVER_HEADER_KEY, cleanNewlines(msgUnitRaw.getKey()));
740             frame.getHeaders().put(XB_SERVER_HEADER_QOS, cleanNewlines(msgUnitRaw.getQos()));
741             frame.getHeaders().put(Stomp.Headers.CONTENT_LENGTH,
742                   msgUnitRaw.getContent().length);
743             frame.setContent(msgUnitRaw.getContent());
744             sendFrameNoWait(frame);
745          } catch (Exception e) {
746             log.severe(e.getMessage());
747          }
748       }
749 
750    }
751 
752    public String getType() {
753       return PROTOCOL_NAME;
754    }
755 
756    public String getVersion() {
757       return "1.0";
758    }
759 
760    public void init(Global glob, PluginInfo pluginInfo)
761          throws XmlBlasterException {
762    }
763 
764    /*
765     * private internal stuff
766     */
767 
768    private RequestHolder getFrameForMessageId(String messageId) {
769       return (framesToAck.get(messageId));
770    }
771 
772    private RequestHolder registerFrame(StompFrame frame) {
773       //String messageId = "" + new Timestamp().getTimestamp();
774       //String messageId = frame.getAction() + "-" + secretSessionId + "-" + System.currentTimeMillis();
775       String messageId = frame.getAction() + "-" + new Timestamp().getTimestamp();
776       frame.getHeaders().put(Stomp.Headers.Message.MESSAGE_ID, messageId);
777       RequestHolder requestHolder = new RequestHolder(messageId, frame);
778       framesToAck.put(messageId, requestHolder);
779       return requestHolder;
780    }
781 
782    private void removeFrameForMessageId(String messageId) {
783       if (messageId == null)
784          return;
785       if (framesToAck.get(messageId) != null)
786          framesToAck.remove(messageId);
787    }
788 
789    private void sendFrameNoWait(StompFrame frame) throws XmlBlasterException {
790       checkStompConnected();
791       try {
792          outputHandler.onStompFrame(frame);
793       } catch (Exception e) {
794          e.printStackTrace();
795          throw new XmlBlasterException(this.glob,
796                ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME
797                      + ".sendFrameNoWait", e.getMessage());
798       }
799    }
800 
801    private String sendFrameAndWait(StompFrame frame, MethodName methodName)
802          throws XmlBlasterException {
803       final RequestHolder requestHolder = registerFrame(frame);
804       try {
805          checkStompConnected();
806          long timeout = getResponseTimeout(methodName);
807          if (log.isLoggable(Level.FINE))
808             log.fine(ME + " " + requestHolder.toString() + ": Sending now " + methodName.toString() + "...");
809          synchronized (frame) {
810             outputHandler.onStompFrame(frame);
811             frame.wait(timeout); // TODO: Port to CountDownLatch cdl = new CountDownLatch(1);
812          }
813          // Timeout occurred if requestHolder was not removed by ACK or NAK:
814          if (requestHolder == getFrameForMessageId(requestHolder.messageId)) {
815             String text = "methodName=" + methodName.toString() + " messageId=" + requestHolder.messageId + ": No Ack recieved in timeoutMillis=" + timeout;
816             log.warning(text + ": " + dump(frame));
817             removeFrameForMessageId(requestHolder.messageId);
818             throw new XmlBlasterException(this.glob,
819                   ErrorCode.COMMUNICATION_TIMEOUT, ME
820                         + ".sendFrameAndWait",
821                   text);
822          }
823       } catch (Exception e) {
824          if (e instanceof XmlBlasterException)
825             throw (XmlBlasterException) e;
826          else
827             throw new XmlBlasterException(this.glob,
828                   ErrorCode.COMMUNICATION_NOCONNECTION_DEAD,
829                   // ErrorCode.COMMUNICATION_NOCONNECTION_CALLBACKSERVER_NOTAVAILABLE,
830                   ME + ".sendFrameAndWait", e.getMessage());
831       }
832       // http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.update.html
833       if (requestHolder.shutdown) {
834          // connection was lost
835          log.warning(getExtendedLogId() + requestHolder.toString() + ": Shutdown during callback delivery: " + dump(frame));
836          throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_RESPONSETIMEOUT, ME + " Shutdown during update delivery");
837          //return "<qos><state id='FAIL'/>";
838       }
839       else if (requestHolder.xmlBlasterException != null) { // on XmlBlasterException
840          log.warning(ME + " " + requestHolder.toString() + ": Exception from client: " + requestHolder.xmlBlasterException.getMessage() + ": " + dump(frame));
841          throw requestHolder.xmlBlasterException;
842       }
843       else { // requestHolder.returnQos should filled
844          if (log.isLoggable(Level.FINE))
845             log.fine(ME + " " + requestHolder.toString() + ": Successfully send and acknowledged " + requestHolder.returnQos);
846          return (requestHolder.returnQos == null) ?  "<qos/>" : requestHolder.returnQos;
847       }
848    }
849 
850    @SuppressWarnings("unchecked")
851    private void sendResponse(StompFrame command) throws XmlBlasterException {
852       final String receiptId = (String) command.getHeaders().get(
853             Stomp.Headers.RECEIPT_REQUESTED);
854       // A response may not be needed.
855       if (receiptId != null) {
856          StompFrame sc = new StompFrame();
857          sc.setAction(Stomp.Responses.RECEIPT);
858          sc.setHeaders(new HashMap());
859          sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
860          sendFrameNoWait(sc);
861       }
862    }
863 
864    public static byte[] toUtf8Bytes(String s) {
865       if (s == null || s.length() == 0)
866          return new byte[0];
867       try {
868          return s.getBytes(Constants.UTF8_ENCODING);
869       } catch (UnsupportedEncodingException e) {
870          log.severe("PANIC in WatcheeConstants.toUtf8Bytes(" + s
871                + ", " + Constants.UTF8_ENCODING + "): " + e.toString());
872          e.printStackTrace();
873          return s.getBytes();
874       }
875    }
876 
877    @SuppressWarnings("unchecked")
878    private void sendExeption(StompFrame command, XmlBlasterException e) {
879       try {
880          final String receiptId = (String) command.getHeaders().get(
881                Stomp.Headers.RECEIPT_REQUESTED);
882          StompFrame sc = new StompFrame();
883          sc.setAction(Stomp.Responses.ERROR);
884          sc.setHeaders(new HashMap());
885          sc.getHeaders().put("errorCode", e.getErrorCodeStr()); // xmlBlaster way
886          sc.getHeaders().put(Stomp.Responses.MESSAGE, e.getErrorCodeStr()); // stomp
887                                                             // wants
888                                                             // it
889          if (receiptId != null) {
890             sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
891          }
892          String text = e.getMessage();
893          sc.getHeaders().put(Stomp.Headers.CONTENT_LENGTH, text.length());
894          sc.setContent(toUtf8Bytes(text));
895          try {
896             sendFrameNoWait(sc);
897          } catch (XmlBlasterException e1) {
898             e1.printStackTrace();
899          }
900          log.warning(ME + "sendException" + e.getMessage() + ": " + dump(command));
901       }
902       catch (Throwable e2) {
903          e2.printStackTrace();
904          log.severe("sendExeption failed for " + e.toString() + ": " + e2.toString() + ": " + dump(command));
905       }
906    }
907 }


syntax highlighted by Code2HTML, v. 0.9.1