1 package org.xmlBlaster.protocol.stomp;
2
3 import java.io.UnsupportedEncodingException;
4 import java.util.HashMap;
5 import java.util.Map;
6 import java.util.concurrent.ConcurrentHashMap;
7 import java.util.logging.Level;
8 import java.util.logging.Logger;
9
10 import org.codehaus.stomp.ProtocolException;
11 import org.codehaus.stomp.Stomp;
12 import org.codehaus.stomp.StompFrame;
13 import org.codehaus.stomp.StompHandler;
14 import org.xmlBlaster.authentication.plugins.demo.SecurityQos;
15 import org.xmlBlaster.engine.qos.ConnectQosServer;
16 import org.xmlBlaster.engine.qos.ConnectReturnQosServer;
17 import org.xmlBlaster.protocol.I_Authenticate;
18 import org.xmlBlaster.protocol.I_CallbackDriver;
19 import org.xmlBlaster.util.Global;
20 import org.xmlBlaster.util.MsgUnit;
21 import org.xmlBlaster.util.MsgUnitRaw;
22 import org.xmlBlaster.util.ReplaceVariable;
23 import org.xmlBlaster.util.SessionName;
24 import org.xmlBlaster.util.Timestamp;
25 import org.xmlBlaster.util.XmlBlasterException;
26 import org.xmlBlaster.util.def.Constants;
27 import org.xmlBlaster.util.def.ErrorCode;
28 import org.xmlBlaster.util.def.MethodName;
29 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
30 import org.xmlBlaster.util.key.KeyData;
31 import org.xmlBlaster.util.plugin.PluginInfo;
32 import org.xmlBlaster.util.qos.address.CallbackAddress;
33 import org.xmlBlaster.util.xbformat.I_ProgressListener;
34
/**
 * Protocol bridge between xmlBlaster and the STOMP protocol. Implements
 * StompHandler for incoming STOMP messages and I_CallbackDriver for outgoing
 * xmlBlaster messages.
 * <p />
 * One instance per client connect.
 *
 * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.stomp.html">Protocol integration</a>
 * @see <a href="http://stomp.codehaus.org/">Website</a>
 * @see <a href="http://stomp.codehaus.org/Protocol">Protocol description</a>
 * @author Dieter Saken (Marcel Ruff)
 */
47 public class XbStompInOutBridge implements StompHandler, I_CallbackDriver {
48 private static Logger log = Logger.getLogger(XbStompInOutBridge.class
49 .getName());
50 public static final String XB_SERVER_COMMAND_PING = "PING";
51 public static final String XB_SERVER_HEADER_KEY = "key";
52 public static final String XB_SERVER_HEADER_QOS = "qos";
53 public String ME = "XbStompInOutBridge";
54 public static final String PROTOCOL_NAME = "STOMP";
55 private final StompHandler outputHandler;
56 private final XbStompDriver driver;
57 private final Global glob;
58
59 private final ConcurrentHashMap<String, RequestHolder> framesToAck = new ConcurrentHashMap<String, RequestHolder>();
60 private String secretSessionId;
61 private I_Authenticate authenticate;
62 private org.xmlBlaster.protocol.I_XmlBlaster xb;
63 private boolean stompOpened;
64
65 /** How long to block on remote call waiting on ping responses */
66 protected long pingResponseTimeout;
67 /** How long to block on remote call waiting on update responses */
68 protected long updateResponseTimeout;
69
70 private ConnectQosServer connectQos;
71 private String remoteAddress = "";
72
73 public XbStompInOutBridge(Global glob, XbStompDriver driver,
74 StompHandler outputHandler) {
75 this.glob = glob;
76 this.driver = driver;
77 this.outputHandler = outputHandler; // is instanceof TcpTransport
78 this.stompOpened = true;
79 try {
80 // "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
81 this.remoteAddress = outputHandler.toString();
82 }
83 catch (Throwable e) {
84 e.printStackTrace();
85 }
86 }
87
88 /**
89 * All stomp threads contain the remote address alreay, but not our internal threads
90 */
91 public String getExtendedLogId() {
92 return ME + " " + this.remoteAddress;
93 }
94
95 /**
96 * Called from xmlBlaster core on disconnect
97 * Called on exception or driver deactivate
98 */
99 public void shutdown() {
100 log.info(getExtendedLogId() + " will shutdown");
101 _destroy();
102 }
103
104 /**
105 * Callback from #StompHandler
106 */
107 public void close() {
108 // The stomp tread name contains the remote IP
109 log.info(ME + " close from StompHandler");
110 _destroy();
111 }
112
113 private void _destroy() {
114 driver.removeClient(this);
115 try {
116 synchronized (this) {
117 if (this.stompOpened) {
118 this.stompOpened = false;
119 // somebody is trying to close me all the time :-(
120 I_Authenticate auth = this.authenticate;
121 if (auth != null) {
122 // From the point of view of the incoming client connection we
123 // are
124 // dead
125 // The callback dispatch framework may have another point of
126 // view
127 // (which is not of interest here)
128 try {
129 auth.connectionState(this.secretSessionId,
130 ConnectionStateEnum.DEAD);
131 }
132 catch (Throwable e) {
133 e.printStackTrace();
134 }
135 }
136 this.secretSessionId = null;
137 this.authenticate = null;
138 this.xb = null;
139 log.info(getExtendedLogId() + " gets closed");
140 try {
141 outputHandler.close();
142 } catch (Throwable e) {
143 e.printStackTrace();
144 }
145 notifyAllFrameAcks();
146 }
147 }
148 } catch (Throwable e) {
149 e.printStackTrace();
150 log.severe(e.toString());
151 }
152 }
153
154 public int notifyAllFrameAcks() {
155 RequestHolder[] arr = getFramesToAck();
156 log.info(ME + ". Close called with " + arr.length + " waiting frames, notify them now");
157 this.framesToAck.clear();
158 for (RequestHolder requestHolder : arr) {
159 requestHolder.shutdown = true;
160 notifyFrameAck(requestHolder);
161 }
162 return arr.length;
163 }
164
165 public RequestHolder[] getFramesToAck() {
166 synchronized (this.framesToAck) {
167 return (RequestHolder[])this.framesToAck.values().toArray(new RequestHolder[this.framesToAck.size()]);
168 }
169 }
170
171 /*
172 * This Code Area handles the incoming Stomp messages by implementing the
173 * StompHandler Interface
174 */
175
176 private boolean checkStompConnected() {
177 return this.stompOpened;
178 }
179
180 private boolean checkXbConnected() {
181 return this.stompOpened && this.xb != null;
182 }
183
184 /**
185 * Callback from Stomp
186 */
187 public void onException(Exception e) {
188 log.warning(ME + " onException from Stomp: " + e.toString());
189 _destroy();
190 }
191
192 //@SuppressWarnings("unknown")
193 private String dump(StompFrame frame) {
194 StringBuffer buf = new StringBuffer();
195 try {
196 if (frame == null) {
197 log.severe("Can't dump null frame");
198 return "Can't dump null frame";
199 }
200 buf.append("action=").append(frame.getAction()).append(",");
201 Map<String, Object> map = frame.getHeaders();
202 java.util.Iterator<String> it = map.keySet().iterator();
203 while (it.hasNext()) {
204 String key = it.next();
205 buf.append(key).append("=").append(map.get(key)).append(",");
206 }
207 buf.append("content=").append(new String(frame.getContent(), "UTF-8"));
208 }
209 catch (Throwable e) {
210 buf.append("Can't dump frame: " + e.toString());
211 log.severe("Can't dump frame: " + e.toString());
212 }
213 return buf.toString();
214 }
215
216 /**
217 * Callback from stomp layer.
218 */
219 public void onStompFrame(StompFrame frame) throws Exception {
220 try {
221 String action = frame.getAction();
222 if (action == null) {
223 log.warning(ME + " Ignoring null stomp action: " + dump(frame));
224 return;
225 }
226 if (log.isLoggable(Level.FINE))
227 log.fine(getExtendedLogId() + " onStompFrame " + dump(frame));
228 action = action.trim();
229 if (action.startsWith(Stomp.Commands.CONNECT)) {
230 onStompConnect(frame);
231 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
232 onStompSubscribe(frame);
233 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
234 onStompUnsubscribe(frame);
235 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
236 onStompDisconnect(frame);
237 } else if (action.startsWith(Stomp.Commands.SEND)) {
238 onStompSend(frame);
239 } else if (action.startsWith(Stomp.Commands.ACK)) {
240 onStompAck(frame);
241 } else if (action.startsWith(Stomp.Commands.ABORT) || action.startsWith("NAK")) {
242 onStompNak(frame);
243 } else {
244 throw new ProtocolException("STOMP action: " + action
245 + "not supported or unknown: ");
246 }
247 }
248 catch (ProtocolException e) {
249 log.severe(getExtendedLogId() + e.toString() + ": " + dump(frame));
250 throw e; // re-throw
251 }
252 catch (Exception e) {
253 e.printStackTrace();
254 log.warning(getExtendedLogId() + " onStompFrame failed: " + e.toString() + ": " + dump(frame));
255 throw e;
256 }
257 catch (Throwable e) {
258 e.printStackTrace();
259 log.severe(getExtendedLogId() + " onStompFrame failed: " + e.toString() + ": " + dump(frame));
260 _destroy();
261 }
262 }
263
264 @SuppressWarnings("unchecked")
265 protected void onStompConnect(StompFrame command) {
266 try {
267 final Map headers = command.getHeaders();
268 String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
269 String login = (String) headers.get(Stomp.Headers.Connect.LOGIN);
270 org.xmlBlaster.engine.ServerScope engineGlob = (org.xmlBlaster.engine.ServerScope) glob
271 .getObjectEntry(Constants.OBJECT_ENTRY_ServerScope);
272 if (engineGlob == null)
273 throw new XmlBlasterException(this.glob,
274 ErrorCode.INTERNAL_UNKNOWN, ME + ".init",
275 "could not retreive the ServerNodeScope. Am I really on the server side ?");
276 this.authenticate = engineGlob.getAuthenticate();
277 xb = this.authenticate.getXmlBlaster();
278 if (this.authenticate == null) {
279 throw new XmlBlasterException(this.glob,
280 ErrorCode.INTERNAL_UNKNOWN, ME + ".init",
281 "authenticate object is null");
282 }
283 if (login != null) {
284 ConnectQosServer conQos = new ConnectQosServer(glob, qos);
285
286 if (conQos.getData().getClientProperty("deviceGuid", (String)null) != null) {
287 StringBuffer buf = new StringBuffer();
288 buf.append("loginName=").append(conQos.getUserId()).append(",");
289 buf.append("version=").append(conQos.getData().getClientProperty("version", "")).append(",");
290 buf.append("deviceName=").append(conQos.getData().getClientProperty("deviceName", "")).append(",");
291 buf.append("platform.model=").append(conQos.getData().getClientProperty("platform.model", "")).append(",");
292 buf.append("platform.name=").append(conQos.getData().getClientProperty("platform.name", "")).append(",");
293 buf.append("batterylevel=").append(conQos.getData().getClientProperty("batterylevel", "")).append(",");
294 buf.append("version=").append(conQos.getData().getClientProperty("version", "")).append(",");
295 buf.append("version.OS=").append(conQos.getData().getClientProperty("version.OS", "")).append(",");
296 buf.append("deviceGuid=").append(conQos.getData().getClientProperty("deviceGuid", ""));
297 log.info(ME + "Connecting device: " + buf.toString());
298 }
299
300 if (conQos.getSecurityQos() == null) {
301 String clientId = (String) headers
302 .get(Stomp.Headers.Connect.CLIENT_ID);
303 String passcode = (String) headers
304 .get(Stomp.Headers.Connect.PASSCODE);
305 if (clientId != null || passcode != null) {
306 SecurityQos securityQos = new SecurityQos(glob);
307 securityQos.setUserId(clientId);
308 securityQos.setCredential(passcode);
309 conQos.getData().setSecurityQos(securityQos);
310 if (conQos.getSessionName() == null
311 || conQos.getSessionName().getLoginName() == null) {
312 conQos.setSessionName(new SessionName(engineGlob,
313 clientId));
314 }
315 } else {
316 throw new XmlBlasterException(
317 glob,
318 ErrorCode.USER_SECURITY_AUTHENTICATION_ILLEGALARGUMENT,
319 ME, "connect() without securityQos");
320 }
321 }
322 // conQos.getSecurityQos().setClientIp(outputHandler.getIp());
323 // conQos.setAddressServer(driver.getTcpServer().getBindLocation());
324 // setLoginName(conQos.getSessionName().getRelativeName());
325
326 CallbackAddress[] cbArr = conQos.getSessionCbQueueProperty()
327 .getCallbackAddresses();
328 for (int ii = 0; cbArr != null && ii < cbArr.length; ii++) {
329 cbArr[ii].setRawAddress(this.remoteAddress);//driver.getRawAddress());
330 try {
331 cbArr[ii].setCallbackDriver(this);
332 } catch (Exception e) {
333 e.printStackTrace();
334 log.severe(ME + " Internal error during setCallbackDriver: " + e.toString() + ": " + dump(command));
335 }
336 }
337 ConnectReturnQosServer retQos = authenticate.connect(conQos);
338 this.connectQos = conQos;
339 this.secretSessionId = retQos.getSecretSessionId();
340 ME = retQos.getSessionName().getRelativeName();
341
342 Map<String, String> responseHeaders = new HashMap<String, String>();
343 responseHeaders.put(Stomp.Headers.Connected.SESSION,
344 this.secretSessionId);
345 String requestId = (String) headers
346 .get(Stomp.Headers.Connect.REQUEST_ID);
347 if (requestId == null) {
348 requestId = (String) headers
349 .get(Stomp.Headers.RECEIPT_REQUESTED);
350 }
351 if (requestId != null) {
352 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID,
353 requestId);
354 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID,
355 requestId);
356 }
357 StompFrame sc = new StompFrame();
358 sc.setAction(Stomp.Responses.CONNECTED);
359 sc.setHeaders((Map)responseHeaders);
360 sendFrameNoWait(sc);
361 log.info(ME + " ConnectReturn Qos is send");
362 }
363 } catch (XmlBlasterException e) {
364 this.connectQos = null;
365 log.warning(ME + " Connect failed: " + e.toString() + ": " + dump(command));
366 sendExeption(command, e);
367 }
368 }
369
370 public String getLoginName() {
371 ConnectQosServer cs = this.connectQos;
372 if (cs != null) {
373 return cs.getSessionName().getLoginName();
374 }
375 return null;
376 }
377
378 protected void onStompDisconnect(StompFrame command) {
379 if (!checkXbConnected()) {
380 sendExeption(command,
381 new XmlBlasterException(glob,
382 ErrorCode.USER_WRONG_API_USAGE,
383 "onStompDisconnect: Please call connect first: " + dump(command)));
384 return;
385 }
386 try {
387 @SuppressWarnings("unchecked")
388 final Map headers = command.getHeaders();
389 String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
390 authenticate.disconnect(null, secretSessionId, qos);
391 } catch (XmlBlasterException e) {
392 sendExeption(command, e);
393 }
394 _destroy();
395 }
396
397 protected void onStompSend(StompFrame command) {
398 try {
399 if (!checkXbConnected()) {
400 sendExeption(command, new XmlBlasterException(glob,
401 ErrorCode.USER_WRONG_API_USAGE,
402 "onStompSend: Please call connect first: " + dump(command)));
403 return;
404 }
405 @SuppressWarnings("unchecked")
406 Map headers = command.getHeaders();
407 String key = (String) headers.get(XB_SERVER_HEADER_KEY);
408 String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
409 MsgUnitRaw msg = new MsgUnitRaw(key, command.getContent(), qos);
410 MsgUnitRaw[] msgArr = new MsgUnitRaw[1];
411 msgArr[0] = msg;
412 xb.publishArr(null, secretSessionId, msgArr);
413 sendResponse(command);
414 } catch (XmlBlasterException e) {
415 sendExeption(command, e);
416 }
417 }
418
419 protected void onStompSubscribe(StompFrame command) throws Exception {
420 try {
421 if (!checkXbConnected()) {
422 sendExeption(command, new XmlBlasterException(glob,
423 ErrorCode.USER_WRONG_API_USAGE,
424 "onStompSubscribe: Please call connect first: " + dump(command)));
425 return;
426 }
427 @SuppressWarnings("unchecked")
428 final Map headers = command.getHeaders();
429 String key = (String) headers.get(XB_SERVER_HEADER_KEY);
430 String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
431 xb.subscribe(null, secretSessionId, key, qos);
432
433 sendResponse(command);
434 } catch (XmlBlasterException e) {
435 sendExeption(command, e);
436 }
437 }
438
439 protected void onStompUnsubscribe(StompFrame command) throws Exception {
440 try {
441 if (!checkXbConnected()) {
442 sendExeption(command, new XmlBlasterException(glob,
443 ErrorCode.USER_WRONG_API_USAGE,
444 "onStompUnsubscribe: Please call connect first: " + dump(command)));
445 return;
446 }
447 @SuppressWarnings("unchecked")
448 final Map headers = command.getHeaders();
449 String key = (String) headers.get(XB_SERVER_HEADER_KEY);
450 String qos = (String) headers.get(XB_SERVER_HEADER_QOS);
451 xb.unSubscribe(null, secretSessionId, key, qos);
452 sendResponse(command);
453 } catch (XmlBlasterException e) {
454 sendExeption(command, e);
455 }
456 }
457
458 protected void onStompAck(StompFrame command) throws Exception {
459 if (!checkStompConnected())
460 return;
461 @SuppressWarnings("unchecked")
462 Map headers = command.getHeaders();
463 String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
464 if (messageId == null) {
465 log.warning(ME + " ACK API error: missing messageId: " + dump(command));
466 throw new ProtocolException(
467 ME + " ACK received without a message-id to acknowledge!");
468 }
469
470 RequestHolder requestHolder = framesToAck.get(messageId);
471 if (requestHolder == null) {
472 // Happens on multiple Ack or on wrong messageId
473 log.severe(getExtendedLogId() + " Internal ACK API error: messageId=" + messageId + " not found in framesToAck hashtable: " + dump(command));
474 return;
475 }
476
477 requestHolder.returnQos = (String) headers.get(XB_SERVER_HEADER_QOS);
478 if (log.isLoggable(Level.FINE)) log.fine(ME + " " + requestHolder.toString() + " ACK release and notify ...");
479
480 removeFrameForMessageId(messageId);
481 notifyFrameAck(requestHolder);
482 }
483
484 protected void onStompNak(StompFrame command) throws Exception {
485 if (!checkStompConnected())
486 return;
487 @SuppressWarnings("unchecked")
488 Map headers = command.getHeaders();
489 String messageId = (String) headers.get(Stomp.Headers.Ack.MESSAGE_ID);
490 if (messageId == null) {
491 log.warning(ME + " NAK API error: missing messageId: " + dump(command));
492 throw new ProtocolException(
493 ME + " NAK received without a message-id to acknowledge!");
494 }
495 String errorCode = (String) headers.get("errorCode");
496 ErrorCode errorCodeEnum = ErrorCode.toErrorCode(errorCode, ErrorCode.USER_CLIENTCODE);
497 String message = (String) headers.get(Stomp.Headers.Error.MESSAGE);
498 if (message == null)
499 message = "UPDATE failed, client has rejected message";
500
501 RequestHolder requestHolder = framesToAck.get(messageId);
502 if (requestHolder == null) {
503 // Happens on multiple Ack or on wrong messageId
504 log.warning(getExtendedLogId() + " Internal NAK API error: messageId=" + messageId + " not found in framesToAck hashtable: " + dump(command));
505 }
506 requestHolder.xmlBlasterException = new XmlBlasterException(glob, errorCodeEnum, message);
507
508 log.info(ME + " NAK release and notify ... " + errorCode);
509 removeFrameForMessageId(messageId);
510 notifyFrameAck(requestHolder);
511 }
512
513 private boolean notifyFrameAck(RequestHolder requestHolder) {
514 if (requestHolder != null && requestHolder.stompFrame != null) {
515 synchronized (requestHolder.stompFrame) {
516 try {
517 requestHolder.stompFrame.notify();
518 return true;
519 }
520 catch (Throwable e) {
521 e.printStackTrace();
522 return false;
523 }
524 }
525 }
526 return false;
527 }
528
529 // ===================== I_CallbackDriver ==========================
530
/*
 * This code area handles the outgoing xmlBlaster messages by implementing
 * the I_CallbackDriver interface.
 */
535
536 //@Override()
537 public String getName() {
538 return ME;
539 }
540
541 //@Override()
542 public String getProtocolId() {
543 return PROTOCOL_NAME;
544 }
545
546 //@Override()
547 public String getRawAddress() {
548 return outputHandler.toString();
549 }
550
551 /**
552 * How long to block on remote call waiting on a ping response. The default
553 * is to block for one minute This method can be overwritten by
554 * implementations like EMAIL
555 */
556 public long getDefaultPingResponseTimeout() {
557 return Constants.MINUTE_IN_MILLIS;
558 }
559
560 /**
561 * How long to block on remote call waiting on a update() response. The
562 * default is to block forever This method can be overwritten by
563 * implementations like EMAIL
564 */
565 public long getDefaultUpdateResponseTimeout() {
566 return Integer.MAX_VALUE;
567 }
568
569 /**
570 * Set the given millis to protect against blocking client for ping
571 * invocations.
572 *
573 * @param millis
574 * If <= 0 it is set to the default (one minute). An argument
575 * less than or equal to zero means not to wait at all and is not
576 * supported
577 */
578 public final void setPingResponseTimeout(long millis) {
579 if (millis <= 0L) {
580 log.warning(ME + " pingResponseTimeout=" + millis
581 + " is invalid, setting it to "
582 + getDefaultPingResponseTimeout() + " millis");
583 this.pingResponseTimeout = getDefaultPingResponseTimeout();
584 } else
585 this.pingResponseTimeout = millis;
586 }
587
588 /**
589 * Set the given millis to protect against blocking client for update()
590 * invocations.
591 *
592 * @param millis
593 * If <= 0 it is set to the default (one minute). An argument
594 * less than or equal to zero means not to wait at all and is not
595 * supported
596 */
597 public final void setUpdateResponseTimeout(long millis) {
598 if (millis <= 0L) {
599 log.warning(ME + " updateResponseTimeout=" + millis
600 + " is invalid, setting it to "
601 + getDefaultUpdateResponseTimeout() + " millis");
602 this.updateResponseTimeout = getDefaultUpdateResponseTimeout();
603 } else
604 this.updateResponseTimeout = millis;
605 }
606
607 /**
608 * @return Returns the responseTimeout.
609 */
610 public long getResponseTimeout(MethodName methodName) {
611 if (MethodName.PING.equals(methodName)) {
612 return this.pingResponseTimeout;
613 } else if (MethodName.UPDATE.equals(methodName)) {
614 return this.updateResponseTimeout;
615 }
616 return this.updateResponseTimeout;
617 // return this.responseTimeout;
618 }
619
620 //@Override()
621 public void init(Global glob, CallbackAddress addressConfig)
622 throws XmlBlasterException {
623
624 setPingResponseTimeout(addressConfig.getEnv("pingResponseTimeout",
625 getDefaultPingResponseTimeout()).getValue());
626 if (log.isLoggable(Level.FINE))
627 log.fine(addressConfig.getEnvLookupKey("pingResponseTimeout") + "="
628 + this.pingResponseTimeout);
629
630 setUpdateResponseTimeout(addressConfig.getEnv("updateResponseTimeout",
631 getDefaultUpdateResponseTimeout()).getValue());
632 if (log.isLoggable(Level.FINE))
633 log.fine(addressConfig.getEnvLookupKey("updateResponseTimeout")
634 + "=" + this.updateResponseTimeout);
635 }
636
637 //@Override()
638 public boolean isAlive() {
639 return this.stompOpened;
640 }
641
642 //@Override()
643 public String ping(String qos) throws XmlBlasterException {
644 // never ping client without session
645 // <qos><state info='INITIAL'/></qos>
646 if (log.isLoggable(Level.FINE)) log.fine("Ping again");
647 if (qos != null && qos.indexOf("INITIAL") != -1)
648 return "<qos/>";
649 if (!checkStompConnected())
650 throw new XmlBlasterException(glob,
651 ErrorCode.COMMUNICATION_NOCONNECTION, ME,
652 "Stomp callback ping failed");
653 StompFrame frame = new StompFrame();
654 frame.setAction(XB_SERVER_COMMAND_PING);
655 frame.getHeaders().put(XB_SERVER_HEADER_QOS, qos);
656 String returnValue = sendFrameAndWait(frame, MethodName.PING);
657 return returnValue;
658 }
659
660 public I_ProgressListener registerProgressListener(
661 I_ProgressListener listener) {
662 return null;
663 }
664
665 /**
666 * HTTP header key/value should not contain new line.
667 * @param str
668 * @return
669 */
670 private String cleanNewlines(String str) {
671 return ReplaceVariable.replaceAll(str, "\n", " ");
672 }
673
674 private KeyData getKeyData(MsgUnitRaw msgUnitRaw) {
675 try {
676 if (msgUnitRaw == null)
677 return null;
678 MsgUnit msgUnit = (MsgUnit)msgUnitRaw.getMsgUnit();
679 if (msgUnit == null)
680 return null;
681 return msgUnit.getKeyData();
682 }
683 catch(Throwable e) {
684 e.printStackTrace();
685 return null;
686 }
687 }
688
689 private String getContentType(MsgUnitRaw msgUnitRaw) {
690 KeyData keyData = getKeyData(msgUnitRaw);
691 if (keyData != null) {
692 String contentType = keyData.getContentMime();
693 if (contentType != null && contentType.length() > 0)
694 return contentType;
695 }
696 return "text/xml";
697 }
698
699 public String[] sendUpdate(MsgUnitRaw[] msgArr) throws XmlBlasterException {
700 String[] ret = new String[msgArr.length];
701 int i = 0;
702 for (MsgUnitRaw msgUnitRaw : msgArr) {
703 StompFrame frame = new StompFrame();
704 // MsgUnit msg = (MsgUnit) msgUnit.getMsgUnit();
705 // String senderLoginName =
706 // msg.getQosData().getSender().getAbsoluteName();
707 MsgUnit msg = (MsgUnit) msgUnitRaw.getMsgUnit();
708 String topicId = msg.getKeyOid();
709 frame.setAction(Stomp.Responses.MESSAGE);
710 frame.getHeaders().put(Stomp.Headers.Message.DESTINATION, topicId);
711 frame.getHeaders().put("methodName", MethodName.UPDATE.toString());
712 String contentType = getContentType(msgUnitRaw);
713 frame.getHeaders().put("content-type", contentType); // "text/xml"
714 frame.getHeaders().put(XB_SERVER_HEADER_KEY, cleanNewlines(msgUnitRaw.getKey()));
715 frame.getHeaders().put(XB_SERVER_HEADER_QOS, cleanNewlines(msgUnitRaw.getQos()));
716 byte[] content = msgUnitRaw.getContent();
717 frame.getHeaders().put(Stomp.Headers.CONTENT_LENGTH, content.length);
718 frame.setContent(content);
719 if (log.isLoggable(Level.FINER)) log.finer(ME + " UPDATE Sending now ... " + msgUnitRaw.getKey().trim());
720 ret[i] = sendFrameAndWait(frame, MethodName.UPDATE);
721 if (log.isLoggable(Level.FINER)) log.finer(ME + " UPDATE Done " + msgUnitRaw.getKey().trim() + ": " + ret[i]);
722 i++;
723 }
724 return ret;
725 }
726
727 public void sendUpdateOneway(MsgUnitRaw[] msgArr)
728 throws XmlBlasterException {
729 for (MsgUnitRaw msgUnitRaw : msgArr) {
730 try {
731 StompFrame frame = new StompFrame();
732 MsgUnit msg = (MsgUnit) msgUnitRaw.getMsgUnit();
733 String topicId = msg.getKeyOid();
734 frame.setAction(Stomp.Responses.MESSAGE);
735 frame.getHeaders().put(Stomp.Headers.Message.DESTINATION,
736 topicId);
737 frame.getHeaders().put("methodName", MethodName.UPDATE_ONEWAY);
738 frame.getHeaders().put("content-type", getContentType(msgUnitRaw)); // "text/xml"
739 frame.getHeaders().put(XB_SERVER_HEADER_KEY, cleanNewlines(msgUnitRaw.getKey()));
740 frame.getHeaders().put(XB_SERVER_HEADER_QOS, cleanNewlines(msgUnitRaw.getQos()));
741 frame.getHeaders().put(Stomp.Headers.CONTENT_LENGTH,
742 msgUnitRaw.getContent().length);
743 frame.setContent(msgUnitRaw.getContent());
744 sendFrameNoWait(frame);
745 } catch (Exception e) {
746 log.severe(e.getMessage());
747 }
748 }
749
750 }
751
752 public String getType() {
753 return PROTOCOL_NAME;
754 }
755
756 public String getVersion() {
757 return "1.0";
758 }
759
760 public void init(Global glob, PluginInfo pluginInfo)
761 throws XmlBlasterException {
762 }
763
764 /*
765 * private internal stuff
766 */
767
768 private RequestHolder getFrameForMessageId(String messageId) {
769 return (framesToAck.get(messageId));
770 }
771
772 private RequestHolder registerFrame(StompFrame frame) {
773 //String messageId = "" + new Timestamp().getTimestamp();
774 //String messageId = frame.getAction() + "-" + secretSessionId + "-" + System.currentTimeMillis();
775 String messageId = frame.getAction() + "-" + new Timestamp().getTimestamp();
776 frame.getHeaders().put(Stomp.Headers.Message.MESSAGE_ID, messageId);
777 RequestHolder requestHolder = new RequestHolder(messageId, frame);
778 framesToAck.put(messageId, requestHolder);
779 return requestHolder;
780 }
781
782 private void removeFrameForMessageId(String messageId) {
783 if (messageId == null)
784 return;
785 if (framesToAck.get(messageId) != null)
786 framesToAck.remove(messageId);
787 }
788
789 private void sendFrameNoWait(StompFrame frame) throws XmlBlasterException {
790 checkStompConnected();
791 try {
792 outputHandler.onStompFrame(frame);
793 } catch (Exception e) {
794 e.printStackTrace();
795 throw new XmlBlasterException(this.glob,
796 ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME
797 + ".sendFrameNoWait", e.getMessage());
798 }
799 }
800
801 private String sendFrameAndWait(StompFrame frame, MethodName methodName)
802 throws XmlBlasterException {
803 final RequestHolder requestHolder = registerFrame(frame);
804 try {
805 checkStompConnected();
806 long timeout = getResponseTimeout(methodName);
807 if (log.isLoggable(Level.FINE))
808 log.fine(ME + " " + requestHolder.toString() + ": Sending now " + methodName.toString() + "...");
809 synchronized (frame) {
810 outputHandler.onStompFrame(frame);
811 frame.wait(timeout); // TODO: Port to CountDownLatch cdl = new CountDownLatch(1);
812 }
813 // Timeout occurred if requestHolder was not removed by ACK or NAK:
814 if (requestHolder == getFrameForMessageId(requestHolder.messageId)) {
815 String text = "methodName=" + methodName.toString() + " messageId=" + requestHolder.messageId + ": No Ack recieved in timeoutMillis=" + timeout;
816 log.warning(text + ": " + dump(frame));
817 removeFrameForMessageId(requestHolder.messageId);
818 throw new XmlBlasterException(this.glob,
819 ErrorCode.COMMUNICATION_TIMEOUT, ME
820 + ".sendFrameAndWait",
821 text);
822 }
823 } catch (Exception e) {
824 if (e instanceof XmlBlasterException)
825 throw (XmlBlasterException) e;
826 else
827 throw new XmlBlasterException(this.glob,
828 ErrorCode.COMMUNICATION_NOCONNECTION_DEAD,
829 // ErrorCode.COMMUNICATION_NOCONNECTION_CALLBACKSERVER_NOTAVAILABLE,
830 ME + ".sendFrameAndWait", e.getMessage());
831 }
832 // http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.update.html
833 if (requestHolder.shutdown) {
834 // connection was lost
835 log.warning(getExtendedLogId() + requestHolder.toString() + ": Shutdown during callback delivery: " + dump(frame));
836 throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_RESPONSETIMEOUT, ME + " Shutdown during update delivery");
837 //return "<qos><state id='FAIL'/>";
838 }
839 else if (requestHolder.xmlBlasterException != null) { // on XmlBlasterException
840 log.warning(ME + " " + requestHolder.toString() + ": Exception from client: " + requestHolder.xmlBlasterException.getMessage() + ": " + dump(frame));
841 throw requestHolder.xmlBlasterException;
842 }
843 else { // requestHolder.returnQos should filled
844 if (log.isLoggable(Level.FINE))
845 log.fine(ME + " " + requestHolder.toString() + ": Successfully send and acknowledged " + requestHolder.returnQos);
846 return (requestHolder.returnQos == null) ? "<qos/>" : requestHolder.returnQos;
847 }
848 }
849
850 @SuppressWarnings("unchecked")
851 private void sendResponse(StompFrame command) throws XmlBlasterException {
852 final String receiptId = (String) command.getHeaders().get(
853 Stomp.Headers.RECEIPT_REQUESTED);
854 // A response may not be needed.
855 if (receiptId != null) {
856 StompFrame sc = new StompFrame();
857 sc.setAction(Stomp.Responses.RECEIPT);
858 sc.setHeaders(new HashMap());
859 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
860 sendFrameNoWait(sc);
861 }
862 }
863
864 public static byte[] toUtf8Bytes(String s) {
865 if (s == null || s.length() == 0)
866 return new byte[0];
867 try {
868 return s.getBytes(Constants.UTF8_ENCODING);
869 } catch (UnsupportedEncodingException e) {
870 log.severe("PANIC in WatcheeConstants.toUtf8Bytes(" + s
871 + ", " + Constants.UTF8_ENCODING + "): " + e.toString());
872 e.printStackTrace();
873 return s.getBytes();
874 }
875 }
876
877 @SuppressWarnings("unchecked")
878 private void sendExeption(StompFrame command, XmlBlasterException e) {
879 try {
880 final String receiptId = (String) command.getHeaders().get(
881 Stomp.Headers.RECEIPT_REQUESTED);
882 StompFrame sc = new StompFrame();
883 sc.setAction(Stomp.Responses.ERROR);
884 sc.setHeaders(new HashMap());
885 sc.getHeaders().put("errorCode", e.getErrorCodeStr()); // xmlBlaster way
886 sc.getHeaders().put(Stomp.Responses.MESSAGE, e.getErrorCodeStr()); // stomp
887 // wants
888 // it
889 if (receiptId != null) {
890 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
891 }
892 String text = e.getMessage();
893 sc.getHeaders().put(Stomp.Headers.CONTENT_LENGTH, text.length());
894 sc.setContent(toUtf8Bytes(text));
895 try {
896 sendFrameNoWait(sc);
897 } catch (XmlBlasterException e1) {
898 e1.printStackTrace();
899 }
900 log.warning(ME + "sendException" + e.getMessage() + ": " + dump(command));
901 }
902 catch (Throwable e2) {
903 e2.printStackTrace();
904 log.severe("sendExeption failed for " + e.toString() + ": " + e2.toString() + ": " + dump(command));
905 }
906 }
907 }
// syntax highlighted by Code2HTML, v. 0.9.1