1 /*------------------------------------------------------------------------------
2 Name: StreamingCallback.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 package org.xmlBlaster.client;
8
9 import java.io.ByteArrayInputStream;
10 import java.io.IOException;
11 import java.io.OutputStream;
12 import java.io.PipedInputStream;
13 import java.io.PipedOutputStream;
14 import java.util.ArrayList;
15 import java.util.logging.Level;
16 import java.util.logging.Logger;
17
18 import org.xmlBlaster.client.key.UpdateKey;
19 import org.xmlBlaster.client.qos.ConnectQos;
20 import org.xmlBlaster.client.qos.UpdateQos;
21 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
22 import org.xmlBlaster.jms.XBConnectionMetaData;
23 import org.xmlBlaster.util.Global;
24 import org.xmlBlaster.util.I_Timeout;
25 import org.xmlBlaster.util.MsgUnit;
26 import org.xmlBlaster.util.Timeout;
27 import org.xmlBlaster.util.Timestamp;
28 import org.xmlBlaster.util.XmlBlasterException;
29 import org.xmlBlaster.util.def.Constants;
30 import org.xmlBlaster.util.def.ErrorCode;
31 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
32 import org.xmlBlaster.util.key.MsgKeyData;
33 import org.xmlBlaster.util.qos.ClientProperty;
34 import org.xmlBlaster.util.qos.MsgQosData;
35 import org.xmlBlaster.util.qos.storage.ClientQueueProperty;
36 import org.xmlBlaster.util.queue.I_Queue;
37 import org.xmlBlaster.util.queue.StorageId;
38
39 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
40 import EDU.oswego.cs.dl.util.concurrent.Mutex;
41
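/**
 * StreamingCallback adapts the byte[] based {@link I_Callback} to the stream based
 * {@link I_StreamingCallback}. A message which arrives in one piece is passed to the
 * client as a stream over its content; a message which arrives as several chunks is
 * written chunk by chunk into a pipe whose read end is handed to the client's
 * update method, which runs in a thread of its own.
 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 */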
46 public class StreamingCallback implements I_Callback, I_Timeout, I_ConnectionStateListener {
47
48 /**
49 *
50 * Writer needed since the out stream must be written from a thread which does not
51 * die before the thread which reads the in counterpart. For some "strange" reason
52 * the implementation of the Pipe streams makes a check if the thread which has
53 * made the last write operation on the out stream still is valid. If not, a Dead
54 * End IO Exception is thrown when reading.
55 *
56 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
57 */
58 class Writer extends Thread {
59
60 class WriterData extends Mutex {
61 private OutputStream outStrm;
62 private byte[] data;
63 private Throwable exception;
64
65 public WriterData(OutputStream out, byte[] data) {
66 super();
67 this.outStrm = out;
68 this.data = data;
69 }
70 }
71
72 private LinkedQueue channel;
73
74 public Writer(String name) {
75 super(name);
76 this.channel = new LinkedQueue();
77 setDaemon(true);
78 start();
79 }
80
81 public Writer() {
82 super();
83 this.channel = new LinkedQueue();
84 setDaemon(true);
85 start();
86 }
87
88 public synchronized void write(OutputStream outStream, byte[] buf) throws InterruptedException, XmlBlasterException {
89 WriterData data = new WriterData(outStream, buf);
90 try {
91 data.acquire();
92 this.channel.put(data);
93 data.acquire(); // waits until the other thread is finished
94 if (data.exception != null)
95 throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "write: a throwable occured", "", data.exception);
96 }
97 finally {
98 data.release();
99 }
100 }
101
102 public synchronized void close(OutputStream outStream) throws InterruptedException, XmlBlasterException {
103 WriterData data = new WriterData(outStream, null);
104 try {
105 data.acquire();
106 this.channel.put(data);
107 data.acquire(); // waits until the other thread is finished
108 if (data.exception != null)
109 throw new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "close: a throwable occured", "", data.exception);
110 }
111 finally {
112 data.release();
113 }
114 }
115
116 /**
117 * @see java.lang.Thread#run()
118 */
119 public void run() {
120 while (true) {
121 try {
122 WriterData writerData = (WriterData)this.channel.take();
123 try {
124 if (writerData.outStrm != null) {
125 if (writerData.data != null) {
126
127 int bytesLeft = writerData.data.length;
128 int bytesRead = 0;
129 final int MAX_CHUNK_SIZE = 4096;
130 while (bytesLeft > 0) {
131 int toRead = bytesLeft > MAX_CHUNK_SIZE ? MAX_CHUNK_SIZE : bytesLeft;
132 writerData.outStrm.write(writerData.data, bytesRead, toRead);
133 writerData.outStrm.flush();
134 bytesRead += toRead;
135 bytesLeft -= toRead;
136 }
137 // writerData.out.write(0);
138 // writerData.out.flush();
139
140 // these would block (probably the pipes are not the best in the world
141 // writerData.out.write(writerData.data);
142 // writerData.out.flush();
143 }
144 else
145 writerData.outStrm.close();
146 }
147 }
148 catch (Throwable e) {
149 writerData.exception = e;
150 }
151 finally {
152 writerData.release();
153 }
154 }
155 catch (Throwable e) {
156 if (e.getMessage().indexOf("Pipe closed") < 0) {
157 log.warning("An exception occured when writing to the stream: ' " + e.getMessage());
158 e.printStackTrace();
159 }
160 else if (log.isLoggable(Level.FINE)) {
161 log.fine("The pipe was closed, which resulted in an IO Exception. It can happen when the client has returned before reading the complete message");
162 e.printStackTrace();
163 }
164 }
165 }
166 }
167 }
168
169
170 class ExecutionThread extends Thread {
171
172 private String cbSessionId_;
173 private UpdateKey updateKey_;
174 private byte[] content_;
175 private UpdateQos updateQos_;
176
177 public ExecutionThread(String cbSessId, UpdateKey updKey, byte[] content, UpdateQos updQos) {
178 this.cbSessionId_ = cbSessId;
179 this.updateKey_ = updKey;
180 this.content_ = content;
181 this.updateQos_ = updQos;
182
183 }
184
185 public void run() {
186 try {
187 ret = updateNewMessage(cbSessionId_, updateKey_, content_, updateQos_);
188 clearQueue();
189 }
190 catch (Throwable e) {
191 setException(e);
192 e.printStackTrace();
193 }
194 finally {
195 try {
196 if (in != null)
197 in.close();
198 }
199 catch (IOException e) {
200 e.printStackTrace();
201 }
202 mutex.release();
203 }
204 }
205 };
206
207 private static Logger log = Logger.getLogger(StreamingCallback.class.getName());
208 public final static String ENTRY_CB_SESSION_ID = "__entryCbSessionId";
209
210 private I_StreamingCallback callback;
211
212 private Global global;
213 private PipedOutputStream out;
214 private PipedInputStream in;
215 private XmlBlasterException ex;
216 private String ret;
217 private String cbSessionId;
218 private Writer writer;
219 /** The time to wait in ms until returning when waiting (if zero or negative inifinite) */
220 private long waitForChunksTimeout;
221 // private long waitForClientReturnTimeout;
222 private Timeout timer;
223 private Timestamp timestamp; // the key for the timeout timer (can be null)
224 private I_Queue queue; // optional client side queue
225 private boolean useQueue;
226 private boolean initialized;
227 private boolean lastMessageCompleted = true;
228 private final Mutex mutex;
229
230 private void reset() throws XmlBlasterException {
231 this.out = null;
232 this.in = null;
233 this.ret = null;
234 this.cbSessionId = null;
235 }
236
237 public StreamingCallback(Global global, I_StreamingCallback callback) throws XmlBlasterException {
238 this(global, callback, 0L, 0L, false);
239 }
240
241 /**
242 *
243 * @param callback
244 */
245 public StreamingCallback(Global global, I_StreamingCallback callback, long waitForChunksTimeout, long waitForClientReturnTimeout, boolean useQueue)
246 throws XmlBlasterException {
247 this.callback = callback;
248 this.global = global;
249 this.mutex = new Mutex();
250 String writerName = StreamingCallback.class.getName() + "-writer";
251 synchronized(this.global) {
252 this.writer = (Writer)this.global.getObjectEntry(writerName);
253 if (this.writer == null) {
254 this.writer = new Writer();
255 this.global.addObjectEntry(writerName, this.writer);
256 }
257 }
258 this.waitForChunksTimeout = waitForChunksTimeout;
259 // this.waitForClientReturnTimeout = waitForClientReturnTimeout;
260 if (this.waitForChunksTimeout > 0L) {
261 String timerName = StreamingCallback.class.getName() + "-timer";
262 synchronized(this.global) {
263 this.timer = (Timeout)this.global.getObjectEntry(timerName);
264 if (this.timer == null) {
265 this.timer = new Timeout(timerName);
266 this.global.addObjectEntry(timerName, this.timer);
267 }
268 }
269 }
270 this.useQueue = useQueue;
271 // TODO latch until connected to avoit early updates
272 }
273
274 /**
275 *
276 * @return the number of delivered entries from local client update queue.
277 */
278 public final int sendInitialQueueEntries() throws XmlBlasterException {
279 if (this.queue == null)
280 return 0;
281 ArrayList list = this.queue.peek(-1, -1L);
282 for (int i=0; i < list.size(); i++) {
283 MsgQueuePublishEntry entry = (MsgQueuePublishEntry)list.get(i);
284 MsgKeyData key = entry.getMsgKeyData();
285 MsgQosData qos =(MsgQosData)entry.getMsgUnit().getQosData();
286 byte[] cont = entry.getMsgUnit().getContent();
287 String entryCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
288 qos.getClientProperties().remove(ENTRY_CB_SESSION_ID);
289 final boolean isExternal = false; // we don't want to store these entries since already here
290 updateInternal(entryCbSessionId, new UpdateKey(key), cont, new UpdateQos(this.global, qos), isExternal);
291 }
292 this.queue.clear();
293 return list.size();
294 }
295
296 private final void storeEntry(String cbSessId, UpdateKey key, byte[] cont, UpdateQos qos) throws XmlBlasterException {
297 if (this.queue == null)
298 return;
299 final boolean ignorePutInterceptor = false;
300 if (cbSessId != null) {
301 String oldCbSessionId = qos.getClientProperty(ENTRY_CB_SESSION_ID, (String)null);
302 if (oldCbSessionId != null && !oldCbSessionId.equals(cbSessId)) {
303 log.warning("the client property '" + ENTRY_CB_SESSION_ID + "' is a reserved word, we will overwrite its value='" + oldCbSessionId + "' to be '" + cbSessionId + "'");
304 ClientProperty prop = new ClientProperty(ENTRY_CB_SESSION_ID, null, null, cbSessId);
305 qos.getClientProperties().put(prop.getName(), prop);
306 }
307 }
308 MsgUnit msgUnit = new MsgUnit(key.getData(), cont, qos.getData());
309 MsgQueuePublishEntry entry = new MsgQueuePublishEntry(this.global, msgUnit, this.queue.getStorageId());
310 this.queue.put(entry, ignorePutInterceptor);
311 }
312
313 /**
314 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
315 */
316 public String updateStraight(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
317 log.fine("cbSessionId='" + cbSessId + "'");
318 ByteArrayInputStream bais = new ByteArrayInputStream(cont);
319 return this.callback.update(cbSessId, updKey, bais, updQos);
320 }
321
322 /**
323 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
324 */
325 public String updateNewMessage(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException, IOException {
326 log.fine("cbSessionId='" + cbSessId + "'");
327 return this.callback.update(cbSessId, updKey, in, updQos);
328 }
329
330 private final boolean isFirstChunk(UpdateQos qos) {
331 int seq = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_SEQ, log), 0);
332 return seq == 0;
333 }
334
335 private final boolean isLastChunk(UpdateQos qos) {
336 boolean hasGroupId = qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_ID, log), (String)null) != null;
337 if (!hasGroupId)
338 return true;
339 return qos.getClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_GROUP_EOF, log), false);
340 }
341
342 private final ClientProperty getProp(String key, UpdateQos qos) {
343 return qos.getClientProperty(Constants.addJmsPrefix(key, log));
344 }
345
346 /**
347 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
348 */
349 public String update(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos) throws XmlBlasterException {
350 boolean sendInitial = this.queue != null && this.lastMessageCompleted && this.queue.getNumOfEntries() > 0;
351 if (sendInitial)
352 sendInitialQueueEntries();
353
354 final boolean isExternal = true;
355 log.fine("cbSessionId='" + cbSessId + "'");
356 return updateInternal(cbSessId, updKey, cont, updQos, isExternal);
357 }
358
359 /**
360 * @see org.xmlBlaster.client.I_Callback#update(java.lang.String, org.xmlBlaster.client.key.UpdateKey, byte[], org.xmlBlaster.client.qos.UpdateQos)
361 */
362 private final String updateInternal(String cbSessId, UpdateKey updKey, byte[] cont, UpdateQos updQos, boolean isExternal) throws XmlBlasterException {
363 this.lastMessageCompleted = false;
364 boolean doStore = isExternal;
365 boolean isLastChunk = false;
366 try {
367 log.fine("entering with cbSessionId='" + cbSessId + "'");
368 if (this.timer != null && this.timestamp != null) { // no need to be threadsafe since update is single thread
369 this.timer.removeTimeoutListener(this.timestamp);
370 this.timestamp = null;
371 }
372 ClientProperty exProp = getProp(XBConnectionMetaData.JMSX_GROUP_EX, updQos);
373 // TODO Check if this exception really should be thrown: I think it shall not be thrown since it is an exception
374 // which occured when publishing and this is the information that the update should return
375 if (exProp != null)
376 throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_INTERNALERROR, "update", "An exception occured on a chunk when updating. " + updQos.toXml());
377 isLastChunk = isLastChunk(updQos);
378
379 synchronized(this) {
380 consumeExceptionIfNotNull();
381 if (this.ret != null) {
382 clearQueue();
383 return ret;
384 }
385 }
386
387 if (isLastChunk) { // no need to store the last message since sync return
388 if (isFirstChunk(updQos)) {
389 // TODO a sync to wait until cleared (the updateStraight after the sync, not inside).
390 try {
391 return updateStraight(cbSessId, updKey, cont, updQos);
392 }
393 catch (IOException e) {
394 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: exception occured.", e);
395 }
396
397 }
398
399 try {
400 if (cont != null && cont.length > 0) {
401 this.writer.write(this.out, cont);
402 }
403
404 this.writer.close(this.out);
405 // wait until the client has returned his method.
406 try {
407 mutex.acquire();
408 consumeExceptionIfNotNull();
409 clearQueue();
410 return this.ret;
411 }
412 finally {
413 mutex.release();
414 }
415 }
416 catch (InterruptedException e) {
417 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
418 }
419 finally {
420 reset();
421 }
422 }
423 else { // it is not the last message
424 if (this.timer != null)
425 this.timestamp = this.timer.addTimeoutListener(this, this.waitForChunksTimeout, null);
426 try {
427 if (isFirstChunk(updQos)) {
428 this.mutex.acquire();
429 this.cbSessionId = cbSessId;
430 this.out = new PipedOutputStream();
431 this.in = new PipedInputStream(this.out);
432 ExecutionThread thread = new ExecutionThread(cbSessId, updKey, cont, updQos);
433 thread.start();
434 }
435 else { // check if the message is complete
436 /*
437 if (this.oldGroupId == null) {
438 try {
439 mutex.acquire();
440 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update: The message is not the first of a group but the previous one was already completed.");
441 }
442 finally {
443 mutex.release();
444 }
445 }
446 */
447 }
448 this.writer.write(this.out, cont);
449 }
450 catch (InterruptedException e) {
451 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
452 }
453 catch (IOException e) {
454 throw new XmlBlasterException(this.global, ErrorCode.INTERNAL, "StreamingCallback", "update", e);
455 }
456 if (doStore)
457 storeEntry(cbSessId, updKey, cont, updQos);
458 // and return a fake positive response.
459 return Constants.RET_OK;
460 }
461
462 }
463 catch (XmlBlasterException e) {
464 try {
465 this.writer.close(this.out);
466 }
467 catch (InterruptedException e1) {
468 e1.printStackTrace();
469 }
470 this.lastMessageCompleted = true;
471 throw e;
472 }
473 catch (Throwable e) {
474 e.printStackTrace();
475 throw new XmlBlasterException(this.global, ErrorCode.USER_UPDATE_HOLDBACK, "throwable in updateInternal", "", e);
476 }
477 finally {
478 if (isLastChunk) {
479 this.lastMessageCompleted = true;
480 }
481 log.fine("Leaving method");
482 }
483 }
484
485 /**
486 * It is used here to inform the user update method that a timeout occured, it will throw
487 * an IOException when reading the in stream of the update method.
488 * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object)
489 */
490 public void timeout(Object userData) {
491 try {
492 this.writer.close(this.out);
493 }
494 catch (Throwable e) {
495 // we can not make it threadsafe so we must protect against possible NPE Exceptions
496 e.printStackTrace();
497 }
498
499 }
500
501 private final void clearQueue() {
502 if (queue != null) {
503 log.fine("Clear the queue " + this.queue.getStorageId());
504 queue.clear();
505 }
506 }
507
508 /**
509 * Always makes a USER_UPDATE_HOLDBACK Exception out of it, no matter what the original exception
510 * was.
511 * @param ex
512 */
513 private synchronized void setException(Throwable ex) {
514 if (ex instanceof XmlBlasterException) {
515 XmlBlasterException tmp = (XmlBlasterException)ex;
516 if (tmp.getErrorCode().equals(ErrorCode.USER_UPDATE_HOLDBACK))
517 this.ex = tmp;
518 else
519 this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
520 }
521 else {
522 this.ex = new XmlBlasterException(global, ErrorCode.USER_UPDATE_HOLDBACK, "StreamingCallback", "update: exception occured.", ex);
523 }
524 }
525
526 /**
527 * returns the exception (if any) and resets it.
528 * @return
529 */
530 private synchronized void consumeExceptionIfNotNull() throws XmlBlasterException {
531 XmlBlasterException e = this.ex;
532 if (e != null) {
533 this.ex = null;
534 throw e;
535 }
536 }
537
538 // implementation of interface I_ConnectionStateListener
539
540 /**
541 * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedAlive(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
542 */
543 public synchronized void reachedAlive(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
544 log.fine("I am alive now");
545 // only used on first connect after it is ignored.
546 if (this.initialized)
547 return;
548
549 if (this.useQueue) {
550 log.info("going to instance the queue");
551 ConnectQos connectQos = connection.getConnectQos();
552 ClientQueueProperty prop = connectQos.getClientQueueProperty();
553 StorageId storageId = ((XmlBlasterAccess) connection).createStorageId(Constants.RELATING_CLIENT_UPDATE);
554 try {
555 this.queue = this.global.getQueuePluginManager().getPlugin(prop.getType(), prop.getVersion(), storageId,
556 connectQos.getClientQueueProperty());
557 if (((XmlBlasterAccess)connection).isCallbackDispatcherActive())
558 sendInitialQueueEntries();
559 }
560 catch (XmlBlasterException e) {
561 log.severe("An exception occured when trying to initialize the callback client queue: " + e.getMessage());
562 e.printStackTrace();
563 }
564 }
565
566 this.initialized = true;
567 }
568
569 /* (non-Javadoc)
570 * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedDead(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
571 */
572 public void reachedDead(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
573 // TODO Auto-generated method stub
574
575 }
576
577 /* (non-Javadoc)
578 * @see org.xmlBlaster.client.I_ConnectionStateListener#reachedPolling(org.xmlBlaster.util.dispatch.ConnectionStateEnum, org.xmlBlaster.client.I_XmlBlasterAccess)
579 */
580 public void reachedPolling(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
581 // TODO Auto-generated method stub
582
583 }
584
585 }
// syntax highlighted by Code2HTML, v. 0.9.1