1 /*------------------------------------------------------------------------------
  2 Name:      MsgQueuePublishEntry.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
  5 ------------------------------------------------------------------------------*/
  6 package org.xmlBlaster.client.queuemsg;
  7 
  8 import java.util.ArrayList;
  9 import java.util.logging.Level;
 10 import java.util.logging.Logger;
 11 
 12 import org.xmlBlaster.util.Global;
 13 import org.xmlBlaster.util.MsgUnit;
 14 import org.xmlBlaster.util.SessionName;
 15 import org.xmlBlaster.util.Timestamp;
 16 import org.xmlBlaster.util.XmlBlasterException;
 17 import org.xmlBlaster.util.def.MethodName;
 18 import org.xmlBlaster.util.def.PriorityEnum;
 19 import org.xmlBlaster.util.key.MsgKeyData;
 20 import org.xmlBlaster.util.qos.MsgQosData;
 21 import org.xmlBlaster.util.qos.address.Destination;
 22 import org.xmlBlaster.util.queue.StorageId;
 23 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
 24 
 25 /**
 26  * Wraps an publish() message into an entry for a sorted queue.
 27  * @author michele@laghi.eu
 28  * @author xmlBlaster@marcelruff.info
 29  */
 30 public final class MsgQueuePublishEntry extends MsgQueueEntry
 31 {
 32    /**
 33     * 
 34     */
 35    private static final long serialVersionUID = 1L;
 36    private static Logger log = Logger.getLogger(MsgQueuePublishEntry.class.getName());
 37    private final static String ME = "PublishQueueEntry";
 38    private final MsgQosData msgQosData;
 39    private SessionName receiver;
 40    /** The MsgUnit with key/content/qos (raw struct) */
 41    private MsgUnit msgUnit;
 42    private final long immutableSizeInBytes;
 43 
 44    /**
 45     * Use this constructor if a new message object is fed by method publish() (not oneway). 
 46     * <p />
 47     * @param msgUnit The raw data
 48     */
 49    public MsgQueuePublishEntry(Global glob, MsgUnit msgUnit, StorageId storageId) throws XmlBlasterException {
 50       this(glob, msgUnit, storageId, false);
 51    }
 52 
 53    public MsgQueuePublishEntry(Global glob, MsgUnit msgUnit, StorageId storageId, boolean oneway)
 54          throws XmlBlasterException {
 55       super(glob, oneway ? MethodName.PUBLISH_ONEWAY : MethodName.PUBLISH,
 56             ((MsgQosData)msgUnit.getQosData()).getPriority(), storageId, ((MsgQosData)msgUnit.getQosData()).isPersistent());
 57       if (msgUnit == null) {
 58          log.severe("Invalid constructor parameter");
 59          Thread.dumpStack();
 60          throw new IllegalArgumentException(ME + ": Invalid constructor parameter");
 61       }
 62       if (log.isLoggable(Level.FINER)) log.finer("Created: " + getUniqueId());
 63       this.msgUnit = msgUnit;
 64       this.msgQosData = (MsgQosData)msgUnit.getQosData();
 65 
 66       // Estimated calculation of used memory by one MsgUnitWrapper instance
 67       // = Object memory + payload
 68       // Where following objects need to be created (approx. 660 bytes RAM):
 69       // 6 PropBoolean
 70       // 1 PropLong
 71       // 1 Timestamp
 72       // 1 MsgQosData
 73       // 1 MsgKeyData
 74       // 1 MsgUnit
 75       // 1 MsgQueuePublishEntry
 76       this.immutableSizeInBytes = 660 + this.msgUnit.size();
 77    }
 78 
 79    /**
 80     * For persistence recovery
 81     * @param priority PriorityEnum.MIN1_PRIORITY etc does not work, the priority from qos is used (remove this parameter)
 82     * @param sizeInByte The estimated size of the entry in RAM (can be totally different on HD). 
 83     */
 84    public MsgQueuePublishEntry(Global glob, MethodName entryType, PriorityEnum priority, StorageId storageId,
 85                                Timestamp publishEntryTimestamp, long sizeInBytes,
 86                                MsgUnit msgUnit) {
 87       super(glob, entryType.toString(), ((MsgQosData)msgUnit.getQosData()).getPriority(),
 88             publishEntryTimestamp, storageId, ((MsgQosData)msgUnit.getQosData()).isPersistent());
 89 //      if (msgUnit == null) {
 90 //         log.severe("Invalid constructor parameter");
 91 //         Thread.dumpStack();
 92 //         throw new IllegalArgumentException(ME + ": Invalid constructor parameter");
 93 //      }
 94       this.msgUnit = msgUnit;
 95       this.msgQosData = (MsgQosData)msgUnit.getQosData();
 96       this.immutableSizeInBytes = sizeInBytes;
 97       if (log.isLoggable(Level.FINER)) log.finer("Created from persistence: " + getUniqueId());
 98    }
 99 
100    /**
101     * @see MsgQueueEntry#isExpired
102     */
103    public boolean isExpired() {
104       return this.msgQosData.isExpired();
105    }
106 
107    /**
108     * @see MsgQueueEntry#isDestroyed
109     */
110    public boolean isDestroyed() {
111       return false;
112    }
113 
114    /**
115     * Get the message unit, you must call getUpdateQos(int,int,int) before to generate the update QoS.
116     * <p />
117     * See private getUpdateQos(int,int,int)
118     */
119    public MsgUnit getMsgUnit() {
120       return this.msgUnit;
121    }
122 
123    /**
124     * Try to find out the approximate memory consumption of this message.
125     * <p />
126     * @return The size in bytes
127     */
128    public long getSizeInBytes() {
129       return this.immutableSizeInBytes;
130    }
131 
132    /**
133     * @return If it is an internal message (oid starting with "_"). 
134     */
135    public boolean isInternal() {
136       return (getMsgKeyData().isInternal() || getMsgKeyData().isPluginInternal());
137    }
138 
139    /**
140     * Access the unique login name of the (last) publisher.
141     * <p />
142     * The sender of this message.
143     * @return loginName of the data source which last publishd this message
144     *         or null
145     * @see MsgQueueEntry#getSender()
146     */
147    public SessionName getSender() {
148       return this.msgQosData.getSender();
149    }
150 
151    /**
152     * @return The name of the receiver (data sink) or null
153     * @see MsgQueueEntry#getReceiver()
154     */
155    public void setReceiver(SessionName receiver) {
156       this.receiver = receiver;
157    }
158 
159    /**
160     * @return The name of the receiver (data sink) or null
161     * @see MsgQueueEntry#getReceiver()
162     */
163    public SessionName getReceiver() {
164       if (this.receiver == null) {
165          ArrayList list = this.msgQosData.getDestinations();
166          if (list != null && list.size() >0) {
167             Destination d = (Destination) list.get(0);
168             this.receiver = d.getDestination();
169             if (list.size() > 1)
170                log.warning("Ignoring other receivers with getReceiver()");
171          }
172       }
173       return this.receiver;
174    }
175 
176    public MsgKeyData getMsgKeyData() {
177       return (MsgKeyData)getMsgUnit().getKeyData();
178    }
179 
180    /**
181     * @see MsgQueueEntry#getKeyOid()
182     */
183    public String getKeyOid() {
184       return getMsgKeyData().getOid();
185    }
186 
187    /**
188     * The embedded object. 
189     * @return qos.toXml, key.toXml, contentBytes
190     */
191    public Object getEmbeddedObject() {
192       Object[] obj = { this.msgUnit.getQosData().toXml(),
193                        this.msgUnit.getKeyData().toXml(),
194                        this.msgUnit.getContent() };
195       return obj;
196    }
197 
198    public final void embeddedObjectToXml(java.io.OutputStream out, java.util.Properties props) throws java.io.IOException {
199       MsgUnit msgUnit = this.msgUnit;
200       if (msgUnit != null)
201          msgUnit.toXml(out, props);
202    }
203 
204    /**
205     * Returns a shallow clone
206     */
207    public Object clone() {
208       MsgQueuePublishEntry entry = null;
209       entry = (MsgQueuePublishEntry)super.clone();
210       return entry;
211    }
212 }


syntax highlighted by Code2HTML, v. 0.9.1