1 /*------------------------------------------------------------------------------
2 Name: Publisher.java
3 Project: xmlBlaster.org
4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
5 ------------------------------------------------------------------------------*/
6
7 package org.xmlBlaster.client.filepoller;
8
9 import java.util.Set;
10
11 import java.util.logging.Logger;
12 import java.util.logging.Level;
13 import org.xmlBlaster.client.I_XmlBlasterAccess;
14 import org.xmlBlaster.client.key.PublishKey;
15 import org.xmlBlaster.client.qos.ConnectQos;
16 import org.xmlBlaster.client.qos.DisconnectQos;
17 import org.xmlBlaster.util.Global;
18 import org.xmlBlaster.util.I_Timeout;
19 import org.xmlBlaster.util.MsgUnit;
20 import org.xmlBlaster.util.Timeout;
21 import org.xmlBlaster.util.Timestamp;
22 import org.xmlBlaster.util.XmlBlasterException;
23 import org.xmlBlaster.util.def.Constants;
24 import org.xmlBlaster.util.def.ErrorCode;
25 import org.xmlBlaster.util.plugin.I_PluginConfig;
26 import org.xmlBlaster.util.qos.ConnectQosData;
27
28
29 /**
30 * Publisher
31 * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
32 * @deprectated it is now replaced by the corresponding class in org.xmlBlaster.contrib.filewatcher
33 */
34 public class Publisher implements I_Timeout {
35
36 private String ME = "Publisher";
37 private Global global;
38 private static Logger log = Logger.getLogger(Publisher.class.getName());
39 private DirectoryManager directoryManager;
40 private I_XmlBlasterAccess access;
41 private String publishKey;
42 private String publishQos;
43 private ConnectQos connectQos;
44
45 private long pollInterval;
46 private long maximumFileSize;
47 private String fileFilter;
48 private String filterType;
49 private String directoryName;
50 private boolean copyOnMove;
51 private String sent;
52 private String discarded;
53 private String lockExtention;
54 private long delaySinceLastFileChange;
55
56 public static final String USE_REGEX = "regex";
57
58 private Timestamp timeoutHandle;
59 private static Timeout timeout = new Timeout("FileSystem-Poller");
60
61 /** used to identify if it has shut down (to get a new global) */
62 private boolean isShutdown;
63 /** used to break the loop in doPublish when shutting down */
64 private boolean forceShutdown;
65
66 /** only used as a default login name and logging */
67 private String name;
68
69 private boolean isActive;
70
71
72 // private I_PluginConfig pluginConfig;
73
74 public Publisher(Global globOrig, String name, I_PluginConfig pluginConfig) throws XmlBlasterException {
75 ME += "-" + name;
76 this.name = name;
77 this.isShutdown = false;
78 this.global = globOrig.getClone(globOrig.getNativeConnectArgs()); // sets session.timeout to 0 etc.
79 // this.pluginConfig = pluginConfig;
80
81 if (log.isLoggable(Level.FINER))
82 log.finer(ME+": constructor");
83 // retrieve all necessary properties:
84 String tmp = null;
85 tmp = this.global.get("publishKey", (String)null, null, pluginConfig);
86 String topicName = this.global.get("topicName", (String)null, null, pluginConfig);
87 if (tmp != null) {
88 // this.publishKey = new PublishKey(this.global, new MsgKeyData(this.global, tmp));
89 this.publishKey = tmp;
90 if (topicName != null)
91 log.warning(ME+": constructor: since 'publishKey' is defined, 'topicName' will be ignored");
92 }
93 else {
94 if (topicName == null)
95 throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "at least one of the properties 'topicName' or 'publishKey' must be defined");
96 this.publishKey = (new PublishKey(this.global, topicName)).toXml();
97 }
98
99 this.publishQos = this.global.get("publishQos", "<qos/>", null, pluginConfig);
100
101 tmp = this.global.get("connectQos", (String)null, null, pluginConfig);
102 if (tmp != null) {
103 ConnectQosData data = this.global.getConnectQosFactory().readObject(tmp);
104 this.connectQos = new ConnectQos(this.global, data);
105 }
106 else {
107 String userId = this.global.get("loginName", "_" + this.name, null, pluginConfig);
108 String password = this.global.get("password", (String)null, null, pluginConfig);
109 this.connectQos = new ConnectQos(this.global, userId, password);
110 this.global.addObjectEntry(Constants.OBJECT_ENTRY_ServerScope, globOrig.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope));
111 }
112
113 this.fileFilter = this.global.get("fileFilter", (String)null, null, pluginConfig);
114 this.directoryName = this.global.get("directoryName", (String)null, null, pluginConfig);
115 if (directoryName == null)
116 throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "constructor: 'directoryName' is mandatory");
117
118 this.maximumFileSize = this.global.get("maximumFileSize", 10000000L, null, pluginConfig);
119 this.delaySinceLastFileChange = this.global.get("delaySinceLastFileChange", 10000L, null, pluginConfig);
120 this.pollInterval = this.global.get("pollInterval", 2000L, null, pluginConfig);
121
122 this.sent = this.global.get("sent", (String)null, null, pluginConfig);
123 this.discarded = this.global.get("discarded", (String)null, null, pluginConfig);
124 this.lockExtention = this.global.get("lockExtention", (String)null, null, pluginConfig);
125
126 // this would throw an exception and act as a validation if something is not OK in configuration
127 new MsgUnit(this.publishKey, (byte[])null, this.publishQos);
128 this.filterType = this.global.get("filterType", "simple", null, pluginConfig);
129 this.copyOnMove = this.global.get("copyOnMove", true, null, pluginConfig);
130
131 createDirectoryManager();
132 }
133
134 /**
135 * Create the file checker instance with the current configuration.
136 * @throws XmlBlasterException
137 */
138 private void createDirectoryManager() throws XmlBlasterException {
139 boolean isTrueRegex = USE_REGEX.equalsIgnoreCase(filterType);
140 this.directoryManager = new DirectoryManager(this.global,
141 this.name, this.directoryName, this.delaySinceLastFileChange,
142 this.fileFilter, this.sent, this.discarded, this.lockExtention, isTrueRegex,
143 this.copyOnMove);
144 }
145
146 /**
147 * Useful for JMX invocations
148 */
149 private void reCreateDirectoryManager() {
150 try {
151 createDirectoryManager();
152 } catch (XmlBlasterException e) {
153 throw new IllegalArgumentException(e.getMessage());
154 }
155 }
156
157 public String toString() {
158 return "FilePoller " + this.filterType + " directoryName=" + this.directoryName + " fileFilter='" + this.fileFilter + "'";
159 }
160
161 /**
162 * Connects to the xmlBlaster.
163 *
164 * @throws XmlBlasterException
165 */
166 public synchronized void init() throws XmlBlasterException {
167 if (log.isLoggable(Level.FINER))
168 log.finer(ME+": init");
169 if (this.isShutdown) { // on a second init
170 this.global = this.global.getClone(null);
171 }
172 this.isShutdown = false;
173 this.forceShutdown = false;
174 this.access = this.global.getXmlBlasterAccess();
175 // no callback listener (we are not subscribing and don't want ptp)
176 this.access.connect(this.connectQos, null);
177 this.isActive = true;
178 if (this.pollInterval >= 0)
179 this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null);
180 }
181
182 /**
183 * If an exception occurs it means it could not publish the entry
184 * @throws XmlBlasterException
185 */
186 public void shutdown() throws XmlBlasterException {
187 if (log.isLoggable(Level.FINER))
188 log.finer(ME+": shutdown");
189 timeout.removeTimeoutListener(this.timeoutHandle);
190 this.isActive = false;
191 this.forceShutdown = true; // in case doPublish is looping due to an exception
192 synchronized (this) {
193 this.isShutdown = false;
194 this.access.disconnect(new DisconnectQos(this.global));
195 this.global.shutdown();
196 }
197 }
198
199 /**
200 * Fail-safe sending files.
201 * @return Comman separated list of send file names
202 */
203 public synchronized void publish() {
204 while (true) {
205 try {
206 doPublish();
207 break;
208 }
209 catch (XmlBlasterException ex) {
210 log.severe(ME+": publish: exception " + ex.getMessage());
211 try {
212 Thread.sleep(this.pollInterval);
213 }
214 catch (Exception e) {}
215 }
216 if (this.forceShutdown)
217 break;
218 }
219 }
220
221 /**
222 * Create a comma separated list of file names.
223 * @param infos
224 * @param max Max file names to collect
225 * @return
226 */
227 public String toString(FileInfo[] infos, int max) {
228 StringBuffer sb = new StringBuffer();
229 if (max <= 0) max = infos.length;
230 if (max > infos.length) max = infos.length;
231 for (int i=0; i<max; i++) {
232 if (i>0) sb.append(",");
233 sb.append(infos[i].getRelativeName());
234 }
235 return sb.toString();
236 }
237
238 /**
239 * Publish file to xmlBlaster.
240 * @return An empty string if nothing was sent, is never null
241 * @throws XmlBlasterException
242 */
243 private FileInfo[] doPublish() throws XmlBlasterException {
244 if (log.isLoggable(Level.FINER))
245 log.finer(ME+": doPublish");
246 Set entries = this.directoryManager.getEntries();
247 if (entries == null || entries.size() < 1)
248 return new FileInfo[0];
249 FileInfo[] infos = (FileInfo[])entries.toArray(new FileInfo[entries.size()]);
250 for (int i=0; i < infos.length; i++) {
251 if (this.maximumFileSize <= 0L || infos[i].getSize() <= this.maximumFileSize) {
252 if (infos[i].getSize() > Integer.MAX_VALUE) {
253 log.severe(ME+": doPublish: sizes bigger than '" + Integer.MAX_VALUE + "' are currently not implemented");
254 }
255 else {
256 byte[] content = this.directoryManager.getContent(infos[i]);
257 MsgUnit msgUnit = new MsgUnit(this.publishKey, content, this.publishQos);
258 msgUnit.getQosData().addClientProperty("_fileName", infos[i].getRelativeName());
259 msgUnit.getQosData().addClientProperty("_fileDate", infos[i].getTimestamp());
260 this.access.publish(msgUnit);
261 if (log.isLoggable(Level.FINE))
262 log.fine(ME+": Successfully published file " + infos[i].getRelativeName() + " with size=" +infos[i].getSize());
263 }
264
265 while (true) { // must repeat until it works or until shut down
266 try {
267 boolean success = true;
268 this.directoryManager.deleteOrMoveEntry(infos[i].getName(), success);
269 break;
270 }
271 catch (XmlBlasterException ex) {
272 log.severe(ME+": Moving " + infos[i].getName() + " failed, we try again without further publishing (please fix manually): " + ex.getMessage());
273 try {
274 Thread.sleep(this.pollInterval);
275 }
276 catch (Exception e){}
277 }
278 if (this.forceShutdown)
279 break;
280 }
281 }
282 else { // delete or move to 'discarded'
283 log.warning(ME+": doPublish: the file '" + infos[i].getName() + "' is too long (" + infos[i].getSize() + "'): I will remove it without publishing");
284 boolean success = false;
285 try {
286 this.directoryManager.deleteOrMoveEntry(infos[i].getName(), success);
287 }
288 catch (XmlBlasterException ex) {
289 log.warning(ME+": doPublish: could not handle file '" + infos[i].getName() + "' which was too big: check file and directories permissions and fix it manually: I will continue working anyway. " + ex.getMessage());
290 }
291 }
292 }
293 return infos;
294 }
295
296 /**
297 * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object)
298 */
299 public void timeout(Object userData) {
300 try {
301 if (log.isLoggable(Level.FINER))
302 log.finer(ME+": timeout");
303 publish();
304 }
305 catch (Throwable ex) {
306 ex.printStackTrace();
307 log.severe(ME+": timeout: " + ex.getMessage());
308 }
309 finally {
310 if (this.pollInterval >= 0)
311 this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null);
312 }
313 }
314
315 public void activate() throws Exception {
316 if (!this.isActive) {
317 this.isActive = true;
318 if (this.pollInterval >= 0)
319 this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null);
320 }
321 }
322
323 /* (non-Javadoc)
324 * @see org.xmlBlaster.util.admin.I_AdminService#deActivate()
325 */
326 public void deActivate() {
327 timeout.removeTimeoutListener(this.timeoutHandle);
328 this.timeoutHandle = null;
329 this.isActive = false;
330 }
331
332 /* (non-Javadoc)
333 * @see org.xmlBlaster.util.admin.I_AdminService#isActive()
334 */
335 public boolean isActive() {
336 return this.isActive;
337 }
338
339 public String triggerScan() {
340 try {
341 //this.timeoutHandle = timeout.addTimeoutListener(this, 0, null);
342 // Hack: I need to call it twice to be effective, why? (Marcel 2006-01)
343 for (int i=0; i<2; i++) {
344 FileInfo[] infos = doPublish();
345 if (infos.length > 0) {
346 return "Published matching files '" + toString(infos, 10) + "'";
347 }
348 }
349 if (this.delaySinceLastFileChange > 0)
350 return "No matching file found to publish, note that it may take delaySinceLastFileChange=" + this.delaySinceLastFileChange + " millis until the file is sent.";
351 else
352 return "No matching file found to publish.";
353 } catch (XmlBlasterException e) {
354 throw new IllegalArgumentException(e.getMessage());
355 }
356 }
357
358 /**
359 * @return Returns the directoryName.
360 */
361 public String getDirectoryName() {
362 return this.directoryName;
363 }
364
365 /**
366 * @param directoryName The directoryName to set.
367 */
368 public void setDirectoryName(String directoryName) {
369 this.directoryName = directoryName;
370 reCreateDirectoryManager();
371 }
372
373 /**
374 * @return Returns the fileFilter.
375 */
376 public String getFileFilter() {
377 return this.fileFilter;
378 }
379
380 /**
381 * @param fileFilter The fileFilter to set.
382 */
383 public void setFileFilter(String fileFilter) {
384 this.fileFilter = fileFilter;
385 reCreateDirectoryManager();
386 }
387
388 /**
389 * @return Returns the filterType.
390 */
391 public String getFilterType() {
392 return this.filterType;
393 }
394
395 /**
396 * @param filterType The filterType to set.
397 */
398 public void setFilterType(String filterType) {
399 this.filterType = filterType;
400 reCreateDirectoryManager();
401 }
402
403 /**
404 * @return Returns the maximumFileSize.
405 */
406 public long getMaximumFileSize() {
407 return this.maximumFileSize;
408 }
409
410 /**
411 * @param maximumFileSize The maximumFileSize to set.
412 */
413 public void setMaximumFileSize(long maximumFileSize) {
414 this.maximumFileSize = maximumFileSize;
415 }
416
417 /**
418 * @return Returns the pollInterval.
419 */
420 public long getPollInterval() {
421 return this.pollInterval;
422 }
423
424 /**
425 * @param pollInterval The pollInterval to set.
426 */
427 public void setPollInterval(long pollInterval) {
428 this.pollInterval = pollInterval;
429 if (this.pollInterval < 0)
430 deActivate();
431 }
432
433 /**
434 * @return Returns the copyOnMove.
435 */
436 public boolean isCopyOnMove() {
437 return this.copyOnMove;
438 }
439
440 /**
441 * @param copyOnMove The copyOnMove to set.
442 */
443 public void setCopyOnMove(boolean copyOnMove) {
444 this.copyOnMove = copyOnMove;
445 reCreateDirectoryManager();
446 }
447
448 /**
449 * @return Returns the delaySinceLastFileChange.
450 */
451 public long getDelaySinceLastFileChange() {
452 return this.delaySinceLastFileChange;
453 }
454
455 /**
456 * @param delaySinceLastFileChange The delaySinceLastFileChange to set.
457 */
458 public void setDelaySinceLastFileChange(long delaySinceLastFileChange) {
459 this.delaySinceLastFileChange = delaySinceLastFileChange;
460 reCreateDirectoryManager();
461 }
462
463 /**
464 * @return Returns the discarded.
465 */
466 public String getDiscarded() {
467 return this.discarded;
468 }
469
470 /**
471 * @param discarded The discarded to set.
472 */
473 public void setDiscarded(String discarded) {
474 this.discarded = discarded;
475 reCreateDirectoryManager();
476 }
477
478 /**
479 * @return Returns the lockExtention.
480 */
481 public String getLockExtention() {
482 return this.lockExtention;
483 }
484
485 /**
486 * @param lockExtention The lockExtention to set.
487 */
488 public void setLockExtention(String lockExtention) {
489 this.lockExtention = lockExtention;
490 reCreateDirectoryManager();
491 }
492
493 /**
494 * @return Returns the sent.
495 */
496 public String getSent() {
497 return this.sent;
498 }
499
500 /**
501 * @param sent The sent to set.
502 */
503 public void setSent(String sent) {
504 this.sent = sent;
505 reCreateDirectoryManager();
506 }
507 }
syntax highlighted by Code2HTML, v. 0.9.1