1 package org.xmlBlaster.protocol.stomp;
  2 
  3 import java.io.UnsupportedEncodingException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 import java.util.concurrent.ConcurrentHashMap;
  7 import java.util.logging.Level;
  8 import java.util.logging.Logger;
  9 
 10 import org.codehaus.stomp.ProtocolException;
 11 import org.codehaus.stomp.Stomp;
 12 import org.codehaus.stomp.StompFrame;
 13 import org.codehaus.stomp.StompHandler;
 14 import org.xmlBlaster.authentication.plugins.demo.SecurityQos;
 15 import org.xmlBlaster.engine.qos.ConnectQosServer;
 16 import org.xmlBlaster.engine.qos.ConnectReturnQosServer;
 17 import org.xmlBlaster.protocol.I_Authenticate;
 18 import org.xmlBlaster.protocol.I_CallbackDriver;
 19 import org.xmlBlaster.util.Global;
 20 import org.xmlBlaster.util.MsgUnit;
 21 import org.xmlBlaster.util.MsgUnitRaw;
 22 import org.xmlBlaster.util.ReplaceVariable;
 23 import org.xmlBlaster.util.SessionName;
 24 import org.xmlBlaster.util.Timestamp;
 25 import org.xmlBlaster.util.XmlBlasterException;
 26 import org.xmlBlaster.util.def.Constants;
 27 import org.xmlBlaster.util.def.ErrorCode;
 28 import org.xmlBlaster.util.def.MethodName;
 29 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 30 import org.xmlBlaster.util.key.KeyData;
 31 import org.xmlBlaster.util.plugin.PluginInfo;
 32 import org.xmlBlaster.util.qos.address.CallbackAddress;
 33 import org.xmlBlaster.util.xbformat.I_ProgressListener;
 34 
 35 /**
 36  * Protocol bridge, to bridge between xmlBlaster and STOMP protocol. implements
 37  * StompHandler and I_CallbackDriver for incoming STOMP messages and outgoing
 38  * XmlBlaster Messages.
 39  * <p />
 40  * One instance per client connect
 41  * 
 42  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.stomp.html">Protocol integration</a>
 43  * @see <a href="http://stomp.codehaus.org/">Website</a>
 * @see <a href="http://stomp.codehaus.org/Protocol">Protocol description</a>
 45  * @author Dieter Saken (Marcel Ruff)
 46  */
 47 public class XbStompInOutBridge implements StompHandler, I_CallbackDriver {
 48    private static Logger log = Logger.getLogger(XbStompInOutBridge.class
 49          .getName());
 50    public static final String XB_SERVER_COMMAND_PING = "PING";
 51    public static final String XB_SERVER_HEADER_KEY = "key";
 52    public static final String XB_SERVER_HEADER_QOS = "qos";
 53    public String ME = "XbStompInOutBridge";
 54    public static final String PROTOCOL_NAME = "STOMP";
 55    private final StompHandler outputHandler;
 56    private final XbStompDriver driver;
 57    private final Global glob;
 58 
 59    private final ConcurrentHashMap<String, RequestHolder> framesToAck = new ConcurrentHashMap<String, RequestHolder>();
 60    private String secretSessionId;
 61    private I_Authenticate authenticate;
 62    private org.xmlBlaster.protocol.I_XmlBlaster xb;
 63    private boolean stompOpened;
 64 
 65    /** How long to block on remote call waiting on ping responses */
 66    protected long pingResponseTimeout;
 67    /** How long to block on remote call waiting on update responses */
 68    protected long updateResponseTimeout;
 69    
 70    private ConnectQosServer connectQos;
 71    private String remoteAddress = "";
 72 
 73    public XbStompInOutBridge(Global glob, XbStompDriver driver,
 74          StompHandler outputHandler) {
 75       this.glob = glob;
 76       this.driver = driver;
 77       this.outputHandler = outputHandler; // is  instanceof TcpTransport
 78       this.stompOpened = true;
 79       try {
 80          // "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
 81          this.remoteAddress = outputHandler.toString();
 82       }
 83       catch (Throwable e) {
 84          e.printStackTrace();
 85       }
 86    }
 87 
 88    /**
 89     * All stomp threads contain the remote address alreay, but not our internal threads
 90     */
 91    public String getExtendedLogId() {
 92       return ME + " " + this.remoteAddress;
 93    }
 94 
 95    /**
 96     * Called from xmlBlaster core on disconnect
 97     * Called on exception or driver deactivate
 98     */
 99    public void shutdown() {
100       log.info(getExtendedLogId() + " will shutdown");
101         _destroy();
102    }
103 
104    /**
105     * Callback from #StompHandler
106     */
107    public void close() {
108       // The stomp tread name contains the remote IP
109       log.info(ME + " close from StompHandler");
110       _destroy();
111    }
112    
113    private void _destroy() {
114         driver.removeClient(this);
115       try {
116          synchronized (this) {
117             if (this.stompOpened) {
118                this.stompOpened = false;
119                // somebody is trying to close me all the time :-(
120                I_Authenticate auth = this.authenticate;
121                if (auth != null) {
122                   // From the point of view of the incoming client connection we
123                   // are
124                   // dead
125                   // The callback dispatch framework may have another point of
126                   // view
127                   // (which is not of interest here)
128                   try {
129                      auth.connectionState(this.secretSessionId,
130                            ConnectionStateEnum.DEAD);
131                   }
132                   catch (Throwable e) {
133                      e.printStackTrace();
134                   }
135                }
136                this.secretSessionId = null;
137                this.authenticate = null;
138                this.xb = null;
139                log.info(getExtendedLogId() + " gets closed");
140                try {
141                   outputHandler.close();
142                } catch (Throwable e) {
143                   e.printStackTrace();
144                }
145                notifyAllFrameAcks();
146             }
147          }
148       } catch (Throwable e) {
149          e.printStackTrace();
150          log.severe(e.toString());
151       }
152    }
153 
154    public int notifyAllFrameAcks() {
155       RequestHolder[] arr = getFramesToAck();
156       log.info(ME + ". Close called with " + arr.length + " waiting frames, notify them now");
157       this.framesToAck.clear();
158       for (RequestHolder requestHolder : arr) {
159          requestHolder.shutdown = true;
160          notifyFrameAck(requestHolder);
161       }
162       return arr.length;
163    }
164 
165    public RequestHolder[] getFramesToAck() {
166       synchronized (this.framesToAck) {
167          return (RequestHolder[])this.framesToAck.values().toArray(new RequestHolder[this.framesToAck.size()]);
168       }
169    }
170 
171    /*
172     * This Code Area handles the incoming Stomp messages by implementing the
173     * StompHandler Interface
174     */
175 
176    private boolean checkStompConnected() {
177       return this.stompOpened;
178    }
179 
180    private boolean checkXbConnected() {
181       return this.stompOpened && this.xb != null;
182    }
183 
184    /**
185     * Callback from Stomp
186     */
187    public void onException(Exception e) {
188       log.warning(ME + " onException from Stomp: " + e.toString());
189       _destroy();
190    }
191    
192    //@SuppressWarnings("unknown")
193    private String dump(StompFrame frame) {
194       StringBuffer buf = new StringBuffer();
195       try {
196          if (frame == null) {
197             log.severe("Can't dump null frame");
198             return "Can't dump null frame";
199          }
200          buf.append("action=").append(frame.getAction()).append(",");
201          Map<String, Object> map = frame.getHeaders();
202          java.util.Iterator<String> it = map.keySet().iterator();
203          while (it.hasNext()) {
204             String key = it.next();
205             buf.append(key).append("=").append(map.get(key)).append(",");
206          }
207          buf.append("content=").append(new String(frame.getContent(), "UTF-8"));
208       }
209       catch (Throwable e) {
210          buf.append("Can't dump frame: " + e.toString());
211          log.severe("Can't dump frame: " + e.toString());
212       }
213       return buf.toString();
214    }
215 
216    /**
217     * Callback from stomp layer. 
218     */
219    public void onStompFrame(StompFrame frame) throws Exception {
220       try {
221          String action = frame.getAction();
222          if (action == null) {
223             log.warning(ME + " Ignoring null stomp action: " + dump(frame));
224             return;
225          }
226          if (log.isLoggable(Level.FINE))
227             log.fine(getExtendedLogId() + " onStompFrame " + dump(frame));
228          action = action.trim();
229          if (action.startsWith(Stomp.Commands.CONNECT)) {
230             onStompConnect(frame);
231          } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
232             onStompSubscribe(frame);
233          } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
234             onStompUnsubscribe(frame);
235          } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
236             onStompDisconnect(frame);
237          } else if (action.startsWith(Stomp.Commands.SEND)) {
238             onStompSend(frame);
239          } else if (action.startsWith(Stomp.Commands.ACK)) {
240             onStompAck(frame);
241          } else if (action.startsWith(Stomp.Commands.ABORT) || action.startsWith("NAK")) {
242             onStompNak(frame);
243          } else {
244             throw new ProtocolException("STOMP action: " + action
245                   + "not supported or unknown: ");
246          }
247       }
248       catch (ProtocolException e) {
249          log.severe(getExtendedLogId() + e.toString() + ": " + dump(frame));
250          throw e; // re-throw
251       }
252       catch (Exception e) {
253          e.printStackTrace();
254          log.warning(getExtendedLogId() + " onStompFrame failed: " + e.toString() + ": " + dump(frame));
255          throw e;
256       }
257       catch (Throwable e) {
258          e.printStackTrace();
259          log.severe(getExtendedLogId() + " onStompFrame failed: " + e.toString() + ": " + dump(frame));
260          _destroy();
261       }
262    }
263 
264    @SuppressWarnings("unchecked")
265    protected void onStompConnect(StompFrame command) {
266       try {
267          final Map headers = command.getHeaders();
268          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
269          String login = (String) headers.get(Stomp.Headers.Connect.LOGIN);
270          org.xmlBlaster.engine.ServerScope engineGlob = (org.xmlBlaster.engine.ServerScope) glob
271                .getObjectEntry(Constants.OBJECT_ENTRY_ServerScope);
272          if (engineGlob == null)
273             throw new XmlBlasterException(this.glob,
274                   ErrorCode.INTERNAL_UNKNOWN, ME + ".init",
275                   "could not retreive the ServerNodeScope. Am I really on the server side ?");
276          this.authenticate = engineGlob.getAuthenticate();
277          xb = this.authenticate.getXmlBlaster();
278          if (this.authenticate == null) {
279             throw new XmlBlasterException(this.glob,
280                   ErrorCode.INTERNAL_UNKNOWN, ME + ".init",
281                   "authenticate object is null");
282          }
283          if (login != null) {
284             ConnectQosServer conQos = new ConnectQosServer(glob, qos);
285             
286             if (conQos.getData().getClientProperty("deviceGuid", (String)null) != null) {
287                StringBuffer buf = new StringBuffer();
288                buf.append("loginName=").append(conQos.getUserId()).append(",");
289                buf.append("version=").append(conQos.getData().getClientProperty("version", "")).append(",");
290                buf.append("deviceName=").append(conQos.getData().getClientProperty("deviceName", "")).append(",");
291                buf.append("platform.model=").append(conQos.getData().getClientProperty("platform.model", "")).append(",");
292                buf.append("platform.name=").append(conQos.getData().getClientProperty("platform.name", "")).append(",");
293                buf.append("batterylevel=").append(conQos.getData().getClientProperty("batterylevel", "")).append(",");
294                buf.append("version=").append(conQos.getData().getClientProperty("version", "")).append(",");
295                buf.append("version.OS=").append(conQos.getData().getClientProperty("version.OS", "")).append(",");
296                buf.append("deviceGuid=").append(conQos.getData().getClientProperty("deviceGuid", ""));
297                log.info(ME + "Connecting device: " + buf.toString());
298             }
299             
300             if (conQos.getSecurityQos() == null) {
301                String clientId = (String) headers
302                      .get(Stomp.Headers.Connect.CLIENT_ID);
303                String passcode = (String) headers
304                      .get(Stomp.Headers.Connect.PASSCODE);
305                if (clientId != null || passcode != null) {
306                   SecurityQos securityQos = new SecurityQos(glob);
307                   securityQos.setUserId(clientId);
308                   securityQos.setCredential(passcode);
309                   conQos.getData().setSecurityQos(securityQos);
310                   if (conQos.getSessionName() == null
311                         || conQos.getSessionName().getLoginName() == null) {
312                      conQos.setSessionName(new SessionName(engineGlob,
313                            clientId));
314                   }
315                } else {
316                   throw new XmlBlasterException(
317                         glob,
318                         ErrorCode.USER_SECURITY_AUTHENTICATION_ILLEGALARGUMENT,
319                         ME, "connect() without securityQos");
320                }
321             }
322             // conQos.getSecurityQos().setClientIp(outputHandler.getIp());
323             // conQos.setAddressServer(driver.getTcpServer().getBindLocation());
324             // setLoginName(conQos.getSessionName().getRelativeName());
325 
326             CallbackAddress[] cbArr = conQos.getSessionCbQueueProperty()
327                   .getCallbackAddresses();
328             for (int ii = 0; cbArr != null && ii < cbArr.length; ii++) {
329                cbArr[ii].setRawAddress(this.remoteAddress);//driver.getRawAddress());
330                try {
331                   cbArr[ii].setCallbackDriver(this);
332                } catch (Exception e) {
333                   e.printStackTrace();
334                   log.severe(ME + " Internal error during setCallbackDriver: " + e.toString() + ": " + dump(command));
335                }
336             }
337             ConnectReturnQosServer retQos = authenticate.connect(conQos);
338             this.connectQos = conQos;
339             this.secretSessionId = retQos.getSecretSessionId();
340             ME = retQos.getSessionName().getRelativeName();
341 
342             Map<String, String> responseHeaders = new HashMap<String, String>();
343             responseHeaders.put(Stomp.Headers.Connected.SESSION,
344                   this.secretSessionId);
345             String requestId = (String) headers
346                   .get(Stomp.Headers.Connect.REQUEST_ID);
347             if (requestId == null) {
348                requestId = (String) headers
349                      .get(Stomp.Headers.RECEIPT_REQUESTED);
350             }
351             if (requestId != null) {
352                responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID,
353                      requestId);
354                responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID,
355                      requestId);
356             }
357             StompFrame sc = new StompFrame();
358             sc.setAction(Stomp.Responses.CONNECTED);
359             sc.setHeaders((Map)responseHeaders);
360             sendFrameNoWait(sc);
361             log.info(ME + " ConnectReturn Qos is send");
362          }
363       } catch (XmlBlasterException e) {
364          this.connectQos = null;
365          log.warning(ME + " Connect failed: " + e.toString() + ": " + dump(command));
366          sendExeption(command, e);
367       }
368    }
369    
370    public String getLoginName() {
371       ConnectQosServer cs = this.connectQos;
372       if (cs != null) {
373          return cs.getSessionName().getLoginName();
374       }
375       return null;
376    }
377 
378    protected void onStompDisconnect(StompFrame command) {
379       if (!checkXbConnected()) {
380          sendExeption(command,
381                new XmlBlasterException(glob,
382                      ErrorCode.USER_WRONG_API_USAGE,
383                      "onStompDisconnect: Please call connect first: " + dump(command)));
384          return;
385       }
386       try {
387          @SuppressWarnings("unchecked")
388          final Map headers = command.getHeaders();
389          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
390          authenticate.disconnect(null, secretSessionId, qos);
391       } catch (XmlBlasterException e) {
392          sendExeption(command, e);
393       }
394       _destroy();
395    }
396 
397    protected void onStompSend(StompFrame command) {
398       try {
399          if (!checkXbConnected()) {
400             sendExeption(command, new XmlBlasterException(glob,
401                   ErrorCode.USER_WRONG_API_USAGE,
402                   "onStompSend: Please call connect first: " + dump(command)));
403             return;
404          }
405          @SuppressWarnings("unchecked")
406          Map headers = command.getHeaders();
407          String key = (String) headers.get(XB_SERVER_HEADER_KEY);
408          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
409          MsgUnitRaw msg = new MsgUnitRaw(key, command.getContent(), qos);
410          MsgUnitRaw[] msgArr = new MsgUnitRaw[1];
411          msgArr[0] = msg;
412          xb.publishArr(null, secretSessionId, msgArr);
413          sendResponse(command);
414       } catch (XmlBlasterException e) {
415          sendExeption(command, e);
416       }
417    }
418 
419    protected void onStompSubscribe(StompFrame command) throws Exception {
420       try {
421          if (!checkXbConnected()) {
422             sendExeption(command, new XmlBlasterException(glob,
423                   ErrorCode.USER_WRONG_API_USAGE,
424                   "onStompSubscribe: Please call connect first: " + dump(command)));
425             return;
426          }
427          @SuppressWarnings("unchecked")
428          final Map headers = command.getHeaders();
429          String key = (String) headers.get(XB_SERVER_HEADER_KEY);
430          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
431          xb.subscribe(null, secretSessionId, key, qos);
432 
433          sendResponse(command);
434       } catch (XmlBlasterException e) {
435          sendExeption(command, e);
436       }
437    }
438 
439    protected void onStompUnsubscribe(StompFrame command) throws Exception {
440       try {
441          if (!checkXbConnected()) {
442             sendExeption(command, new XmlBlasterException(glob,
443                   ErrorCode.USER_WRONG_API_USAGE,
444                   "onStompUnsubscribe: Please call connect first: " + dump(command)));
445             return;
446          }
447          @SuppressWarnings("unchecked")
448          final Map headers = command.getHeaders();
449          String key = (String) headers.get(XB_SERVER_HEADER_KEY);
450          String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
451          xb.unSubscribe(null, secretSessionId, key, qos);
452          sendResponse(command);
453       } catch (XmlBlasterException e) {
454          sendExeption(command, e);
455       }
456    }
457 
458    protected void onStompAck(StompFrame command) throws Exception {
459       if (!checkStompConnected())
460          return;
461       @SuppressWarnings("unchecked")
462       Map headers = command.getHeaders();
463       String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
464       if (messageId == null) {
465          log.warning(ME + " ACK API error: missing messageId: " + dump(command));
466          throw new ProtocolException(
467                ME + " ACK received without a message-id to acknowledge!");
468       }
469 
470       RequestHolder requestHolder = framesToAck.get(messageId);
471       if (requestHolder == null) {
472          // Happens on multiple Ack or on wrong messageId
473          log.warning(getExtendedLogId() + " Internal ACK API error: messageId=" + messageId + " not found in framesToAck hashtable: " + dump(command));
474       }
475       
476       requestHolder.returnQos = (String) headers.get(XB_SERVER_HEADER_QOS);
477       if (log.isLoggable(Level.FINE)) log.fine(ME + " " + requestHolder.toString() + " ACK release and notify ...");
478       
479       removeFrameForMessageId(messageId);
480       notifyFrameAck(requestHolder);
481    }
482 
483    protected void onStompNak(StompFrame command) throws Exception {
484       if (!checkStompConnected())
485          return;
486       @SuppressWarnings("unchecked")
487       Map headers = command.getHeaders();
488       String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
489       if (messageId == null) {
490          log.warning(ME + " NAK API error: missing messageId: " + dump(command));
491          throw new ProtocolException(
492                ME + " NAK received without a message-id to acknowledge!");
493       }
494       String errorCode = (String) headers.get("errorCode");
495       ErrorCode errorCodeEnum = ErrorCode.toErrorCode(errorCode, ErrorCode.USER_CLIENTCODE);
496       String message = (String) headers.get(Stomp.Headers.Error.MESSAGE);
497       if (message == null)
498          message = "UPDATE failed, client has rejected message";
499 
500       RequestHolder requestHolder = framesToAck.get(messageId);
501       if (requestHolder == null) {
502          // Happens on multiple Ack or on wrong messageId
503          log.warning(getExtendedLogId() + " Internal NAK API error: messageId=" + messageId + " not found in framesToAck hashtable: " + dump(command));
504       }
505       requestHolder.xmlBlasterException = new XmlBlasterException(glob, errorCodeEnum, message);
506       
507       log.info(ME + " NAK release and notify ... " + errorCode);
508       removeFrameForMessageId(messageId);
509       notifyFrameAck(requestHolder);
510    }
511 
512    private boolean notifyFrameAck(RequestHolder requestHolder) {
513       if (requestHolder != null && requestHolder.stompFrame != null) {
514          synchronized (requestHolder.stompFrame) {
515             try {
516                requestHolder.stompFrame.notify();
517                return true;
518             }
519             catch (Throwable e) {
520                e.printStackTrace();
521                return false;
522             }
523          }
524       }
525       return false;
526    }
527 
528    // ===================== I_CallbackDriver ==========================
529 
   /*
    * This Code Area handles the outgoing xmlBlaster messages by implementing
    * the I_CallbackDriver Interface
    */
534 
535    //@Override()
536    public String getName() {
537       return ME;
538    }
539 
540    //@Override()
541    public String getProtocolId() {
542       return PROTOCOL_NAME;
543    }
544 
545    //@Override()
546    public String getRawAddress() {
547       return outputHandler.toString();
548    }
549 
550    /**
551     * How long to block on remote call waiting on a ping response. The default
552     * is to block for one minute This method can be overwritten by
553     * implementations like EMAIL
554     */
555    public long getDefaultPingResponseTimeout() {
556       return Constants.MINUTE_IN_MILLIS;
557    }
558 
559    /**
560     * How long to block on remote call waiting on a update() response. The
561     * default is to block forever This method can be overwritten by
562     * implementations like EMAIL
563     */
564    public long getDefaultUpdateResponseTimeout() {
565       return Integer.MAX_VALUE;
566    }
567 
568    /**
569     * Set the given millis to protect against blocking client for ping
570     * invocations.
571     * 
572     * @param millis
573     *            If <= 0 it is set to the default (one minute). An argument
574     *            less than or equal to zero means not to wait at all and is not
575     *            supported
576     */
577    public final void setPingResponseTimeout(long millis) {
578       if (millis <= 0L) {
579          log.warning(ME + " pingResponseTimeout=" + millis
580                + " is invalid, setting it to "
581                + getDefaultPingResponseTimeout() + " millis");
582          this.pingResponseTimeout = getDefaultPingResponseTimeout();
583       } else
584          this.pingResponseTimeout = millis;
585    }
586 
587    /**
588     * Set the given millis to protect against blocking client for update()
589     * invocations.
590     * 
591     * @param millis
592     *            If <= 0 it is set to the default (one minute). An argument
593     *            less than or equal to zero means not to wait at all and is not
594     *            supported
595     */
596    public final void setUpdateResponseTimeout(long millis) {
597       if (millis <= 0L) {
598          log.warning(ME + " updateResponseTimeout=" + millis
599                + " is invalid, setting it to "
600                + getDefaultUpdateResponseTimeout() + " millis");
601          this.updateResponseTimeout = getDefaultUpdateResponseTimeout();
602       } else
603          this.updateResponseTimeout = millis;
604    }
605 
606    /**
607     * @return Returns the responseTimeout.
608     */
609    public long getResponseTimeout(MethodName methodName) {
610       if (MethodName.PING.equals(methodName)) {
611          return this.pingResponseTimeout;
612       } else if (MethodName.UPDATE.equals(methodName)) {
613          return this.updateResponseTimeout;
614       }
615       return this.updateResponseTimeout;
616       // return this.responseTimeout;
617    }
618 
619    //@Override()
620    public void init(Global glob, CallbackAddress addressConfig)
621          throws XmlBlasterException {
622 
623       setPingResponseTimeout(addressConfig.getEnv("pingResponseTimeout",
624             getDefaultPingResponseTimeout()).getValue());
625       if (log.isLoggable(Level.FINE))
626          log.fine(addressConfig.getEnvLookupKey("pingResponseTimeout") + "="
627                + this.pingResponseTimeout);
628 
629       setUpdateResponseTimeout(addressConfig.getEnv("updateResponseTimeout",
630             getDefaultUpdateResponseTimeout()).getValue());
631       if (log.isLoggable(Level.FINE))
632          log.fine(addressConfig.getEnvLookupKey("updateResponseTimeout")
633                + "=" + this.updateResponseTimeout);
634    }
635 
636    //@Override()
637    public boolean isAlive() {
638       return this.stompOpened;
639    }
640 
641    //@Override()
642    public String ping(String qos) throws XmlBlasterException {
643       // never ping client without session
644       // <qos><state info='INITIAL'/></qos>
645       if (log.isLoggable(Level.FINE)) log.fine("Ping again");
646       if (qos != null && qos.indexOf("INITIAL") != -1)
647          return "<qos/>";
648       if (!checkStompConnected())
649          throw new XmlBlasterException(glob,
650                ErrorCode.COMMUNICATION_NOCONNECTION, ME,
651                "Stomp callback ping failed");
652       StompFrame frame = new StompFrame();
653       frame.setAction(XB_SERVER_COMMAND_PING);
654       frame.getHeaders().put(XB_SERVER_HEADER_QOS, qos);
655       String returnValue = sendFrameAndWait(frame, MethodName.PING);
656       return returnValue;
657    }
658 
659    public I_ProgressListener registerProgressListener(
660          I_ProgressListener listener) {
661       return null;
662    }
663    
664    /**
665     * HTTP header key/value should not contain new line. 
666     * @param str
667     * @return
668     */
669    private String cleanNewlines(String str) {
670       return ReplaceVariable.replaceAll(str, "\n", " ");
671    }
672    
673    private KeyData getKeyData(MsgUnitRaw msgUnitRaw) {
674       try {
675          if (msgUnitRaw == null)
676             return null;
677          MsgUnit msgUnit = (MsgUnit)msgUnitRaw.getMsgUnit();
678          if (msgUnit == null)
679             return null;
680          return msgUnit.getKeyData();
681       }
682       catch(Throwable e) {
683          e.printStackTrace();
684          return null;
685       }
686    }
687    
688    private String getContentType(MsgUnitRaw msgUnitRaw) {
689       KeyData keyData = getKeyData(msgUnitRaw);
690       if (keyData != null) {
691          String contentType = keyData.getContentMime();
692          if (contentType != null && contentType.length() > 0)
693             return contentType;
694       }
695       return "text/xml";
696    }
697 
698    public String[] sendUpdate(MsgUnitRaw[] msgArr) throws XmlBlasterException {
699       String[] ret = new String[msgArr.length];
700       int i = 0;
701       for (MsgUnitRaw msgUnitRaw : msgArr) {
702          StompFrame frame = new StompFrame();
703          // MsgUnit msg = (MsgUnit) msgUnit.getMsgUnit();
704          // String senderLoginName =
705          // msg.getQosData().getSender().getAbsoluteName();
706          MsgUnit msg = (MsgUnit) msgUnitRaw.getMsgUnit();
707          String topicId = msg.getKeyOid();
708          frame.setAction(Stomp.Responses.MESSAGE);
709          frame.getHeaders().put(Stomp.Headers.Message.DESTINATION, topicId);
710          frame.getHeaders().put("methodName", MethodName.UPDATE.toString());
711          String contentType = getContentType(msgUnitRaw);
712          frame.getHeaders().put("content-type", contentType); // "text/xml"
713          frame.getHeaders().put(XB_SERVER_HEADER_KEY, cleanNewlines(msgUnitRaw.getKey()));
714          frame.getHeaders().put(XB_SERVER_HEADER_QOS, cleanNewlines(msgUnitRaw.getQos()));
715          byte[] content = msgUnitRaw.getContent();
716          frame.getHeaders().put(Stomp.Headers.CONTENT_LENGTH, content.length);
717          frame.setContent(content);
718          if (log.isLoggable(Level.FINER)) log.finer(ME + " UPDATE Sending now ... " + msgUnitRaw.getKey().trim());
719          ret[i] = sendFrameAndWait(frame, MethodName.UPDATE);
720          if (log.isLoggable(Level.FINER)) log.finer(ME + " UPDATE Done " + msgUnitRaw.getKey().trim() + ": " + ret[i]);
721          i++;
722       }
723       return ret;
724    }
725 
726    public void sendUpdateOneway(MsgUnitRaw[] msgArr)
727          throws XmlBlasterException {
728       for (MsgUnitRaw msgUnitRaw : msgArr) {
729          try {
730             StompFrame frame = new StompFrame();
731             MsgUnit msg = (MsgUnit) msgUnitRaw.getMsgUnit();
732             String topicId = msg.getKeyOid();
733             frame.setAction(Stomp.Responses.MESSAGE);
734             frame.getHeaders().put(Stomp.Headers.Message.DESTINATION,
735                   topicId);
736             frame.getHeaders().put("methodName", MethodName.UPDATE_ONEWAY);
737             frame.getHeaders().put("content-type", getContentType(msgUnitRaw)); // "text/xml"
738             frame.getHeaders().put(XB_SERVER_HEADER_KEY, cleanNewlines(msgUnitRaw.getKey()));
739             frame.getHeaders().put(XB_SERVER_HEADER_QOS, cleanNewlines(msgUnitRaw.getQos()));
740             frame.getHeaders().put(Stomp.Headers.CONTENT_LENGTH,
741                   msgUnitRaw.getContent().length);
742             frame.setContent(msgUnitRaw.getContent());
743             sendFrameNoWait(frame);
744          } catch (Exception e) {
745             log.severe(e.getMessage());
746          }
747       }
748 
749    }
750 
751    public String getType() {
752       return PROTOCOL_NAME;
753    }
754 
755    public String getVersion() {
756       return "1.0";
757    }
758 
759    public void init(Global glob, PluginInfo pluginInfo)
760          throws XmlBlasterException {
761    }
762 
763    /*
764     * private internal stuff
765     */
766 
767    private RequestHolder getFrameForMessageId(String messageId) {
768       return (framesToAck.get(messageId));
769    }
770 
771    private RequestHolder registerFrame(StompFrame frame) {
772       //String messageId = "" + new Timestamp().getTimestamp();
773       //String messageId = frame.getAction() + "-" + secretSessionId + "-" + System.currentTimeMillis();
774       String messageId = frame.getAction() + "-" + new Timestamp().getTimestamp();
775       frame.getHeaders().put(Stomp.Headers.Message.MESSAGE_ID, messageId);
776       RequestHolder requestHolder = new RequestHolder(messageId, frame);
777       framesToAck.put(messageId, requestHolder);
778       return requestHolder;
779    }
780 
781    private void removeFrameForMessageId(String messageId) {
782       if (messageId == null)
783          return;
784       if (framesToAck.get(messageId) != null)
785          framesToAck.remove(messageId);
786    }
787 
788    private void sendFrameNoWait(StompFrame frame) throws XmlBlasterException {
789       checkStompConnected();
790       try {
791          outputHandler.onStompFrame(frame);
792       } catch (Exception e) {
793          e.printStackTrace();
794          throw new XmlBlasterException(this.glob,
795                ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME
796                      + ".sendFrameNoWait", e.getMessage());
797       }
798    }
799 
800    private String sendFrameAndWait(StompFrame frame, MethodName methodName)
801          throws XmlBlasterException {
802       final RequestHolder requestHolder = registerFrame(frame);
803       try {
804          checkStompConnected();
805          long timeout = getResponseTimeout(methodName);
806          if (log.isLoggable(Level.FINE))
807             log.fine(ME + " " + requestHolder.toString() + ": Sending now " + methodName.toString() + "...");
808          synchronized (frame) {
809             outputHandler.onStompFrame(frame);
810             frame.wait(timeout); // TODO: Port to CountDownLatch cdl = new CountDownLatch(1);
811          }
812          // Timeout occurred if requestHolder was not removed by ACK or NAK:
813          if (requestHolder == getFrameForMessageId(requestHolder.messageId)) {
814             String text = "methodName=" + methodName.toString() + " messageId=" + requestHolder.messageId + ": No Ack recieved in timeoutMillis=" + timeout;
815             log.warning(text + ": " + dump(frame));
816             removeFrameForMessageId(requestHolder.messageId);
817             throw new XmlBlasterException(this.glob,
818                   ErrorCode.COMMUNICATION_TIMEOUT, ME
819                         + ".sendFrameAndWait",
820                   text);
821          }
822       } catch (Exception e) {
823          if (e instanceof XmlBlasterException)
824             throw (XmlBlasterException) e;
825          else
826             throw new XmlBlasterException(this.glob,
827                   ErrorCode.COMMUNICATION_NOCONNECTION_DEAD,
828                   // ErrorCode.COMMUNICATION_NOCONNECTION_CALLBACKSERVER_NOTAVAILABLE,
829                   ME + ".sendFrameAndWait", e.getMessage());
830       }
831       // http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.update.html
832       if (requestHolder.shutdown) {
833          // connection was lost
834          log.warning(getExtendedLogId() + requestHolder.toString() + ": Shutdown during callback delivery: " + dump(frame));
835          throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_RESPONSETIMEOUT, ME + " Shutdown during update delivery");
836          //return "<qos><state id='FAIL'/>";
837       }
838       else if (requestHolder.xmlBlasterException != null) { // on XmlBlasterException
839          log.warning(ME + " " + requestHolder.toString() + ": Exception from client: " + requestHolder.xmlBlasterException.getMessage() + ": " + dump(frame));
840          throw requestHolder.xmlBlasterException;
841       }
842       else { // requestHolder.returnQos should filled
843          if (log.isLoggable(Level.FINE))
844             log.fine(ME + " " + requestHolder.toString() + ": Successfully send and acknowledged " + requestHolder.returnQos);
845          return (requestHolder.returnQos == null) ?  "<qos/>" : requestHolder.returnQos;
846       }
847    }
848 
849    @SuppressWarnings("unchecked")
850    private void sendResponse(StompFrame command) throws XmlBlasterException {
851       final String receiptId = (String) command.getHeaders().get(
852             Stomp.Headers.RECEIPT_REQUESTED);
853       // A response may not be needed.
854       if (receiptId != null) {
855          StompFrame sc = new StompFrame();
856          sc.setAction(Stomp.Responses.RECEIPT);
857          sc.setHeaders(new HashMap());
858          sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
859          sendFrameNoWait(sc);
860       }
861    }
862 
863    public static byte[] toUtf8Bytes(String s) {
864       if (s == null || s.length() == 0)
865          return new byte[0];
866       try {
867          return s.getBytes(Constants.UTF8_ENCODING);
868       } catch (UnsupportedEncodingException e) {
869          log.severe("PANIC in WatcheeConstants.toUtf8Bytes(" + s
870                + ", " + Constants.UTF8_ENCODING + "): " + e.toString());
871          e.printStackTrace();
872          return s.getBytes();
873       }
874    }
875 
876    @SuppressWarnings("unchecked")
877    private void sendExeption(StompFrame command, XmlBlasterException e) {
878       try {
879          final String receiptId = (String) command.getHeaders().get(
880                Stomp.Headers.RECEIPT_REQUESTED);
881          StompFrame sc = new StompFrame();
882          sc.setAction(Stomp.Responses.ERROR);
883          sc.setHeaders(new HashMap());
884          sc.getHeaders().put("errorCode", e.getErrorCodeStr()); // xmlBlaster way
885          sc.getHeaders().put(Stomp.Responses.MESSAGE, e.getErrorCodeStr()); // stomp
886                                                             // wants
887                                                             // it
888          if (receiptId != null) {
889             sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
890          }
891          String text = e.getMessage();
892          sc.getHeaders().put(Stomp.Headers.CONTENT_LENGTH, text.length());
893          sc.setContent(toUtf8Bytes(text));
894          try {
895             sendFrameNoWait(sc);
896          } catch (XmlBlasterException e1) {
897             e1.printStackTrace();
898          }
899          log.warning(ME + "sendException" + e.getMessage() + ": " + dump(command));
900       }
901       catch (Throwable e2) {
902          e2.printStackTrace();
903          log.severe("sendExeption failed for " + e.toString() + ": " + e2.toString() + ": " + dump(command));
904       }
905    }
906 }


syntax highlighted by Code2HTML, v. 0.9.1