1 /*------------------------------------------------------------------------------
2 Name: DispatchManager.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.client.dispatch;
7 import java.util.ArrayList;
8 import java.util.HashSet;
9 import java.util.List;
10 import java.util.logging.Level;
11 import java.util.logging.Logger;
12
13 import org.xmlBlaster.authentication.plugins.I_MsgSecurityInterceptor;
14 import org.xmlBlaster.client.I_XmlBlasterAccess;
15 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
16 import org.xmlBlaster.util.Global;
17 import org.xmlBlaster.util.MsgUnit;
18 import org.xmlBlaster.util.SessionName;
19 import org.xmlBlaster.util.Timestamp;
20 import org.xmlBlaster.util.XmlBlasterException;
21 import org.xmlBlaster.util.def.Constants;
22 import org.xmlBlaster.util.def.ErrorCode;
23 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
24 import org.xmlBlaster.util.dispatch.DispatchConnection;
25 import org.xmlBlaster.util.dispatch.DispatchConnectionsHandler;
26 import org.xmlBlaster.util.dispatch.DispatchStatistic;
27 import org.xmlBlaster.util.dispatch.DispatchWorker;
28 import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;
29 import org.xmlBlaster.util.dispatch.I_DispatchManager;
30 import org.xmlBlaster.util.dispatch.I_PostSendListener;
31 import org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor;
32 import org.xmlBlaster.util.error.I_MsgErrorHandler;
33 import org.xmlBlaster.util.error.MsgErrorInfo;
34 import org.xmlBlaster.util.plugin.PluginManagerBase;
35 import org.xmlBlaster.util.property.PropString;
36 import org.xmlBlaster.util.qos.address.AddressBase;
37 import org.xmlBlaster.util.qos.address.CallbackAddress;
38 import org.xmlBlaster.util.queue.I_Entry;
39 import org.xmlBlaster.util.queue.I_Queue;
40 import org.xmlBlaster.util.queue.I_QueueEntry;
41 import org.xmlBlaster.util.queue.I_QueuePutListener;
42 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
43
44 /**
45 * Manages the sending of messages and commands and does error recovery
46 * further we communicate with the dispatcher plugin if one is configured.
47 * <p />
48 * There is one instance of this class per queue and remote connection.
49 * @author xmlBlaster@marcelruff.info
50 */
51 public final class ClientDispatchManager implements I_DispatchManager
52 {
53 public final String ME;
54 private final Global glob;
55 private static Logger log = Logger.getLogger(ClientDispatchManager.class.getName());
56 private final I_Queue msgQueue;
57 private final ClientDispatchConnectionsHandler dispatchConnectionsHandler;
58 private final I_MsgErrorHandler failureListener;
59 private final I_MsgSecurityInterceptor securityInterceptor;
60 private final I_MsgDispatchInterceptor msgInterceptor;
61 private HashSet connectionStatusListeners;
62 private final String typeVersion;
63 /** If > 0 does burst mode */
64 private long collectTime = -1L;
65 private long toAliveTime = 0;
66 private long toPollingTime = 0;
67
68 private boolean dispatchWorkerIsActive = false;
69
70 /** The worker for synchronous invocations */
71 private DispatchWorker syncDispatchWorker;
72
73 private Timestamp timerKey = null;
74
75 private int notifyCounter = 0;
76
77 private boolean isShutdown = false;
78 private boolean isSyncMode = false;
79 private boolean trySyncMode = false; // true: client side queue embedding, false: server side callback queue
80
81 private boolean inAliveTransition = false;
82 private final Object ALIVE_TRANSITION_MONITOR = new Object();
83
84 private int burstModeMaxEntries = -1;
85 private long burstModeMaxBytes = -1L;
86
87 /** async delivery is activated only when this flag is 'true'. Used to temporarly inhibit dispatch of messages */
88 private boolean dispatcherActive = true;
89
90 private boolean shallCallToAliveSync;
91 private boolean inDispatchManagerCtor;
92
93 private SessionName sessionName;
94
95 /**
96 * @param msgQueue The message queue which i use (!!! TODO: this changes, we should pass it on every method where needed)
97 * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java), or null
98 * @param addrArr The addresses i shall connect to
99 */
100 public ClientDispatchManager(Global glob, I_MsgErrorHandler failureListener,
101 I_MsgSecurityInterceptor securityInterceptor,
102 I_Queue msgQueue, I_ConnectionStatusListener connectionStatusListener,
103 AddressBase[] addrArr, SessionName sessionName) throws XmlBlasterException {
104 if (failureListener == null || msgQueue == null)
105 throw new IllegalArgumentException("DispatchManager failureListener=" + failureListener + " msgQueue=" + msgQueue);
106 this.inDispatchManagerCtor = true;
107 this.ME = msgQueue.getStorageId().getId();
108 this.glob = glob;
109
110 this.sessionName = sessionName;
111
112 if (log.isLoggable(Level.FINE)) log.fine(ME+": Loading DispatchManager ...");
113
114 this.msgQueue = msgQueue;
115 this.failureListener = failureListener;
116 this.securityInterceptor = securityInterceptor;
117 this.dispatchConnectionsHandler = new ClientDispatchConnectionsHandler(glob, this);
118 this.connectionStatusListeners = new HashSet();
119 if (connectionStatusListener != null) this.connectionStatusListeners.add(connectionStatusListener);
120
121 initDispatcherActive(addrArr);
122
123 /*
124 * Check i a plugin is configured ("DispatchPlugin/defaultPlugin")
125 * If configured, the plugin instance is searched in the Global scope
126 * and if none is found one is created (see DispatcherPluginManager)
127 * Default server setting is to use no dispatcher plugin
128 */
129 PropString propString = new PropString(PluginManagerBase.NO_PLUGIN_TYPE); // "undef";
130 if (addrArr != null && addrArr.length > 0) // Check if client wishes a specific plugin
131 propString.setValue(addrArr[0].getDispatchPlugin());
132 this.typeVersion = propString.getValue();
133 this.msgInterceptor = glob.getDispatchPluginManager().getPlugin(this.typeVersion); // usually from cache
134 if (log.isLoggable(Level.FINE)) log.fine(ME+": DispatchPlugin/defaultPlugin=" + propString.getValue() + " this.msgInterceptor=" + this.msgInterceptor);
135 if (this.msgInterceptor != null) {
136 this.msgInterceptor.addDispatchManager(this);
137 if (log.isLoggable(Level.FINE)) log.fine(ME+": Activated dispatcher plugin '" + this.typeVersion + "'");
138 }
139
140 this.msgQueue.addPutListener(this); // to get putPre() and putPost() events
141
142 this.dispatchConnectionsHandler.initialize(addrArr);
143 this.inDispatchManagerCtor = false;
144 }
145
146 /**
147 * @return Never null
148 */
149 public SessionName getSessionName() {
150 return this.sessionName;
151 }
152
153 public boolean isSyncMode() {
154 return this.isSyncMode;
155 }
156
157 /**
158 * Set behavior of dispatch framework.
159 * @param trySyncMode true: client side queue embedding, false: server side callback queue
160 * defaults to false
161 */
162 public void trySyncMode(boolean trySyncMode) {
163 this.trySyncMode = trySyncMode;
164 switchToSyncMode();
165 }
166
167 /**
168 * Reconfigure dispatcher with given properties.
169 *
170 * Note that only a limited re-configuration is supported
171 * @param addressArr The new configuration
172 */
173 public final void updateProperty(CallbackAddress[] addressArr) throws XmlBlasterException {
174 initDispatcherActive(addressArr);
175 this.dispatchConnectionsHandler.initialize(addressArr);
176 }
177
178 public void finalize() {
179 try {
180 removeBurstModeTimer();
181 //if (log.isLoggable(Level.FINE)) log.fine(ME+": finalize - garbage collected");
182 }
183 catch (Throwable e) {
184 e.printStackTrace();
185 }
186 try {
187 super.finalize();
188 }
189 catch (Throwable e) {
190 e.printStackTrace();
191 }
192 }
193
194 public I_Queue getQueue() {
195 return this.msgQueue;
196 }
197
198 /*
199 * Register yourself if you want to be informed about the remote connection status.
200 * @param connectionStatusListener The implementation which listens on connectionState events (e.g. XmlBlasterAccess.java)
201 * @return true if we did not already contain the specified element.
202 */
203 public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) {
204 return this.connectionStatusListeners.add(connectionStatusListener);
205 }
206
207 public synchronized boolean addConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener, boolean fireInitial) {
208 if (connectionStatusListener == null) return true;
209 boolean ret = this.connectionStatusListeners.add(connectionStatusListener);
210 if (fireInitial) {
211 if (isDead())
212 connectionStatusListener.toDead(this, ConnectionStateEnum.DEAD, null/*"Initial call"*/);
213 else if (isPolling())
214 connectionStatusListener.toPolling(this, ConnectionStateEnum.POLLING);
215 else
216 connectionStatusListener.toAlive(this, ConnectionStateEnum.ALIVE);
217 }
218 return ret;
219 }
220
221
222 /**
223 * Remove the given listener
224 * @param connectionStatusListener
225 * @return true if it was removed
226 */
227 public synchronized boolean removeConnectionStatusListener(I_ConnectionStatusListener connectionStatusListener) {
228 return this.connectionStatusListeners.remove(connectionStatusListener);
229 }
230
231 public synchronized I_ConnectionStatusListener[] getConnectionStatusListeners() {
232 if (this.connectionStatusListeners.size() == 0)
233 return new I_ConnectionStatusListener[0];
234 return (I_ConnectionStatusListener[])this.connectionStatusListeners.toArray(new I_ConnectionStatusListener[this.connectionStatusListeners.size()]);
235 }
236
237 /**
238 * The name in the configuration file for the plugin
239 * @return e.g. "Priority,1.0"
240 */
241 public String getTypeVersion() {
242 return this.typeVersion;
243 }
244
245 /**
246 * @return The import/export encrypt handle or null if created by a SubjectInfo (no session info available)
247 */
248 public I_MsgSecurityInterceptor getMsgSecurityInterceptor() {
249 return this.securityInterceptor;
250 }
251
252 /**
253 * @return The handler of all callback plugins, is never null
254 */
255 public final DispatchConnectionsHandler getDispatchConnectionsHandler() {
256 return this.dispatchConnectionsHandler;
257 }
258
259 /**
260 * How many messages maximum shall the callback thread take in one bulk out of the
261 * callback queue and deliver in one bulk.
262 */
263 public final int getBurstModeMaxEntries() {
264 return this.burstModeMaxEntries;
265 }
266
267 /**
268 * How many bytes maximum shall the callback thread take in one bulk out of the
269 * callback queue and deliver in one bulk.
270 */
271 public final long getBurstModeMaxBytes() {
272 return this.burstModeMaxBytes;
273 }
274
275 /**
276 * Get timestamp when we went to ALIVE state.
277 * @return millis timestamp
278 */
279 public final long getAliveSinceTime() {
280 return this.toAliveTime;
281 }
282
283 /**
284 * Get timestamp when we went to POLLING state.
285 * @return millis timestamp
286 */
287 public final long getPollingSinceTime() {
288 return this.toPollingTime;
289 }
290
291 /**
292 * Call by DispatchConnectionsHandler on state transition
293 * NOTE: toAlive is called initially when a protocol plugin is successfully loaded
294 * but we don't know yet if it ever is able to connect
295 */
296 public void toAlive(ConnectionStateEnum oldState) {
297
298 if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to ALIVE");
299
300 // Remember the current collectTime
301 AddressBase addr = this.dispatchConnectionsHandler.getAliveAddress();
302 if (addr == null) {
303 log.severe(ME+": toAlive action has no alive address");
304 return;
305 }
306
307 try {
308 this.inAliveTransition = true;
309
310 if (this.toAliveTime <= this.toPollingTime) {
311 this.toAliveTime = System.currentTimeMillis();
312 }
313
314 this.burstModeMaxEntries = addr.getBurstModeMaxEntries();
315 this.burstModeMaxBytes = addr.getBurstModeMaxBytes();
316
317 synchronized (this.ALIVE_TRANSITION_MONITOR) {
318 // 1. We allow a client to intercept and for example destroy all entries in the queue
319 I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
320 for (int i=0; i<listeners.length; i++) {
321 listeners[i].toAlive(this, oldState);
322 }
323 // 2. If a dispatch plugin is registered it may do its work
324 if (this.msgInterceptor != null)
325 this.msgInterceptor.toAlive(this, oldState);
326 }
327 }
328 finally {
329 this.inAliveTransition = false;
330 }
331
332 collectTime = addr.getCollectTime(); // burst mode if > 0L
333
334 // 3. Deliver. Will be delayed if burst mode timer is activated, will switch to sync mode if necessary
335 activateDispatchWorker();
336
337 if (this.shallCallToAliveSync && !this.inDispatchManagerCtor && this.isSyncMode)
338 callToAliveSync();
339 }
340
341 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
342 I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
343 for (int i=0; i<listeners.length; i++) {
344 listeners[i].toAlive(this, oldState);
345 }
346 }
347
348 /** Call by DispatchConnectionsHandler on state transition */
349 public void toPolling(ConnectionStateEnum oldState) {
350
351 if (isDead()) {
352 return;
353 }
354
355 if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to POLLING");
356 if (this.toPollingTime <= this.toAliveTime) {
357 this.toPollingTime = System.currentTimeMillis();
358 }
359 switchToASyncMode();
360
361 // 1. We allow a client to intercept and for example destroy all entries in the queue
362 I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
363 for (int i=0; i<listeners.length; i++) {
364 listeners[i].toPolling(this, oldState);
365 }
366
367 // 2. If a dispatch plugin is registered it may do its work
368 if (this.msgInterceptor != null)
369 this.msgInterceptor.toPolling(this, oldState);
370 }
371
372 /**
373 *
374 * @param ex
375 */
376 public void toDead(XmlBlasterException ex) {
377 shutdownFomAnyState(ConnectionStateEnum.UNDEF, ex);
378 }
379
380 /** Call by DispatchConnectionsHandler on state transition */
381 public void shutdownFomAnyState(ConnectionStateEnum oldState, XmlBlasterException ex) {
382 if (log.isLoggable(Level.FINER)) log.finer(ME+": Switch from " + oldState + " to DEAD");
383 if (oldState == ConnectionStateEnum.DEAD) return;
384 if (this.isShutdown) return;
385 if (ex != null) { // Very dangerous code! The caller ends up with changed Exception type
386 ex.changeErrorCode(ErrorCode.COMMUNICATION_NOCONNECTION_DEAD);
387 }
388 else {
389 ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME,
390 "Switch from " + oldState + " to DEAD, reason is not known");
391 }
392
393 // 1. We allow a client to intercept and for example destroy all entries in the queue
394 I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
395 for (int i=0; i<listeners.length; i++) {
396 try {
397 // Only pass original ex.getMessage() - not the changed errorCode
398 listeners[i].toDead(this, oldState, ex);
399 }
400 catch (Throwable e) {
401 e.printStackTrace();
402 }
403 }
404
405 // 2. If a dispatch plugin is registered it may do its work
406 if (this.msgInterceptor != null)
407 this.msgInterceptor.toDead(this, oldState, ex);
408
409 if (oldState != ConnectionStateEnum.UNDEF)
410 givingUpDelivery(ex);
411 }
412
413 private void givingUpDelivery(XmlBlasterException ex) {
414 if (log.isLoggable(Level.FINE)) log.fine(ME+": Entering givingUpDelivery(), state is " + this.dispatchConnectionsHandler.getState());
415 removeBurstModeTimer();
416
417 boolean userThread = this.dispatchConnectionsHandler.isUserThread();
418 if (!userThread) { // If the client user thread it will receive the exception and handle it self
419 // The error handler flushed the queue and does error handling with them
420 getMsgErrorHandler().handleError(new MsgErrorInfo(glob, (MsgQueueEntry)null, this, ex));
421 }
422
423 shutdown();
424 }
425
426 public void postSendNotification(MsgQueueEntry entry) {
427 MsgQueueEntry[] entries = new MsgQueueEntry[] { entry };
428 postSendNotification(entries);
429 }
430
431 public void postSendNotification(MsgQueueEntry[] entries) {
432 I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener();
433 if (postSendListener != null) {
434 try {
435 postSendListener.postSend(entries);
436 }
437 catch (Throwable e) {
438 e.printStackTrace();
439 log.warning("postSendListener.postSend() exception: " + e.toString());
440 }
441 }
442 }
443
444 /**
445 * Notify I_PostSendListener about problem.
446 * <p>
447 * Typically XmlBlasterAccess is notified when message came asynchronously from queue
448 *
449 * @param entryList
450 * @param ex
451 * @return true if processed
452 * @see I_PostSendListener#postSend(MsgQueueEntry) for explanation
453 */
454 public boolean sendingFailedNotification(MsgQueueEntry[] entries, XmlBlasterException ex) {
455 I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener();
456 if (postSendListener == null)
457 return false;
458 try {
459 return postSendListener.sendingFailed(entries, ex);
460 }
461 catch (Throwable e) {
462 e.printStackTrace();
463 log.warning("postSendListener.sendingFailed() exception: " + e.toString());
464 return false;
465 }
466 }
467
468 /**
469 * Called by DispatchWorker if an Exception occured in sync mode
470 * Only on client side
471 */
472 public void handleSyncWorkerException(List<I_Entry> entryList, Throwable throwable) throws XmlBlasterException {
473
474 if (log.isLoggable(Level.FINER)) log.finer(ME+": Sync delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString());
475
476 XmlBlasterException xmlBlasterException = XmlBlasterException.convert(glob,ME,null,throwable);
477
478 if (isDead()) throw xmlBlasterException;
479
480 if (xmlBlasterException.isUser()) {
481 // Exception from remote client from update(), pass it to error handler and carry on ...?
482 // A PublishPlugin could throw it
483 MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
484 getMsgErrorHandler().handleErrorSync(new MsgErrorInfo(glob, entries, this, xmlBlasterException));
485 return;
486 }
487 else if (xmlBlasterException.isCommunication()) {
488
489 if (this.msgInterceptor != null && isPolling()) { // If we have a plugin it shall handle it
490 try {
491 entryList = this.msgInterceptor.handleNextMessages(this, entryList);
492 if (entryList != null && entryList.size() > 0) {
493 MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
494 getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException));
495 }
496 }
497 catch (XmlBlasterException xmlBlasterException2) {
498 internalError(xmlBlasterException2);
499 }
500 if (entryList != null && entryList.size() > 0) {
501 MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
502 getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException));
503 }
504 return;
505 }
506
507 // Exception from connection to remote client (e.g. from Corba layer)
508 // DispatchManager handles this
509 // Error handling in sync mode
510 // 1. throwExceptionBackToPusher
511 // 2. Switch to async mode and collect message (wait on better times)
512 // 3. If we have serious problems (programming exceptions or isDead()) throw exception back
513 // 4. Pass exception to an error handler plugin
514 switchToASyncMode();
515
516 // Simulate return values, and manipulate missing informations into entries ...
517 I_QueueEntry[] entries = (I_QueueEntry[])entryList.toArray(new I_QueueEntry[entryList.size()]);
518 getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED);
519 msgQueue.put(entries, I_Queue.IGNORE_PUT_INTERCEPTOR);
520
521 if (log.isLoggable(Level.FINE)) log.fine(ME+": Delivery failed, pushed " + entries.length + " entries into tail back queue");
522 }
523 else {
524 if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation failed: " + xmlBlasterException.getMessage());
525 throw xmlBlasterException;
526 }
527 }
528
529 /**
530 * Messages are successfully sent, remove them now from queue (sort of a commit()):
531 * We remove filtered/destroyed messages as well (which doen't show up in entryListChecked)
532 * @param postSendNotify TODO
533 */
534 public void removeFromQueue(MsgQueueEntry[] entries, boolean postSendNotify) throws XmlBlasterException {
535 I_MsgDispatchInterceptor msgInterceptor = getMsgDispatchInterceptor();
536 MsgUnit[] msgUnits = null;
537 if (msgInterceptor != null) { // we need to do this before removal since the msgUnits are weak references and would be deleted by gc
538 msgUnits = new MsgUnit[entries.length];
539 for (int i=0; i < msgUnits.length; i++) {
540 msgUnits[i] = entries[i].getMsgUnit();
541 }
542 }
543 this.msgQueue.removeRandom(entries);
544 /*(currently only done in sync invocation)
545 ArrayList defaultEntries = sendAsyncResponseEvent(entryList);
546 if (defaultEntries.size() > 0) {
547 MsgQueueEntry[] entries = (MsgQueueEntry[])defaultEntries.toArray(new MsgQueueEntry[defaultEntries.size()]);
548 this.msgQueue.removeRandom(entries);
549 }
550 */
551
552 if (postSendNotify)
553 postSendNotification(entries);
554
555 if (msgInterceptor != null) {
556 msgInterceptor.postHandleNextMessages(this, msgUnits);
557 }
558
559 if (log.isLoggable(Level.FINE)) log.fine("Commit of successful sending of " +
560 entries.length + " messages done, current queue size is " +
561 this.msgQueue.getNumOfEntries() + " '" + entries[0].getLogId() + "'");
562 }
563
564 /**
565 * Called by DispatchWorker if an Exception occurred in async mode.
566 * @throws XmlBlasterException should never happen but is possible during removing entries from queue
567 */
568 public void handleWorkerException(List<I_Entry> entryList, Throwable throwable) throws XmlBlasterException {
569 // Note: The DispatchManager is notified about connection problems directly by its DispatchConnectionsHandler
570 // we don't need to take care of ErrorCode.COMMUNICATION*
571 if (log.isLoggable(Level.FINER)) log.finer(ME+": Async delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString());
572 //Thread.currentThread().dumpStack();
573 if (entryList == null) {
574 if (!this.isShutdown)
575 log.warning(ME+": Didn't expect null entryList in handleWorkerException() for throwable " + throwable.getMessage() + toXml(""));
576 return;
577 }
578
579 getDispatchStatistic().setLastDeliveryException(throwable.toString());
580 getDispatchStatistic().incrNumDeliveryExceptions(1);
581
582 if (throwable instanceof XmlBlasterException) {
583 XmlBlasterException ex = (XmlBlasterException)throwable;
584 if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation or callback failed: " + ex.getMessage());
585 if (ex.isUser()) {
586 // Exception from remote client from update(), pass it to error handler and carry on ...
587 MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
588 boolean isHandled = sendingFailedNotification(entries, ex);
589 if (isHandled)
590 removeFromQueue(entries, false);
591 else
592 getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));
593 }
594 else if (ex.isCommunication()) {
595
596 if (this.msgInterceptor != null) { // If we have a plugin it shall handle it
597 if (isPolling()) { // is this code really invoked ? Note of Michele Laghi on 2007-12-19
598 try {
599 entryList = this.msgInterceptor.handleNextMessages(this, entryList);
600 if (entryList != null && entryList.size() > 0) {
601 MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
602 getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));
603 }
604 }
605 catch (XmlBlasterException ex2) {
606 internalError(ex2);
607 }
608 if (entryList != null && entryList.size() > 0) {
609 MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
610 getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));
611 }
612 }
613 if (msgInterceptor != null) { // we want the exception notification at least
614 msgInterceptor.onDispatchWorkerException(this, ex);
615 }
616 }
617
618 // Exception from connection to remote client (e.g. from Corba layer)
619 // DispatchManager handles this
620 }
621 else {
622 //log.severe(ME+": Callback failed: " + ex.toString());
623 //ex.printStackTrace();
624 MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);
625 boolean isHandled = sendingFailedNotification(entries, ex);
626 if (isHandled)
627 removeFromQueue(entries, false);
628 else
629 internalError(ex);
630 }
631 }
632 else {
633 //log.severe(ME+": Callback failed: " + throwable.toString());
634 //throwable.printStackTrace();
635 XmlBlasterException ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable);
636 // sendingFailedNotification() not called as the msgs remain in queue until problem is resolved by admin
637 internalError(ex);
638 }
639 }
640
641 public I_MsgErrorHandler getMsgErrorHandler() {
642 return this.failureListener;
643 }
644
645 /**
646 * We register a QueuePutListener and all put() into the queue are
647 * intercepted - our put() is called instead.
648 * We then deliver this QueueEntry directly to the remote
649 * connection and return synchronously the returned value or the
650 * Exception if one is thrown.
651 */
652 public void switchToSyncMode() {
653 if (this.isSyncMode) return;
654
655 synchronized (this) {
656 if (this.isSyncMode) return;
657 if (this.syncDispatchWorker == null) this.syncDispatchWorker = new DispatchWorker(glob, this);
658
659 this.isSyncMode = true;
660
661 if (this.timerKey != null)
662 log.severe(ME+": Burst mode timer was activated and we switched to synchronous delivery" +
663 " - handling of this situation is not coded yet");
664 removeBurstModeTimer();
665
666 boolean isAlive = isAlive();
667 log.info(ME+": Switched to synchronous message delivery, inAliveTransition=" + this.inAliveTransition + " isAlive=" + isAlive + " trySyncMode=" + this.trySyncMode);
668 if (isAlive) { // For FailSafePing
669 if (this.inAliveTransition) { // For FailSafeAsync
670 this.shallCallToAliveSync = true;
671 }
672 else {
673 callToAliveSync();
674 }
675 }
676 }
677 }
678
679 private void callToAliveSync() {
680 this.shallCallToAliveSync = false;
681 I_ConnectionStatusListener[] listeners = getConnectionStatusListeners();
682 for (int i=0; i<listeners.length; i++)
683 listeners[i].toAliveSync(this, ConnectionStateEnum.ALIVE);
684 }
685
686 /**
687 * Switch back to asynchronous mode.
688 * Our thread pool will take the messages out of the queue
689 * and deliver them in asynchronous mode.
690 */
691 public void switchToASyncMode() {
692 if (!this.isSyncMode) return;
693
694 synchronized (this) {
695 if (!this.isSyncMode) return;
696 //this.msgQueue.removePutListener(this);
697 this.isSyncMode = false;
698 activateDispatchWorker(); // just in case there are some messages pending in the queue
699 log.info(ME+": Switched to asynchronous message delivery");
700 }
701 }
702
703 /**
704 * @see I_QueuePutListener#putPre(I_QueueEntry)
705 */
706 public boolean putPre(I_QueueEntry queueEntry) throws XmlBlasterException {
707 //I_QueueEntry[] queueEntries = new I_QueueEntry[1];
708 //queueEntries[0] = queueEntry;
709 return putPre(new I_QueueEntry[] { queueEntry });
710 }
711
712 /**
713 * @see #putPre(I_QueueEntry)
714 * @see I_QueuePutListener#putPre(I_QueueEntry[])
715 */
716 public boolean putPre(I_QueueEntry[] queueEntries) throws XmlBlasterException {
717 if (!this.isSyncMode) {
718 /*
719 for (int i=0; i < queueEntries.length; i++) {
720 if (queueEntries[i] instanceof MsgQueueEntry) {
721 MsgQueueEntry msgQueueEntry = (MsgQueueEntry)queueEntries[i];
722 if (MethodName.SUBSCRIBE == msgQueueEntry.getMethodName()) {
723 if (getSessionName().getPublicSessionId() < 1) {
724 // we should never allow a subscription without a positive sessionId if the
725 // server is not accessible
726 throw new XmlBlasterException(glob, ErrorCode.RESOURCE_TEMPORARY_UNAVAILABLE, ME,
727 "Manager: The Subscription for '" + getSessionName().toString() + "' failed since the server is currently not available");
728 }
729 }
730 }
731 }
732 */
733 if (this.inAliveTransition) {
734 // Do not allow other threads to put messages to queue during transition to alive
735 synchronized (ALIVE_TRANSITION_MONITOR) {
736 // don't allow
737 }
738 }
739 return true; // Add entry to queue
740 }
741
742 if (log.isLoggable(Level.FINE)) log.fine(ME+": putPre() - Got " + queueEntries.length + " QueueEntries to deliver synchronously ...");
743 ArrayList entryList = new ArrayList(queueEntries.length);
744 for (int ii=0; ii<queueEntries.length; ii++) {
745 if (this.trySyncMode && !this.isSyncMode && queueEntries[ii] instanceof MsgQueueGetEntry) { // this.trySyncMode === isClientSide
746 throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "You can't call get() in asynchronous mode (gets can't be queued because we don't know its return value)");
747 }
748 entryList.add(queueEntries[ii]);
749 }
750 this.syncDispatchWorker.run(entryList);
751 return false;
752 }
753
754 /**
755 * @see I_QueuePutListener#putPost(I_QueueEntry)
756 */
757 public void putPost(I_QueueEntry queueEntry) throws XmlBlasterException {
758 if (!this.isSyncMode) {
759 if (this.dispatcherActive) notifyAboutNewEntry();
760 if (((MsgQueueEntry)queueEntry).wantReturnObj()) {
761 // Simulate return values, and manipulate missing informations into entries ...
762 I_QueueEntry[] entries = new I_QueueEntry[] { queueEntry };
763 getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED);
764 }
765 }
766 }
767
768 /**
769 * @see #putPost(I_QueueEntry)
770 * @see I_QueuePutListener#putPost(I_QueueEntry[])
771 */
772 public void putPost(I_QueueEntry[] queueEntries) throws XmlBlasterException {
773 if (!this.isSyncMode) {
774 if (this.dispatcherActive) notifyAboutNewEntry();
775 if (queueEntries.length > 0 && ((MsgQueueEntry)queueEntries[0]).wantReturnObj()) {
776 // Simulate return values, and manipulate missing informations into entries ...
777 getDispatchConnectionsHandler().createFakedReturnObjects(queueEntries, Constants.STATE_OK, Constants.INFO_QUEUED);
778 }
779 }
780 }
781
782 /**
783 * Here we prepare messages which are coming directly from the queue.
784 * <ol>
785 * <li>We eliminate destroyed messages</li>
786 * <li>We make a shallow copy of the message.
787 * We need to do this, out messages are references directly into the queue.
788 * The delivery framework is later changing the QoS
789 * and plugins may change the content - and this should not modify the queue entries</li>
790 * </ol>
791 */
792 public ArrayList prepareMsgsFromQueue(List<I_Entry> entryList) {
793
794 if (entryList == null || entryList.size() < 1) {
795 if (log.isLoggable(Level.FINE)) log.fine(ME+": Got zero messages from queue, expected at least one, can happen if client disconnected in the mean time: " + toXml(""));
796 return null;
797 }
798 return prepareMsgsFromQueue(ME, log, this.msgQueue, entryList);
799 }
800
801 public static ArrayList prepareMsgsFromQueue(String logId, Logger log, I_Queue queue, List<I_Entry> entryList) {
802 // Remove all expired messages and do a shallow copy
803 int size = entryList.size();
804 ArrayList result = new ArrayList(size);
805 for (int ii=0; ii<size; ii++) {
806 MsgQueueEntry entry = (MsgQueueEntry)entryList.get(ii);
807 // Take care to remove the filtered away messages from the queue as well
808 if (entry.isDestroyed()) {
809 log.info(logId+": Message " + entry.getLogId() + " is destroyed, ignoring it");
810 if (log.isLoggable(Level.FINE)) log.fine("Message " + entry.getLogId() + " is destroyed, ignoring it: " + entry.toXml());
811 try {
812 queue.removeRandom(entry); // Probably change to use [] for better performance
813 }
814 catch (Throwable e) {
815 log.severe(logId+": Internal error when removing expired message " + entry.getLogId() + " from queue, no recovery implemented, we continue: " + e.toString());
816 }
817 continue;
818 }
819 result.add(entry.clone()); // expired messages are sent as well
820 }
821 return result;
822 }
823
824 /**
825 * When somebody puts a new entry into the queue, we want to be
826 * notified about this after the entry is fed.
827 * <p>
828 * Called by I_Queue.putPost()
829 */
830 public void notifyAboutNewEntry() {
831 if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering notifyAboutNewEntry("+this.notifyCounter+")");
832 this.notifyCounter++;
833 //activateDispatchWorker();
834
835 if (checkSending(true) == false)
836 return;
837
838 if (useBurstModeTimer() == true)
839 return;
840
841 startWorkerThread(false);
842 }
843
844 /**
845 * Counts how often a new entry was added since the current worker thread was started.
846 */
847 public int getNotifyCounter() {
848 return this.notifyCounter;
849 }
850
851 /**
852 * Give the callback worker thread a kick to deliver the messages.
853 * Throws no exception.
854 */
855 private void activateDispatchWorker() {
856
857 if (checkSending(false) == false)
858 return;
859
860 if (useBurstModeTimer() == true)
861 return;
862
863 startWorkerThread(false);
864 }
865
866 /**
867 * @return true if a burst mode timer was activated
868 */
869 private boolean useBurstModeTimer() {
870 if (collectTime <= 0L) return false;
871
872 // Messages are sent delayed on timeout (burst mode)
873
874 if (log.isLoggable(Level.FINE)) log.fine(ME+": Executing useBurstModeTimer() collectTime=" + collectTime + " dispatchWorkerIsActive=" + dispatchWorkerIsActive);
875 synchronized (this) {
876 if (this.isShutdown) return false;
877 if (this.timerKey == null) {
878 if (log.isLoggable(Level.FINE)) log.fine(ME+": Starting burstMode timer with " + collectTime + " msec");
879 this.timerKey = this.glob.getBurstModeTimer().addTimeoutListener(this, collectTime, null);
880 }
881 }
882 return true;
883 }
884
885 /**
886 * Remove the burst mode timer
887 */
888 private void removeBurstModeTimer() {
889 synchronized (this) {
890 if (this.timerKey != null) {
891 this.glob.getBurstModeTimer().removeTimeoutListener(timerKey);
892 this.timerKey = null;
893 }
894 }
895 }
896
897 /**
898 * @param fromTimeout for logging only
899 */
900 private void startWorkerThread(boolean fromTimeout) {
901 if (this.dispatchWorkerIsActive == false) {
902 synchronized (this) {
903 if (this.isShutdown) {
904 if (log.isLoggable(Level.FINE)) log.fine(ME+": startWorkerThread() failed, we are shutdown: " + toXml(""));
905 return;
906 }
907 if (this.dispatchWorkerIsActive == false) { // send message directly
908 this.dispatchWorkerIsActive = true;
909 this.notifyCounter = 0;
910 try {
911 this.glob.getDispatchWorkerPool().execute(new DispatchWorker(glob, this));
912 }
913 catch (Throwable e) {
914 this.dispatchWorkerIsActive = false;
915 log.severe(ME+": Unexpected error occurred: " + e.toString());
916 e.printStackTrace();
917 }
918 }
919 }
920 return;
921 }
922
923 if (fromTimeout) {
924 if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)");
925 }
926 else {
927 if (log.isLoggable(Level.FINE)) log.fine(ME+": Last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)");
928 }
929 }
930
931 public boolean isDead() {
932 return this.dispatchConnectionsHandler.isDead();
933 }
934
935 public boolean isPolling() {
936 return this.dispatchConnectionsHandler.isPolling();
937 }
938
939 public boolean isAlive() {
940 return this.dispatchConnectionsHandler.isAlive();
941 }
942
943 /**
944 * Can be called when client connection is lost (NOT the callback connection).
945 * Currently only detected by the SOCKET protocol plugin.
946 * Others can only detect lost clients with their callback protocol pings
947 */
948 public void lostClientConnection() {
949 log.warning(ME+": Lost client connection");
950 // If SOCKET: the cb connection is lost as well and we can go to polling mode
951 pingCallbackServer(false, true);
952 }
953
954 public boolean pingCallbackServer(boolean sync, boolean connectionIsDown) {
955 DispatchConnection dispatchConnection = this.dispatchConnectionsHandler.getCurrentDispatchConnection();
956 if (dispatchConnection != null) {
957 dispatchConnection.setConnectionWasDown(connectionIsDown);
958 if (sync) {
959 dispatchConnection.timeout(null); // force a ping
960 }
961 else {
962 // force a ping via another thread
963 this.glob.getPingTimer().addTimeoutListener(dispatchConnection, 0L, null);
964 }
965 return true;
966 }
967 return false;
968 }
969
970 /**
971 * @param isPublisherThread We take care that the publisher thread, coming through putPost()
972 * does never too much work to return fast enough and avoid possible dead locks.
973 * @return true is status is OK and we can try to send a message
974 */
975 private boolean checkSending(boolean isPublisherThread) {
976 if (this.isShutdown) {
977 if (log.isLoggable(Level.FINE)) log.fine(ME+": The dispatcher is shutdown, can't activate callback worker thread" + toXml(""));
978 return false; // assert
979 }
980
981 if (this.isSyncMode) {
982 return false;
983 }
984
985 if (!this.dispatcherActive) {
986 return false;
987 }
988
989 if (msgQueue.isShutdown() && !isPublisherThread) { // assert
990 if (log.isLoggable(Level.FINE)) log.fine(ME+": The queue is shutdown, can't activate callback worker thread.");
991 // e.g. client has disconnected on the mean time.
992 //Thread.currentThread().dumpStack();
993 shutdown();
994 return false;
995 }
996
997 if (this.dispatchConnectionsHandler.isUndef()) {
998 if (log.isLoggable(Level.FINE)) log.fine(ME+": Not connected yet, state is UNDEF");
999 return false;
1000 }
1001
1002 if (this.dispatchConnectionsHandler.isDead() && !isPublisherThread) {
1003 String text = "No recoverable remote connection available, giving up queue " + msgQueue.getStorageId() + ".";
1004 if (log.isLoggable(Level.FINE)) log.fine(ME+": "+text);
1005 givingUpDelivery(new XmlBlasterException(glob,ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, text));
1006 return false;
1007 }
1008
1009 if (msgQueue.getNumOfEntries() == 0L) {
1010 return false;
1011 }
1012
1013 if (this.msgInterceptor != null) {
1014 if (this.msgInterceptor.doActivate(this) == false) {
1015 if (log.isLoggable(Level.FINE)) log.fine(ME+": this.msgInterceptor.doActivate==false");
1016 return false; // A plugin told us to suppress sending the message
1017 }
1018 return true;
1019 }
1020
1021 /*
1022 * The msgInterceptor plugin needs to have a chance to take care of this even in polling mode
1023 */
1024 if (this.dispatchConnectionsHandler.isPolling()) {
1025 if (log.isLoggable(Level.FINE)) log.fine(ME+": Can't send message as connection is lost and we are polling");
1026 return false;
1027 }
1028
1029 return true;
1030 }
1031
1032 /**
1033 * We are notified about the burst mode timeout through this method.
1034 * @param userData You get bounced back your userData which you passed
1035 * with Timeout.addTimeoutListener()
1036 */
1037 public void timeout(Object userData) {
1038 this.timerKey = null;
1039 if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, queue entries=" + msgQueue.getNumOfEntries() + ", starting callback worker thread ...");
1040 startWorkerThread(true);
1041 }
1042
1043
1044 /**
1045 * @return The interceptor plugin if available, otherwise null
1046 */
1047 public I_MsgDispatchInterceptor getMsgDispatchInterceptor() {
1048 return this.msgInterceptor;
1049 }
1050
1051 /**
1052 * Set new callback addresses, typically after a session login/logout
1053 */
1054 public void setAddresses(AddressBase[] addr) throws XmlBlasterException {
1055 this.dispatchConnectionsHandler.initialize(addr);
1056 }
1057
1058 /**
1059 * Switch on/off the sending of messages.
1060 */
1061 private void initDispatcherActive(AddressBase[] addrArr) {
1062 if (addrArr != null) {
1063 for (int ii=0; ii<addrArr.length; ii++) { // TODO: How to handle setting of multiple addresses??
1064 this.dispatcherActive = addrArr[ii].isDispatcherActive();
1065 }
1066 }
1067 }
1068
1069 /**
1070 * The worker notifies us that it is finished, if messages are available
1071 * it is triggered again.
1072 */
1073 public void setDispatchWorkerIsActive(boolean val) {
1074 this.dispatchWorkerIsActive = val;
1075 if (val == false) {
1076 if (this.isShutdown) {
1077 if (log.isLoggable(Level.FINE)) log.fine(ME+": setDispatchWorkerIsActive(" + val + ") failed, we are shutdown: " + toXml(""));
1078 return;
1079 }
1080
1081 if (msgQueue.getNumOfEntries() > 0) {
1082 if (log.isLoggable(Level.FINE)) log.fine(ME+": Finished callback job. Giving a kick to send the remaining " + msgQueue.getNumOfEntries() + " messages.");
1083 try {
1084 activateDispatchWorker();
1085 }
1086 catch(Throwable e) {
1087 log.severe(ME+": "+e.toString()); e.printStackTrace(); // Assure the queue is flushed with another worker
1088 }
1089 }
1090 else {
1091 if (this.trySyncMode && !this.isSyncMode) {
1092 switchToSyncMode();
1093 }
1094 }
1095 }
1096 }
1097
1098 /**
1099 * Called locally and from TopicHandler when internal error (Throwable) occurred to avoid infinite looping
1100 */
1101 public void internalError(Throwable throwable) {
1102 givingUpDelivery((throwable instanceof XmlBlasterException) ? (XmlBlasterException)throwable :
1103 new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable));
1104 log.severe(ME+": PANIC: Internal error, doing shutdown: " + throwable.getMessage());
1105 shutdown();
1106 }
1107
1108 /**
1109 * @return A container holding some statistical delivery information
1110 */
1111 public DispatchStatistic getDispatchStatistic() {
1112 return this.dispatchConnectionsHandler.getDispatchStatistic();
1113 }
1114
1115 public boolean isShutdown() {
1116 return this.isShutdown;
1117 }
1118
1119 /**
1120 * Stop all callback drivers of this client.
1121 * Possibly invoked twice (givingUpDelivery() calls it indirectly as well)
1122 * We don't shutdown the corresponding queue.
1123 */
1124 public void shutdown() {
1125 if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering shutdown ...");
1126 if (this.isShutdown) return;
1127 synchronized (this) {
1128 if (this.isShutdown) return;
1129 this.isShutdown = true;
1130
1131 this.msgQueue.removePutListener(this);
1132
1133 // remove all ConnectionStatusListeners
1134 this.connectionStatusListeners.clear();
1135
1136 removeBurstModeTimer();
1137
1138 // NOTE: We would need to remove the 'final' qualifier to be able to set to null
1139
1140 if (this.msgInterceptor != null) {
1141 try {
1142 this.msgInterceptor.shutdown(this);
1143 }
1144 catch (XmlBlasterException e) {
1145 log.warning(ME+": Ignoring problems during shutdown of plugin: " + e.getMessage());
1146 }
1147 //this.msgInterceptor = null;
1148 }
1149 if (this.dispatchConnectionsHandler != null) {
1150 this.dispatchConnectionsHandler.shutdown();
1151 //this.dispatchConnectionsHandler = null;
1152 }
1153 //this.msgQueue = null;
1154 //this.failureListener = null;
1155 //this.securityInterceptor = null;
1156
1157 //if (this.dispatchWorkerPool != null) {
1158 // this.dispatchWorkerPool.shutdown(); NO: not here, is the scope and duty of Global
1159 // this.dispatchWorkerPool = null;
1160 //}
1161
1162 if (this.syncDispatchWorker != null)
1163 this.syncDispatchWorker.shutdown();
1164 }
1165 }
1166
1167 /**
1168 * For logging
1169 */
1170 public String getId() {
1171 return this.msgQueue.getStorageId().getId();
1172 }
1173
1174 /**
1175 * Dump state of this object into a XML ASCII string.
1176 * <br>
1177 * @param extraOffset indenting of tags for nice output
1178 * @return internal state as a XML ASCII string
1179 */
1180 public String toXml(String extraOffset) {
1181 StringBuffer sb = new StringBuffer(2000);
1182 if (extraOffset == null) extraOffset = "";
1183 String offset = Constants.OFFSET + extraOffset;
1184
1185 sb.append(offset).append("<DispatchManager id='").append(getId());
1186 if (this.msgQueue != null)
1187 sb.append("' numEntries='").append(this.msgQueue.getNumOfEntries());
1188 sb.append("' isShutdown='").append(this.isShutdown).append("'>");
1189 sb.append(this.dispatchConnectionsHandler.toXml(extraOffset+Constants.INDENT));
1190 sb.append(offset).append(" <dispatchWorkerIsActive>").append(dispatchWorkerIsActive).append("</dispatchWorkerIsActive>");
1191 sb.append(offset).append("</DispatchManager>");
1192
1193 return sb.toString();
1194 }
1195
1196 /**
1197 * Inhibits/activates the delivery of asynchronous dispatches of messages.
1198 * @param dispatcherActive
1199 */
1200 public void setDispatcherActive(boolean dispatcherActive) {
1201 if (log.isLoggable(Level.FINE)) log.fine(ME+": Changed dispatcherActive from " + this.dispatcherActive + " to " + dispatcherActive);
1202 this.dispatcherActive = dispatcherActive;
1203 if (this.dispatcherActive) notifyAboutNewEntry();
1204 }
1205
1206 /**
1207 *
1208 * @return true if the dispacher is currently activated, i.e. if it is
1209 * able to deliver asynchronousy messages from the callback queue.
1210 */
1211 public boolean isDispatcherActive() {
1212 return this.dispatcherActive;
1213 }
1214
1215 public ArrayList filterDistributorEntries(ArrayList entries, Throwable ex) {
1216 return this.dispatchConnectionsHandler.filterDistributorEntries(entries, ex);
1217 }
1218
1219 }
syntax highlighted by Code2HTML, v. 0.9.1