1 /*------------------------------------------------------------------------------
2 Name: StreamingCallback.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 package org.xmlBlaster.client;
8
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.xmlBlaster.client.key.UpdateKey;
import org.xmlBlaster.client.qos.ConnectQos;
import org.xmlBlaster.client.qos.UpdateQos;
import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
import org.xmlBlaster.jms.XBConnectionMetaData;
import org.xmlBlaster.util.Global;
import org.xmlBlaster.util.I_Timeout;
import org.xmlBlaster.util.MsgUnit;
import org.xmlBlaster.util.Timeout;
import org.xmlBlaster.util.Timestamp;
import org.xmlBlaster.util.XmlBlasterException;
import org.xmlBlaster.util.def.Constants;
import org.xmlBlaster.util.def.ErrorCode;
import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
import org.xmlBlaster.util.key.MsgKeyData;
import org.xmlBlaster.util.qos.ClientProperty;
import org.xmlBlaster.util.qos.MsgQosData;
import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
import org.xmlBlaster.util.queue.I_Entry;
import org.xmlBlaster.util.queue.I_Queue;
import org.xmlBlaster.util.queue.StorageId;
41
42 //import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
43 //import EDU.oswego.cs.dl.util.concurrent.Mutex;
44
45 /**
46 * StreamingCallback
47 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
48 */
49 public class StreamingCallback implements I_Callback, I_Timeout, I_ConnectionStateListener {
50
51 /**
52 *
53 * Writer needed since the out stream must be written from a thread which does not
54 * die before the thread which reads the in counterpart. For some "strange" reason
55 * the implementation of the Pipe streams makes a check if the thread which has
56 * made the last write operation on the out stream still is valid. If not, a Dead
57 * End IO Exception is thrown when reading.
58 *
59 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
60 */
61 class Writer extends Thread {
62
63 class WriterData extends ReentrantLock {
64 private static final long serialVersionUID = -8014185442026247255L;
65 private OutputStream outStrm;
66 private byte[] data;
67 private Throwable exception;
68
69 public WriterData(OutputStream out, byte[] data) {
70 super();
71 this.outStrm = out;
72 this.data = data;
73 }
74 }
75
76 private LinkedBlockingQueue<WriterData> channel;
77
78 public Writer(String name) {
79 super(name);
80 this.channel = new LinkedBlockingQueue<WriterData>();
81 setDaemon(true);
82 start();
83 }
84
85 public Writer() {
86 super();
87 this.channel = new LinkedBlockingQueue<WriterData>();
88 setDaemon(true);
89 start();
90 }
91
92 public synchronized void write(OutputStream outStream, byte[] buf) throws InterruptedException, XmlBlasterException {
93 WriterData data = new WriterData(outStream, buf);
94 try {
95 data.lockInterruptibly();
96 this.channel.put(data);
97 data.lockInterruptibly(); // waits until the other thread is finished
98 if (data.exception != null)
99 throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "write: a throwable occured", "", data.exception);
100 }
101 finally {
102 data.unlock();
103 }
104 }
105
106 public synchronized void close(OutputStream outStream) throws InterruptedException, XmlBlasterException {
107 WriterData data = new WriterData(outStream, null);
108 try {
109 data.lockInterruptibly();
110 this.channel.put(data);
111 data.lockInterruptibly(); // waits until the other thread is finished
112 if (data.exception != null)
113 throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "close: a throwable occured", "", data.exception);
114 }
115 finally {
116 data.unlock();
117 }
118 }
119
120 /**
121 * @see java.lang.Thread#run()
122 */
123 public void run() {
124 while (true) {
125 try {
126 WriterData writerData = (WriterData)this.channel.take();
127 try {
128 if (writerData.outStrm != null) {
129 if (writerData.data != null) {
130
131 int bytesLeft = writerData.data.length;
132 int bytesRead = 0;
133 final int MAX_CHUNK_SIZE = 4096;
134 while (bytesLeft > 0) {
135 int toRead = bytesLeft > MAX_CHUNK_SIZE ? MAX_CHUNK_SIZE : bytesLeft;
136 writerData.outStrm.write(writerData.data, bytesRead, toRead);
137 writerData.outStrm.flush();
138 bytesRead += toRead;
139 bytesLeft -= toRead;
140 }
141 // writerData.out.write(0);
142 // writerData.out.flush();
143
144 // these would block (probably the pipes are not the best in the world
145 // writerData.out.write(writerData.data);
146 // writerData.out.flush();
147 }
148 else
149 writerData.outStrm.close();
150 }
151 }
152 catch (Throwable e) {
153 writerData.exception = e;
154 }
155 finally {
156 writerData.unlock();
157 }
158 }
159 catch (Throwable e) {
160 if (e.getMessage().indexOf("Pipe closed") < 0) {
161 log.warning("An exception occured when writing to the stream: ' " + e.getMessage());
162 e.printStackTrace();
163 }
164 else if (log.isLoggable(Level.FINE)) {
165 log.fine("The pipe was closed, which resulted in an IO Exception. It can happen when the client has returned before reading the complete message");
166 e.printStackTrace();
167 }
168 }
169 }
170 }
171 }
172
173
174 class ExecutionThread extends Thread {
175
176 private String cbSessionId_;
177 private UpdateKey updateKey_;
178 private byte[] content_;
179 private UpdateQos updateQos_;
180
181 public ExecutionThread(String cbSessId, UpdateKey updKey, byte[] content, UpdateQos updQos) {
182 this.cbSessionId_ = cbSessId;
183 this.updateKey_ = updKey;
184 this.content_ = content;
185 this.updateQos_ = updQos;
186
187 }
188
189 public void run() {
190 try {
191 ret = updateNewMessage(cbSessionId_, updateKey_, content_, updateQos_);
192 clearQueue();
193 }
194 catch (Throwable e) {
195 setException(e);
196 e.printStackTrace();
197 }
198 finally {
199 try {
200 if (in != null)
201 in.close();
202 }
203 catch (IOException e) {
204 e.printStackTrace();
205 }
206 mutex.unlock();
207 }
208 }
209 };
210
211 private static Logger log = Logger.getLogger(StreamingCallback.class.getName());
212 public final static String ENTRY_CB_SESSION_ID = "__entryCbSessionId";
213
214 private I_StreamingCallback callback;
215
216 private Global global;
217 private PipedOutputStream out;
218 private PipedInputStream in;
219 private XmlBlasterException ex;
220 private String ret;
221 private String cbSessionId;
222 private Writer writer;
223 /** The time to wait in ms until returning when waiting (if zero or negative inifinite) */
224 private long waitForChunksTimeout;
225 // private long waitForClientReturnTimeout;
226 private Timeout timer;
227 private Timestamp timestamp; // the key for the timeout timer (can be null)
228 private I_Queue queue; // optional client side queue
229 private boolean useQueue;
230 private boolean initialized;
231 private boolean lastMessageCompleted = true;
232 private final ReentrantLock mutex;
233
234 private void reset() throws XmlBlasterException {
235 this.out = null;
236 this.in = null;
237 this.ret = null;
238 this.cbSessionId = null;
239 }
240
/**
 * Convenience constructor: no timeouts (both are passed as 0) and no client side queue.
 *
 * @param global the global to use
 * @param callback the client's callback to which the updates are delivered
 */
public StreamingCallback(Global global, I_StreamingCallback callback) throws XmlBlasterException {
   this(
      global,
      callback,
      0L,     // waitForChunksTimeout
      0L,     // waitForClientReturnTimeout
      false   // useQueue
   );
}
244
245 /**
246 *
247 * @param callback
248 */
249 public StreamingCallback(Global global, I_StreamingCallback callback, long waitForChunksTimeout, long waitForClientReturnTimeout, boolean useQueue)
250 throws XmlBlasterException {
251 this.callback = callback;
252 this.global = global;
253 this.mutex = new ReentrantLock();
254 String writerName = StreamingCallback.class.getName() + "-writer";
255 synchronized(this.global) {
256 this.writer = (Writer)this.global.getObjectEntry(writerName);
257 if (this.writer == null) {
258 this.writer = new Writer();
259 this.global.addObjectEntry(writerName, this.writer);
260 }
261 }
262 this.waitForChunksTimeout = waitForChunksTimeout;
263 // this.waitForClientReturnTimeout = waitForClientReturnTimeout;
264 if (this.waitForChunksTimeout > 0L) {
265 String timerName = StreamingCallback.class.getName() + "-timer";
266 synchronized(this.global) {
267 this.timer = (Timeout)this.global.getObjectEntry(timerName);
268 if (this.timer == null) {
269 this.timer = new Timeout(timerName);
270 this.global.addObjectEntry(timerName, this.timer);
271 }
272 }
273 }
274 this.useQueue = useQueue;
275 // TODO latch until connected to avoit early updates
276 }
277
278 /**
279 *
280 * @return the number of delivered entries from local client update queue.
281 */
282 public final int sendInitialQueueEntries() throws XmlBlasterException {
283 if (this.queue == null)
284 return 0;
285 List<I_Entry> list = this.queue.peek(-1, -1L);
286 for (int i=0; i < list.size(); i++) {
287 MsgQueuePublishEntry entry = (MsgQueuePublishEntry)list.get(i);
288 MsgKeyData key = entry.getMsgKeyData();
289 MsgQosData qos =(MsgQosData)entry.getMsgUnit().getQosData();
290 byte[] cont = entry.getMsgUnit().getContent();
291 String entryCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
292 qos.getClientProperties().remove(ENTRY_CB_SESSION_ID);
293 final boolean isExternal = false; // we don't want to store these entries since already here
294 updateInternal(entryCbSessionId, new UpdateKey(key), cont, new UpdateQos(this.global, qos), isExternal);
295 }
296 this.queue.clear();
297 return list.size();
298 }
299
300 private final void storeEntry(String cbSessId, UpdateKey key, byte[] cont, UpdateQos qos) throws XmlBlasterException {
301 if (this.queue == null)
302 return;
303 final boolean ignorePutInterceptor = false;
304 if (cbSessId != null) {
305 String oldCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
306 if (oldCbSessionId != null && !oldCbSessionId.equals(cbSessId)) {
307 log.warning("the client property '" + ENTRY_CB_SESSION_ID + "' is a reserved word, we will overwrite its value='" + oldCbSessionId + "' to be '" + cbSessionId + "'");
308 ClientProperty prop = new ClientProperty(ENTRY_CB_SESSION_ID, null, null, cbSessId);
309 qos.getClientProperties().put(prop.getName(), prop);
310 }
311 }
312 MsgUnit msgUnit = new MsgUnit(key.getData(), cont, qos.getData());
313 MsgQueuePublishEntry entry = new MsgQueuePublishEntry(this.global, msgUnit, this.queue.getStorageId());
314 this.queue.put(entry, ignorePutInterceptor);
315 }
316
317 /**
318 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
319 */
320 public String updateStraight(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
321 log.fine("cbSessionId='" + cbSessId + "'");
322 ByteArrayInputStream bais = new ByteArrayInputStream(cont);
323 return this.callback.update(cbSessId, updKey, bais, updQos);
324 }
325
326 /**
327 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
328 */
329 public String updateNewMessage(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
330 log.fine("cbSessionId='" + cbSessId + "'");
331 return this.callback.update(cbSessId, updKey, in, updQos);
332 }
333
334 private final boolean isFirstChunk(UpdateQos qos) {
335 int seq = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), 0);
336 return seq == 0;
337 }
338
339 private final boolean isLastChunk(UpdateQos qos) {
340 boolean hasGroupId = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), (String)null) != null;
341 if (!hasGroupId)
342 return true;
343 return qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), false);
344 }
345
346 private final ClientProperty getProp(String key, UpdateQos qos) {
347 return qos.getClientProperty(Constants.addJmsPrefix(key, log));
348 }
349
350 /**
351 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
352 */
353 public String update(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException {
354 boolean sendInitial = this.queue != null && this.lastMessageCompleted && this.queue.getNumOfEntries() > 0;
355 if (sendInitial)
356 sendInitialQueueEntries();
357
358 final boolean isExternal = true;
359 log.fine("cbSessionId='" + cbSessId + "'");
360 return updateInternal(cbSessId, updKey, cont, updQos, isExternal);
361 }
362
363 /**
364 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
365 */
366 private final String updateInternal(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos, boolean isExternal) throws XmlBlasterException {
367 this.lastMessageCompleted = false;
368 boolean doStore = isExternal;
369 boolean isLastChunk = false;
370 try {
371 log.fine("entering with cbSessionId='" + cbSessId + "'");
372 if (this.timer != null && this.timestamp != null) { // no need to be threadsafe since update is single thread
373 this.timer.removeTimeoutListener(this.timestamp);
374 this.timestamp = null;
375 }
376 ClientProperty exProp = getProp(XBConnectionMetaData.JMSX_GROUP_EX, updQos);
377 // TODO Check if this exception really should be thrown: I think it shall not be thrown since it is an exception
378 // which occured when publishing and this is the information that the update should return
379 if (exProp != null)
380 throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_INTERNALERROR, "update", "An exception occured on a chunk when updating. " + updQos.toXml());
381 isLastChunk = isLastChunk(updQos);
382
383 synchronized(this) {
384 consumeExceptionIfNotNull();
385 if (this.ret != null) {
386 clearQueue();
387 return ret;
388 }
389 }
390
391 if (isLastChunk) { // no need to store the last message since sync return
392 if (isFirstChunk(updQos)) {
393 // TODO a sync to wait until cleared (the updateStraight after the sync, not inside).
394 try {
395 return updateStraight(cbSessId, updKey, cont, updQos);
396 }
397 catch (IOException e) {
398 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: exception occured.", e);
399 }
400
401 }
402
403 try {
404 if (cont != null && cont.length > 0) {
405 this.writer.write(this.out, cont);
406 }
407
408 this.writer.close(this.out);
409 // wait until the client has returned his method.
410 try {
411 mutex.lockInterruptibly();
412 consumeExceptionIfNotNull();
413 clearQueue();
414 return this.ret;
415 }
416 finally {
417 mutex.unlock();
418 }
419 }
420 catch (InterruptedException e) {
421 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
422 }
423 finally {
424 reset();
425 }
426 }
427 else { // it is not the last message
428 if (this.timer != null)
429 this.timestamp = this.timer.addTimeoutListener(this, this.waitForChunksTimeout, null);
430 try {
431 if (isFirstChunk(updQos)) {
432 this.mutex.lockInterruptibly();
433 this.cbSessionId = cbSessId;
434 this.out = new PipedOutputStream();
435 this.in = new PipedInputStream(this.out);
436 ExecutionThread thread = new ExecutionThread(cbSessId, updKey, cont, updQos);
437 thread.start();
438 }
439 else { // check if the message is complete
440 /*
441 if (this.oldGroupId == null) {
442 try {
443 mutex.lockInterruptibly();
444 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: The message is not the first of a group but the previous one was already completed.");
445 }
446 finally {
447 mutex.unlock();
448 }
449 }
450 */
451 }
452 this.writer.write(this.out, cont);
453 }
454 catch (InterruptedException e) {
455 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
456 }
457 catch (IOException e) {
458 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
459 }
460 if (doStore)
461 storeEntry(cbSessId, updKey, cont, updQos);
462 // and return a fake positive response.
463 return Constants.RET_OK;
464 }
465
466 }
467 catch (XmlBlasterException e) {
468 try {
469 this.writer.close(this.out);
470 }
471 catch (InterruptedException e1) {
472 e1.printStackTrace();
473 }
474 this.lastMessageCompleted = true;
475 throw e;
476 }
477 catch (Throwable e) {
478 e.printStackTrace();
479 throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "throwable in updateInternal", "", e);
480 }
481 finally {
482 if (isLastChunk) {
483 this.lastMessageCompleted = true;
484 }
485 log.fine("Leaving method");
486 }
487 }
488
489 /**
490 * It is used here to inform the user update method that a timeout occured, it will throw
491 * an IOException when reading the in stream of the update method.
492 * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object)
493 */
494 public void timeout(Object userData) {
495 try {
496 this.writer.close(this.out);
497 }
498 catch (Throwable e) {
499 // we can not make it threadsafe so we must protect against possible NPE Exceptions
500 e.printStackTrace();
501 }
502
503 }
504
505 private final void clearQueue() {
506 if (queue != null) {
507 log.fine("Clear the queue " + this.queue.getStorageId());
508 queue.clear();
509 }
510 }
511
512 /**
513 * Always makes a USER_UPDATE_HOLDBACK Exception out of it, no matter what the original exception
514 * was.
515 * @param ex
516 */
517 private synchronized void setException(Throwable ex) {
518 if (ex instanceof XmlBlasterException) {
519 XmlBlasterException tmp = (XmlBlasterException)ex;
520 if (tmp.getErrorCode().equals(ErrorCode.USER_UPDATE_HOLDBACK))
521 this.ex = tmp;
522 else
523 this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
524 }
525 else {
526 this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
527 }
528 }
529
530 /**
531 * returns the exception (if any) and resets it.
532 * @return
533 */
534 private synchronized void consumeExceptionIfNotNull() throws XmlBlasterException {
535 XmlBlasterException e = this.ex;
536 if (e != null) {
537 this.ex = null;
538 throw e;
539 }
540 }
541
542 // implementation of interface I_ConnectionStateListener
543
544 /**
545 * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
546 */
547 public synchronized void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
548 log.fine("I am alive now");
549 // only used on first connect after it is ignored.
550 if (this.initialized)
551 return;
552
553 if (this.useQueue) {
554 log.info("going to instance the queue");
555 ConnectQos connectQos = connection.getConnectQos();
556 ClientQueueProperty prop = connectQos.getClientQueueProperty();
557 StorageId storageId = ((XmlBlasterAccess) connection).createStorageId(Constants.RELATING_CLIENT_UPDATE);
558 try {
559 this.queue = this.global.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), storageId,
560 connectQos.getClientQueueProperty());
561 if (((XmlBlasterAccess)connection).isCallbackDispatcherActive())
562 sendInitialQueueEntries();
563 }
564 catch (XmlBlasterException e) {
565 log.severe("An exception occured when trying to initialize the callback client queue: " + e.getMessage());
566 e.printStackTrace();
567 }
568 }
569
570 this.initialized = true;
571 }
572
573 /**
574 * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
575 */
576 public synchronized void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
577 log.fine("I am alive and sync now");
578 }
579
580 /* (non-Javadoc)
581 * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
582 */
583 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
584 // TODO Auto-generated method stub
585
586 }
587
588 /* (non-Javadoc)
589 * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
590 */
591 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
592 // TODO Auto-generated method stub
593
594 }
595
596 }
// syntax highlighted by Code2HTML, v. 0.9.1