1 /*------------------------------------------------------------------------------
2 Name: ClientEntryFactory.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 Comment: Implementation for the I_EntryFactory
6 ------------------------------------------------------------------------------*/
7 package org.xmlBlaster.client.queuemsg;
8
9 import java.io.ByteArrayOutputStream;
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.io.ObjectInputStream;
13 import java.io.ObjectOutputStream;
14 import java.util.logging.Level;
15 import java.util.logging.Logger;
16
17 import org.xmlBlaster.client.key.EraseKey;
18 import org.xmlBlaster.client.key.UnSubscribeKey;
19 import org.xmlBlaster.client.qos.DisconnectQos;
20 import org.xmlBlaster.client.qos.EraseQos;
21 import org.xmlBlaster.client.qos.UnSubscribeQos;
22 import org.xmlBlaster.util.Global;
23 import org.xmlBlaster.util.MsgUnit;
24 import org.xmlBlaster.util.Timestamp;
25 import org.xmlBlaster.util.XmlBlasterException;
26 import org.xmlBlaster.util.def.ErrorCode;
27 import org.xmlBlaster.util.def.MethodName;
28 import org.xmlBlaster.util.def.PriorityEnum;
29 import org.xmlBlaster.util.key.MsgKeyData;
30 import org.xmlBlaster.util.qos.ConnectQosData;
31 import org.xmlBlaster.util.qos.MsgQosData;
32 import org.xmlBlaster.util.queue.I_Entry;
33 import org.xmlBlaster.util.queue.I_EntryFactory;
34 import org.xmlBlaster.util.queue.StorageId;
35 import org.xmlBlaster.util.queue.jdbc.XBMeat;
36 import org.xmlBlaster.util.queue.jdbc.XBRef;
37 import org.xmlBlaster.util.queue.jdbc.XBStore;
38 import org.xmlBlaster.util.queuemsg.DummyEntry;
39
40
41 /**
42 * The implementation of the interface which can be used to convert an object
43 * which implements the interface I_Entry to an Object and back. This is
44 * useful for example if you want to store such entries in persistent storage
45 * like a database or a file system. It might however be used even for other
46 * purposes.
47 * @author michele@laghi.eu
48 * @author xmlBlaster@marcelruff.info
49 */
50 public class ClientEntryFactory implements I_EntryFactory
51 {
52 private final static String ME = "ClientEntryFactory";
53 private Global glob = null;
54 private static Logger log = Logger.getLogger(ClientEntryFactory.class.getName());
55
56 /**
57 * Parses the specified entry to a byte array (serializing).
58 */
59 public byte[] toBlob(I_Entry entry) throws XmlBlasterException {
60 try {
61 Object obj = entry.getEmbeddedObject();
62 ByteArrayOutputStream baos = new ByteArrayOutputStream();
63 ObjectOutputStream objStream = new ObjectOutputStream(baos);
64 objStream.writeObject(obj);
65 return baos.toByteArray();
66 }
67 catch (IOException ex) {
68 log.severe("toBlob: " + ex.getMessage());
69 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "toBlob()", ex);
70 }
71 }
72
73 /**
74 * Parses back the raw data to a I_Entry (deserializing)
75 * @param type see ENTRY_TYPE_MSG etc.
76 */
77 public I_Entry createEntry(int priority, long timestamp, String type,
78 boolean persistent, long sizeInBytes, InputStream is, StorageId storageId)
79 throws XmlBlasterException {
80
81 MethodName methodName = MethodName.toMethodName(type);
82
83 try {
84 ObjectInputStream objStream = new ObjectInputStream(is);
85 Object[] obj = (Object[])objStream.readObject();
86
87 if (methodName == MethodName.PUBLISH_ONEWAY || methodName == MethodName.PUBLISH) {
88 if (obj.length != 3) {
89 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
90 "Expected 3 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
91 }
92 String qos = (String)obj[0];
93 String key = (String)obj[1];
94 byte[] content = (byte[])obj[2];
95 MsgQosData msgQosData = glob.getMsgQosFactory().readObject(qos);
96 MsgKeyData msgKeyData = glob.getMsgKeyFactory().readObject(key);
97 MsgUnit msgUnit = new MsgUnit(msgKeyData, content, msgQosData);
98 return new MsgQueuePublishEntry(glob, methodName, PriorityEnum.toPriorityEnum(priority), storageId,
99 new Timestamp(timestamp), sizeInBytes, msgUnit);
100 }
101 else if (methodName == MethodName.SUBSCRIBE) {
102 if (obj.length != 2) {
103 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
104 "Expected 2 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
105 }
106 String qos = (String)obj[0];
107 String key = (String)obj[1];
108 return new MsgQueueSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
109 new Timestamp(timestamp), sizeInBytes,
110 glob.getQueryKeyFactory().readObject(key),
111 glob.getQueryQosFactory().readObject(qos));
112
113 }
114 else if (methodName == MethodName.UNSUBSCRIBE) {
115 if (obj.length != 2) {
116 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
117 "Expected 2 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
118 }
119 String qos = (String)obj[0];
120 String key = (String)obj[1];
121 return new MsgQueueUnSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
122 new Timestamp(timestamp), sizeInBytes,
123 new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(key)),
124 new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
125
126 }
127 else if (methodName == MethodName.ERASE) {
128 if (obj.length != 2) {
129 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
130 "Expected 2 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
131 }
132 String qos = (String)obj[0];
133 String key = (String)obj[1];
134 return new MsgQueueEraseEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
135 new Timestamp(timestamp), sizeInBytes,
136 new EraseKey(glob, glob.getQueryKeyFactory().readObject(key)),
137 new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) );
138
139 }
140 else if (methodName == MethodName.GET) {
141 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented, you can't use synchronous GET requests in queues.");
142 }
143 else if (methodName == MethodName.CONNECT) {
144 if (obj.length != 1) {
145 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
146 "Expected 1 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
147 }
148 String qos = (String)obj[0];
149 ConnectQosData connectQosData = glob.getConnectQosFactory().readObject(qos);
150 return new MsgQueueConnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
151 new Timestamp(timestamp), sizeInBytes, connectQosData);
152 }
153 else if (methodName == MethodName.DISCONNECT) {
154 if (obj.length != 1) {
155 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME,
156 "Expected 1 entries in serialized object '" + type + "' but got " + obj.length + " for priority=" + priority + " timestamp=" + timestamp + ". Could be a version incompatibility.");
157 }
158 String qos = (String)obj[0];
159 DisconnectQos disconnectQos = new DisconnectQos(glob, glob.getDisconnectQosFactory().readObject(qos));
160 return new MsgQueueDisconnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
161 new Timestamp(timestamp), sizeInBytes, disconnectQos);
162 }
163 else if (methodName == MethodName.DUMMY) { // for testsuite only
164 byte[] bytes = (byte[])obj[0];
165 DummyEntry entry = new DummyEntry(glob, PriorityEnum.toPriorityEnum(priority), new Timestamp(timestamp),
166 storageId, bytes.length, bytes, persistent);
167 //entry.setUniqueId(timestamp);
168 return entry;
169 }
170
171 }
172 catch (Exception ex) {
173 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-" + methodName, ex);
174 }
175
176 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented");
177 }
178
179 /**
180 * Is called after the instance is created.
181 * @param name A name identifying this plugin.
182 */
183 public void initialize(Global glob) {
184 this.glob = glob;
185 if (log.isLoggable(Level.FINE)) log.fine("successfully initialized");
186 }
187
188 /**
189 * Allows to overwrite properties which where passed on initialize()
190 * The properties which support hot configuration are depending on the used implementation
191 */
192 public void setProperties(Object userData) {
193 }
194
195 /**
196 * Access the current Parser configuration
197 */
198 public Object getProperties() {
199 return null;
200 }
201
202 private StorageId getStorageId(XBStore store) {
203 return new StorageId(glob, store.getNode(), store.getType(), store.getPostfix());
204 // return new StorageId(glob, store.getType(), store.getType() +
205 // store.getPostfix());
206 }
207
208
209 public I_Entry createEntry(XBStore store, XBMeat meat, XBRef ref) throws XmlBlasterException {
210 String type = (ref != null) ? ref.getMethodName() : meat.getDataType();
211 StorageId storageId = getStorageId(store);
212 MethodName methodName = MethodName.toMethodName(type);
213 String key = meat.getKey();
214 String qos = meat.getQos();
215 byte[] content = meat.getContent();
216 long timestamp = (ref != null) ? ref.getId() : meat.getId();
217 int priority = (ref != null) ? ref.getPrio() : PriorityEnum.NORM_PRIORITY.getInt();
218 long sizeInBytes = meat.getByteSize();
219 boolean durable = (ref != null) ? ref.isDurable() : meat.isDurable();
220 try {
221 // MethodName.PING
222 // MethodName.UPDATE
223 // MethodName.UPDATE_ONEWAY
224 // MethodName.EXCEPTION
225 if (methodName == MethodName.PUBLISH_ONEWAY || methodName == MethodName.PUBLISH
226 || methodName == MethodName.PUBLISH_ARR) {
227 MsgQosData msgQosData = glob.getMsgQosFactory().readObject(qos);
228 MsgKeyData msgKeyData = glob.getMsgKeyFactory().readObject(key);
229 MsgUnit msgUnit = new MsgUnit(msgKeyData, content, msgQosData);
230 return new MsgQueuePublishEntry(glob, methodName, PriorityEnum.toPriorityEnum(priority), storageId,
231 new Timestamp(timestamp), sizeInBytes, msgUnit);
232 }
233 else if (methodName == MethodName.SUBSCRIBE) {
234 return new MsgQueueSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
235 new Timestamp(timestamp), sizeInBytes,
236 glob.getQueryKeyFactory().readObject(key),
237 glob.getQueryQosFactory().readObject(qos));
238
239 }
240 else if (methodName == MethodName.UNSUBSCRIBE) {
241 return new MsgQueueUnSubscribeEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
242 new Timestamp(timestamp), sizeInBytes,
243 new UnSubscribeKey(glob, glob.getQueryKeyFactory().readObject(key)),
244 new UnSubscribeQos(glob, glob.getQueryQosFactory().readObject(qos)) );
245
246 }
247 else if (methodName == MethodName.ERASE) {
248 return new MsgQueueEraseEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
249 new Timestamp(timestamp), sizeInBytes,
250 new EraseKey(glob, glob.getQueryKeyFactory().readObject(key)),
251 new EraseQos(glob, glob.getQueryQosFactory().readObject(qos)) );
252
253 }
254 else if (methodName == MethodName.GET) {
255 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented, you can't use synchronous GET requests in queues.");
256 }
257 else if (methodName == MethodName.CONNECT) {
258 ConnectQosData connectQosData = glob.getConnectQosFactory().readObject(qos);
259 return new MsgQueueConnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
260 new Timestamp(timestamp), sizeInBytes, connectQosData);
261 }
262 else if (methodName == MethodName.DISCONNECT) {
263 DisconnectQos disconnectQos = new DisconnectQos(glob, glob.getDisconnectQosFactory().readObject(qos));
264 return new MsgQueueDisconnectEntry(glob, PriorityEnum.toPriorityEnum(priority), storageId,
265 new Timestamp(timestamp), sizeInBytes, disconnectQos);
266 }
267 else if (methodName == MethodName.DUMMY) { // for testsuite only
268 DummyEntry entry = null;
269 if (content != null)
270 entry = new DummyEntry(glob, PriorityEnum.toPriorityEnum(priority), new Timestamp(timestamp), storageId,
271 sizeInBytes, content, durable);
272 else
273 entry = new DummyEntry(glob, PriorityEnum.toPriorityEnum(priority), new Timestamp(timestamp), storageId, sizeInBytes, durable);
274 return entry;
275 }
276
277 }
278 catch (Exception ex) {
279 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "createEntry-" + methodName, ex);
280 }
281
282 throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "Object '" + type + "' not implemented");
283 }
284
285
286
287 }
// syntax highlighted by Code2HTML, v. 0.9.1