1 /*------------------------------------------------------------------------------
  2 Name:      ClientEntryFactory.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 Comment:   Implementation for the I_EntryFactory
  6 ------------------------------------------------------------------------------*/
  7 package org.xmlBlaster.client.queuemsg;
  8 
  9 import java.io.ByteArrayOutputStream;
 10 import java.io.IOException;
 11 import java.io.InputStream;
 12 import java.io.ObjectInputStream;
 13 import java.io.ObjectOutputStream;
 14 import java.util.logging.Level;
 15 import java.util.logging.Logger;
 16 
 17 import org.xmlBlaster.client.key.EraseKey;
 18 import org.xmlBlaster.client.key.UnSubscribeKey;
 19 import org.xmlBlaster.client.qos.DisconnectQos;
 20 import org.xmlBlaster.client.qos.EraseQos;
 21 import org.xmlBlaster.client.qos.UnSubscribeQos;
 22 import org.xmlBlaster.util.Global;
 23 import org.xmlBlaster.util.MsgUnit;
 24 import org.xmlBlaster.util.Timestamp;
 25 import org.xmlBlaster.util.XmlBlasterException;
 26 import org.xmlBlaster.util.def.ErrorCode;
 27 import org.xmlBlaster.util.def.MethodName;
 28 import org.xmlBlaster.util.def.PriorityEnum;
 29 import org.xmlBlaster.util.key.MsgKeyData;
 30 import org.xmlBlaster.util.qos.ConnectQosData;
 31 import org.xmlBlaster.util.qos.MsgQosData;
 32 import org.xmlBlaster.util.queue.I_Entry;
 33 import org.xmlBlaster.util.queue.I_EntryFactory;
 34 import org.xmlBlaster.util.queue.StorageId;
 35 import org.xmlBlaster.util.queue.jdbc.XBMeat;
 36 import org.xmlBlaster.util.queue.jdbc.XBRef;
 37 import org.xmlBlaster.util.queue.jdbc.XBStore;
 38 import org.xmlBlaster.util.queuemsg.DummyEntry;
 39 
 40 
 41 /**
 42  * The implementation of the interface which can be used to convert an object
 43  * which implements the interface I_Entry to an Object and back. This is
 44  * useful for example if you want to store such entries in persistent storage
 45  * like a database or a file system. It might however be used even for other
 46  * purposes.
 47  * @author michele@laghi.eu
 48  * @author xmlBlaster@marcelruff.info
 49  */
 50 public class ClientEntryFactory implements I_EntryFactory
 51 {
 52    private final static String ME = "ClientEntryFactory";
 53    private Global glob = null;
 54    private static Logger log = Logger.getLogger(ClientEntryFactory.class.getName());
 55 
 56    /**
 57     * Parses the specified entry to a byte array (serializing).
 58     */
 59    public byte[] toBlob(I_Entry entry) throws XmlBlasterException {
 60       try {
 61          Object obj = entry.getEmbeddedObject();
 62          ByteArrayOutputStream baos = new ByteArrayOutputStream();
 63          ObjectOutputStream objStream = new ObjectOutputStream(baos);
 64          objStream.writeObject(obj);
 65          return baos.toByteArray();
 66       }
 67       catch (IOException ex) {
 68          log.severe("toBlob: " + ex.getMessage());
 69          throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "toBlob()", ex);
 70       }
 71    }
 72 
 73    /**
 74     * Parses back the raw data to a I_Entry (deserializing)
 75     * @param type see ENTRY_TYPE_MSG etc.
 76     */
 77    public I_Entry createEntry(int priority, long timestamp, String type,
 78                   boolean persistent, long sizeInBytes, InputStream is, StorageId storageId)
 79       throws XmlBlasterException {
 80 
 81       MethodName methodName = MethodName.toMethodName(type);
 82 
 83       try {
 84          ObjectInputStream objStream = new ObjectInputStream(is);
 85          Object[] obj = (Object[])objStream.readObject();
 86 
 87          if (methodName == MethodName.PUBLISH_ONEWAY || methodName == MethodName.PUBLISH) {
 88             if (obj.length != 3) {
 89                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
 90                   "Expected 3 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
 91             }
 92             String qos = (String)obj[0];
 93             String key = (String)obj[1];
 94             byte[] content = (byte[])obj[2];
 95             MsgQosData msgQosData = glob.getMsgQosFactory().readObject(qos);
 96             MsgKeyData msgKeyData = glob.getMsgKeyFactory().readObject(key);
 97             MsgUnit msgUnit = new MsgUnit(msgKeyData, content, msgQosData);
 98             return new MsgQueuePublishEntry(glob, methodName, PriorityEnum.toPriorityEnum(priority), storageId,
 99                                             new Timestamp(timestamp), sizeInBytes, msgUnit);
100          }
101          else if (methodName == MethodName.SUBSCRIBE) {
102             if (obj.length != 2) {
103                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
104                   "Expected 2 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
105             }
106             String qos = (String)obj[0];
107             String key = (String)obj[1];
108             return new MsgQueueSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
109                        new Timestamp(timestamp), sizeInBytes,
110                        glob.getQueryKeyFactory().readObject(key),
111                        glob.getQueryQosFactory().readObject(qos));
112 
113          }
114          else if (methodName == MethodName.UNSUBSCRIBE) {
115             if (obj.length != 2) {
116                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
117                   "Expected 2 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
118             }
119             String qos = (String)obj[0];
120             String key = (String)obj[1];
121             return new MsgQueueUnSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
122                        new Timestamp(timestamp), sizeInBytes,
123                        new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(key)),
124                        new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
125 
126          }
127          else if (methodName == MethodName.ERASE) {
128             if (obj.length != 2) {
129                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
130                   "Expected 2 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
131             }
132             String qos = (String)obj[0];
133             String key = (String)obj[1];
134             return new MsgQueueEraseEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
135                        new Timestamp(timestamp), sizeInBytes,
136                        new EraseKey(glob, glob.getQueryKeyFactory().readObject(key)),
137                        new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) );
138 
139          }
140          else if (methodName == MethodName.GET) {
141             throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented, you can't use synchronous GET requests in queues.");
142          }
143          else if (methodName == MethodName.CONNECT) {
144             if (obj.length != 1) {
145                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
146                   "Expected 1 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
147             }
148             String qos = (String)obj[0];
149             ConnectQosData connectQosData = glob.getConnectQosFactory().readObject(qos);
150             return new MsgQueueConnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
151                                             new Timestamp(timestamp), sizeInBytes, connectQosData);
152          }
153          else if (methodName == MethodName.DISCONNECT) {
154             if (obj.length != 1) {
155                throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
156                   "Expected 1 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
157             }
158             String qos = (String)obj[0];
159             DisconnectQos disconnectQos = new DisconnectQos(glob, glob.getDisconnectQosFactory().readObject(qos));
160             return new MsgQueueDisconnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
161                                             new Timestamp(timestamp), sizeInBytes, disconnectQos);
162          }
163          else if (methodName == MethodName.DUMMY) { // for testsuite only
164             byte[] bytes = (byte[])obj[0];
165             DummyEntry entry = new DummyEntry(glob, PriorityEnum.toPriorityEnum(priority), new Timestamp(timestamp),
166                   storageId, bytes.length, bytes, persistent);
167             //entry.setUniqueId(timestamp);
168             return entry;
169          }
170 
171       }
172       catch (Exception ex) {
173          throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-" + methodName, ex);
174       }
175 
176       throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented");
177    }
178 
179    /**
180     * Is called after the instance is created.
181     * @param name A name identifying this plugin.
182     */
183    public void initialize(Global glob) {
184       this.glob = glob;
185       if (log.isLoggable(Level.FINE)) log.fine("successfully initialized");
186    }
187 
188    /**
189     * Allows to overwrite properties which where passed on initialize()
190     * The properties which support hot configuration are depending on the used implementation
191     */
192    public void setProperties(Object userData) {
193    }
194 
195    /**
196     * Access the current Parser configuration
197     */
198    public Object getProperties() {
199       return null;
200    }
201    
202    private StorageId getStorageId(XBStore store) {
203       return new StorageId(glob, store.getNode(), store.getType(), store.getPostfix());
204       // return new StorageId(glob, store.getType(), store.getType() +
205       // store.getPostfix());
206    }
207    
208    
209    public I_Entry createEntry(XBStore store, XBMeat meat, XBRef ref) throws XmlBlasterException {
210       String type = (ref != null) ? ref.getMethodName() : meat.getDataType();
211       StorageId storageId = getStorageId(store);
212       MethodName methodName = MethodName.toMethodName(type);
213       String key = meat.getKey();
214       String qos = meat.getQos();
215       byte[] content = meat.getContent();
216       long timestamp = (ref != null) ? ref.getId() : meat.getId();
217       int priority = (ref != null) ? ref.getPrio() : PriorityEnum.NORM_PRIORITY.getInt();
218       long sizeInBytes = meat.getByteSize();
219       boolean durable = (ref != null) ? ref.isDurable() : meat.isDurable();
220       try {
221          // MethodName.PING
222          // MethodName.UPDATE
223          // MethodName.UPDATE_ONEWAY
224          // MethodName.EXCEPTION
225          if (methodName == MethodName.PUBLISH_ONEWAY || methodName == MethodName.PUBLISH
226                || methodName == MethodName.PUBLISH_ARR) {
227             MsgQosData msgQosData = glob.getMsgQosFactory().readObject(qos);
228             MsgKeyData msgKeyData = glob.getMsgKeyFactory().readObject(key);
229             MsgUnit msgUnit = new MsgUnit(msgKeyData, content, msgQosData);
230             return new MsgQueuePublishEntry(glob, methodName, PriorityEnum.toPriorityEnum(priority), storageId,
231                                             new Timestamp(timestamp), sizeInBytes, msgUnit);
232          }
233          else if (methodName == MethodName.SUBSCRIBE) {
234             return new MsgQueueSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
235                        new Timestamp(timestamp), sizeInBytes,
236                        glob.getQueryKeyFactory().readObject(key),
237                        glob.getQueryQosFactory().readObject(qos));
238 
239          }
240          else if (methodName == MethodName.UNSUBSCRIBE) {
241             return new MsgQueueUnSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
242                        new Timestamp(timestamp), sizeInBytes,
243                        new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(key)),
244                        new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
245 
246          }
247          else if (methodName == MethodName.ERASE) {
248             return new MsgQueueEraseEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
249                        new Timestamp(timestamp), sizeInBytes,
250                        new EraseKey(glob, glob.getQueryKeyFactory().readObject(key)),
251                        new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) );
252 
253          }
254          else if (methodName == MethodName.GET) {
255             throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented, you can't use synchronous GET requests in queues.");
256          }
257          else if (methodName == MethodName.CONNECT) {
258             ConnectQosData connectQosData = glob.getConnectQosFactory().readObject(qos);
259             return new MsgQueueConnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
260                                             new Timestamp(timestamp), sizeInBytes, connectQosData);
261          }
262          else if (methodName == MethodName.DISCONNECT) {
263             DisconnectQos disconnectQos = new DisconnectQos(glob, glob.getDisconnectQosFactory().readObject(qos));
264             return new MsgQueueDisconnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
265                                             new Timestamp(timestamp), sizeInBytes, disconnectQos);
266          }
267          else if (methodName == MethodName.DUMMY) { // for testsuite only
268             DummyEntry entry = null;
269             if (content != null)
270                entry = new DummyEntry(glob, PriorityEnum.toPriorityEnum(priority), new Timestamp(timestamp), storageId,
271                      sizeInBytes, content, durable);
272             else
273                entry = new DummyEntry(glob, PriorityEnum.toPriorityEnum(priority), new Timestamp(timestamp), storageId, sizeInBytes, durable);
274             return entry;
275          }
276 
277       }
278       catch (Exception ex) {
279          throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-" + methodName, ex);
280       }
281 
282       throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented");
283    }
284    
285    
286    
287 }


syntax highlighted by Code2HTML, v. 0.9.1