1 /*------------------------------------------------------------------------------
2 Name: ClientDispatchConnectionsHandler.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6 package org.xmlBlaster.client.dispatch;
7
8 import java.util.ArrayList;
9 import java.util.logging.Level;
10 import java.util.logging.Logger;
11
12 import org.xmlBlaster.util.Global;
13 import org.xmlBlaster.util.XmlBlasterException;
14 import org.xmlBlaster.util.dispatch.DispatchConnection;
15 import org.xmlBlaster.util.dispatch.DispatchConnectionsHandler;
16 import org.xmlBlaster.util.queue.I_QueueEntry;
17 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
18 import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;
19 import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;
20 import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;
21 import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;
22 import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;
23 import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;
24 import org.xmlBlaster.util.qos.StatusQosData;
25 import org.xmlBlaster.util.def.ErrorCode;
26 import org.xmlBlaster.util.def.MethodName;
27 import org.xmlBlaster.util.def.Constants;
28 import org.xmlBlaster.util.qos.address.AddressBase;
29 import org.xmlBlaster.client.qos.PublishReturnQos;
30 import org.xmlBlaster.client.qos.SubscribeReturnQos;
31 import org.xmlBlaster.client.qos.UnSubscribeReturnQos;
32 import org.xmlBlaster.client.qos.EraseReturnQos;
33 import org.xmlBlaster.client.qos.ConnectReturnQos;
34
35 /**
36 * Holding all necessary infos to establish a remote
37 * connection and invoke publish(), subscribe(), connect() etc.
38 * @see DispatchConnectionsHandler
39 * @author xmlBlaster@marcelruff.info
40 */
41 public final class ClientDispatchConnectionsHandler extends DispatchConnectionsHandler
42 {
43 private static Logger log = Logger.getLogger(ClientDispatchConnectionsHandler.class.getName());
44 public final String ME;
45
46 /**
47 * @param dispatchManager The message queue witch i belong to
48 * @param cbAddr The addresses i shall connect to
49 */
50 public ClientDispatchConnectionsHandler(Global glob, ClientDispatchManager dispatchManager) throws XmlBlasterException {
51 super(glob, dispatchManager);
52 this.ME = "ClientDispatchConnectionsHandler-" + dispatchManager.getQueue().getStorageId();
53 }
54
55 public boolean isUserThread() {
56 return !DispatchConnection.PING_THREAD_NAME.equals(Thread.currentThread().getName());
57 }
58
59 /**
60 * @return a new ClientDispatchConnection instance which has its plugin loaded
61 */
62 public DispatchConnection createDispatchConnection(AddressBase address) throws XmlBlasterException {
63 ClientDispatchConnection c = new ClientDispatchConnection(glob, this, address);
64 c.loadPlugin();
65 return c;
66 }
67
68 /**
69 * If no connection is available but the message is for example save queued,
70 * we can generate here valid return objects
71 * @param state e.g. Constants.STATE_OK
72 */
73 public void createFakedReturnObjects(I_QueueEntry[] entries, String state, String stateInfo) throws XmlBlasterException {
74 if (log.isLoggable(Level.FINER)) log.finer("Entering createFakedReturnObjects() for " + entries.length + " entries");
75
76 for (int ii=0; ii<entries.length; ii++) {
77 MsgQueueEntry msgQueueEntry = (MsgQueueEntry)entries[ii];
78 if (!msgQueueEntry.wantReturnObj())
79 continue;
80 StatusQosData statRetQos = new StatusQosData(glob, MethodName.UNKNOWN);
81 statRetQos.setStateInfo(stateInfo);
82 statRetQos.setState(state);
83 if (log.isLoggable(Level.FINE)) log.fine("Creating faked return for '" + msgQueueEntry.getMethodName() + "' invocation");
84
85 if (MethodName.PUBLISH_ONEWAY == msgQueueEntry.getMethodName()) {
86 MsgQueuePublishEntry entry = (MsgQueuePublishEntry)msgQueueEntry;
87 entry.setReturnObj(null);
88 }
89
90 else if (MethodName.PUBLISH == msgQueueEntry.getMethodName()) {
91 MsgQueuePublishEntry entry = (MsgQueuePublishEntry)msgQueueEntry;
92 if (!entry.getMsgKeyData().hasOid()) {
93 entry.getMsgKeyData().setOid(entry.getMsgKeyData().generateOid(entry.getSender().getRelativeName()));
94 }
95 statRetQos.setKeyOid(entry.getKeyOid());
96 PublishReturnQos publishReturnQos = new PublishReturnQos(glob, statRetQos);
97 //TODO: How to fake the RcvTimestamp -> it must be unique for an OID in the server
98 //publishReturnQos.getData().setRcvTimestamp(new org.xmlBlaster.util.RcvTimestamp());
99 entry.setReturnObj(publishReturnQos);
100 }
101
102 else if (MethodName.SUBSCRIBE == msgQueueEntry.getMethodName()) {
103 if (getDispatchManager().getSessionName().getPublicSessionId() < 1) {
104 // we should never allow a subscription without a positive sessionId if the
105 // server is not accessible
106 throw new XmlBlasterException(glob, ErrorCode.RESOURCE_TEMPORARY_UNAVAILABLE, ME,
107 "The Subscription for '" + getDispatchManager().getSessionName().toString() + "' failed since the server is currently not available");
108 }
109
110 MsgQueueSubscribeEntry entry = (MsgQueueSubscribeEntry)msgQueueEntry;
111 if (!entry.getSubscribeQosData().hasSubscriptionId()) {
112 entry.getSubscribeQosData().generateSubscriptionId(glob.getXmlBlasterAccess().getSessionName(), entry.getSubscribeKeyData());
113 //String subscriptionId = QueryKeyData.generateSubscriptionId(dispatchManager.getQueue().getStorageId().getPostfix());
114 //entry.getSubscribeQosData().setSubscriptionId(subscriptionId);
115 }
116 statRetQos.setSubscriptionId(entry.getSubscribeQosData().getSubscriptionId());
117 SubscribeReturnQos subscribeReturnQos = new SubscribeReturnQos(glob, statRetQos, true);
118 entry.setReturnObj(subscribeReturnQos);
119 }
120
121 else if (MethodName.UNSUBSCRIBE == msgQueueEntry.getMethodName()) {
122 MsgQueueUnSubscribeEntry entry = (MsgQueueUnSubscribeEntry)msgQueueEntry;
123 String id = entry.getUnSubscribeKey().getOid();
124 if (id != null && id.startsWith(Constants.SUBSCRIPTIONID_PREFIX)) {
125 statRetQos.setSubscriptionId(id);
126 UnSubscribeReturnQos[] unSubscribeReturnQosArr = new UnSubscribeReturnQos[] { new UnSubscribeReturnQos(glob, statRetQos) };
127 entry.setReturnObj(unSubscribeReturnQosArr);
128 }
129 else {
130 throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME,
131 "UnSubscribe on oid='" + id + "' is not possible in offline/polling mode without an exact subscription ID given. " +
132 "See 'http://www.xmlBlaster.org/xmlBlaster/doc/requirements/client.failsafe.html' for more details.");
133 }
134 }
135
136 else if (MethodName.ERASE == msgQueueEntry.getMethodName()) {
137 MsgQueueEraseEntry entry = (MsgQueueEraseEntry)msgQueueEntry;
138 if (entry.getEraseKey().isExact()) {
139 statRetQos.setKeyOid(entry.getEraseKey().getOid());
140 EraseReturnQos[] eraseReturnQosArr = new EraseReturnQos[] { new EraseReturnQos(glob, statRetQos) };
141 entry.setReturnObj(eraseReturnQosArr);
142 }
143 else {
144 throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME,
145 "Erase on oid='" + entry.getEraseKey().getOid() + "' is not possible in offline/polling mode without an exact topic oid given. " +
146 "See 'http://www.xmlBlaster.org/xmlBlaster/doc/requirements/client.failsafe.html' for more details.");
147 }
148 }
149
150 else if (MethodName.CONNECT == msgQueueEntry.getMethodName()) {
151 ConnectReturnQos connectReturnQos = new ConnectReturnQos(glob, ((MsgQueueConnectEntry)msgQueueEntry).getConnectQosData());
152 if (!connectReturnQos.getSessionName().isPubSessionIdUser()) {
153 throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME,
154 "Can't find an xmlBlaster server. Try to provide the server host/port as described in " +
155 "http://www.xmlblaster.org/xmlBlaster/doc/requirements/client.configuration.html " +
156 "or provide a public session ID to support polling for xmlBlaster without an initial connection. " +
157 "See 'http://www.xmlBlaster.org/xmlBlaster/doc/requirements/client.failsafe.html' for more details.");
158 }
159 msgQueueEntry.setReturnObj(connectReturnQos);
160 }
161
162 else if (MethodName.DISCONNECT == msgQueueEntry.getMethodName()) {
163 if (log.isLoggable(Level.FINE)) log.fine("disconnect returns void, nothing to do");
164 }
165
166 else if (MethodName.GET == msgQueueEntry.getMethodName()) {
167 MsgQueueGetEntry entry = (MsgQueueGetEntry)msgQueueEntry;
168 throw new XmlBlasterException(glob, ErrorCode.USER_CONFIGURATION, ME,
169 "Synchronous GET on oid='" + entry.getGetKey().getOid() + "' is not possible in offline/polling mode. " +
170 "See 'http://www.xmlBlaster.org/xmlBlaster/doc/requirements/client.failsafe.html' for more details.");
171 }
172
173 else {
174 log.severe("Internal problem, MsgQueueEntry '" + msgQueueEntry.getEmbeddedType() + "' not expected here");
175 }
176 }
177 }
178
179 public ArrayList filterDistributorEntries(ArrayList entries, Throwable ex) {
180 return entries;
181 }
182
183 }
syntax highlighted by Code2HTML, v. 0.9.1