1 /*------------------------------------------------------------------------------
  2 Name:      Publisher.java
  3 Project:   xmlBlaster.org
  4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE filep
  5 ------------------------------------------------------------------------------*/
  6 
  7 package org.xmlBlaster.client.filepoller;
  8 
  9 import java.util.Set;
 10 
 11 import java.util.logging.Logger;
 12 import java.util.logging.Level;
 13 import org.xmlBlaster.client.I_XmlBlasterAccess;
 14 import org.xmlBlaster.client.key.PublishKey;
 15 import org.xmlBlaster.client.qos.ConnectQos;
 16 import org.xmlBlaster.client.qos.DisconnectQos;
 17 import org.xmlBlaster.util.Global;
 18 import org.xmlBlaster.util.I_Timeout;
 19 import org.xmlBlaster.util.MsgUnit;
 20 import org.xmlBlaster.util.Timeout;
 21 import org.xmlBlaster.util.Timestamp;
 22 import org.xmlBlaster.util.XmlBlasterException;
 23 import org.xmlBlaster.util.def.Constants;
 24 import org.xmlBlaster.util.def.ErrorCode;
 25 import org.xmlBlaster.util.plugin.I_PluginConfig;
 26 import org.xmlBlaster.util.qos.ConnectQosData;
 27 
 28 
 29 /**
 30  * Publisher
 31  * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a>
 32  * @deprectated it is now replaced by the corresponding class in org.xmlBlaster.contrib.filewatcher
 33  */
 34 public class Publisher implements I_Timeout {
 35 
 36    private String ME = "Publisher";
 37    private Global global;
 38    private static Logger log = Logger.getLogger(Publisher.class.getName());
 39    private DirectoryManager directoryManager;
 40    private I_XmlBlasterAccess access;
 41    private String publishKey;
 42    private String publishQos;
 43    private ConnectQos connectQos;
 44 
 45    private long pollInterval;
 46    private long maximumFileSize;
 47    private String fileFilter;
 48    private String filterType;
 49    private String directoryName;
 50    private boolean copyOnMove;
 51    private String sent;
 52    private String discarded;
 53    private String lockExtention;
 54    private long delaySinceLastFileChange;
 55    
 56    public static final String USE_REGEX = "regex";
 57 
 58    private Timestamp timeoutHandle;
 59    private static Timeout timeout = new Timeout("FileSystem-Poller");
 60    
 61    /** used to identify if it has shut down (to get a new global) */
 62    private boolean isShutdown;
 63    /** used to break the loop in doPublish when shutting down */
 64    private boolean forceShutdown;
 65    
 66    /** only used as a default login name and logging */
 67    private String name;
 68    
 69    private boolean isActive;
 70    
 71    
 72    // private I_PluginConfig pluginConfig;
 73    
 74    public Publisher(Global globOrig, String name, I_PluginConfig pluginConfig) throws XmlBlasterException {
 75       ME += "-" + name;
 76       this.name = name;
 77       this.isShutdown = false;
 78       this.global = globOrig.getClone(globOrig.getNativeConnectArgs()); // sets session.timeout to 0 etc.
 79       // this.pluginConfig = pluginConfig;
 80 
 81       if (log.isLoggable(Level.FINER)) 
 82          log.finer(ME+": constructor");
 83       // retrieve all necessary properties:
 84       String tmp = null;
 85       tmp = this.global.get("publishKey", (String)null, null, pluginConfig);
 86       String topicName =  this.global.get("topicName", (String)null, null, pluginConfig);
 87       if (tmp != null) {
 88          // this.publishKey = new PublishKey(this.global, new MsgKeyData(this.global, tmp));
 89          this.publishKey = tmp;
 90          if (topicName != null)
 91             log.warning(ME+": constructor: since 'publishKey' is defined, 'topicName' will be ignored");
 92       }
 93       else {
 94          if (topicName == null)
 95             throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "at least one of the properties 'topicName' or 'publishKey' must be defined");
 96          this.publishKey = (new PublishKey(this.global, topicName)).toXml(); 
 97       }
 98       
 99       this.publishQos = this.global.get("publishQos", "<qos/>", null, pluginConfig);
100 
101       tmp  = this.global.get("connectQos", (String)null, null, pluginConfig);
102       if (tmp != null) {
103          ConnectQosData data = this.global.getConnectQosFactory().readObject(tmp);
104          this.connectQos = new ConnectQos(this.global, data);
105       }
106       else {
107          String userId = this.global.get("loginName", "_" + this.name, null, pluginConfig);
108          String password = this.global.get("password", (String)null, null, pluginConfig);
109          this.connectQos = new ConnectQos(this.global, userId, password);
110          this.global.addObjectEntry(Constants.OBJECT_ENTRY_ServerScope, globOrig.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope));
111       }
112 
113       this.fileFilter =  this.global.get("fileFilter", (String)null, null, pluginConfig);
114       this.directoryName = this.global.get("directoryName", (String)null, null, pluginConfig);
115       if (directoryName == null)
116          throw new XmlBlasterException(this.global, ErrorCode.USER_CONFIGURATION, ME, "constructor: 'directoryName' is mandatory");
117       
118       this.maximumFileSize = this.global.get("maximumFileSize", 10000000L, null, pluginConfig);
119       this.delaySinceLastFileChange = this.global.get("delaySinceLastFileChange", 10000L, null, pluginConfig);
120       this.pollInterval = this.global.get("pollInterval", 2000L, null, pluginConfig);
121 
122       this.sent =  this.global.get("sent", (String)null, null, pluginConfig);
123       this.discarded =  this.global.get("discarded", (String)null, null, pluginConfig);
124       this.lockExtention =  this.global.get("lockExtention", (String)null, null, pluginConfig);
125      
126       // this would throw an exception and act as a validation if something is not OK in configuration
127       new MsgUnit(this.publishKey, (byte[])null, this.publishQos);
128       this.filterType = this.global.get("filterType", "simple", null, pluginConfig);
129       this.copyOnMove = this.global.get("copyOnMove", true, null, pluginConfig);
130       
131       createDirectoryManager();
132    }
133    
134    /**
135     * Create the file checker instance with the current configuration. 
136     * @throws XmlBlasterException
137     */
138    private void createDirectoryManager() throws XmlBlasterException {
139       boolean isTrueRegex = USE_REGEX.equalsIgnoreCase(filterType);
140       this.directoryManager = new DirectoryManager(this.global,
141             this.name, this.directoryName, this.delaySinceLastFileChange, 
142             this.fileFilter, this.sent, this.discarded, this.lockExtention, isTrueRegex,
143             this.copyOnMove);
144    }
145 
146    /**
147     * Useful for JMX invocations
148     */
149    private void reCreateDirectoryManager() {
150       try {
151          createDirectoryManager();
152       } catch (XmlBlasterException e) {
153          throw new IllegalArgumentException(e.getMessage());
154       }
155    }
156 
157    public String toString() {
158       return "FilePoller " + this.filterType + " directoryName=" + this.directoryName + " fileFilter='" + this.fileFilter + "'";
159    }
160    
161    /**
162     * Connects to the xmlBlaster.
163     * 
164     * @throws XmlBlasterException
165     */
166    public synchronized void init() throws XmlBlasterException {
167       if (log.isLoggable(Level.FINER)) 
168          log.finer(ME+": init");
169       if (this.isShutdown) { // on a second init
170          this.global = this.global.getClone(null);
171       }
172       this.isShutdown = false;
173       this.forceShutdown = false;
174       this.access = this.global.getXmlBlasterAccess();
175       // no callback listener (we are not subscribing and don't want ptp)
176       this.access.connect(this.connectQos, null);
177       this.isActive = true;
178       if (this.pollInterval >= 0)
179          this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null);
180    }
181    
182    /**
183     * If an exception occurs it means it could not publish the entry
184     * @throws XmlBlasterException
185     */
186    public void shutdown() throws XmlBlasterException {
187       if (log.isLoggable(Level.FINER)) 
188          log.finer(ME+": shutdown");
189       timeout.removeTimeoutListener(this.timeoutHandle);
190       this.isActive = false;
191       this.forceShutdown = true; // in case doPublish is looping due to an exception
192       synchronized (this) {
193          this.isShutdown = false;
194          this.access.disconnect(new DisconnectQos(this.global));
195          this.global.shutdown();
196       }
197    }
198    
199    /**
200     * Fail-safe sending files. 
201     * @return Comman separated list of send file names
202     */
203    public synchronized void publish() {
204       while (true) {
205          try {
206             doPublish();
207             break;
208          }
209          catch (XmlBlasterException ex) {
210             log.severe(ME+": publish: exception " + ex.getMessage());
211             try {
212                Thread.sleep(this.pollInterval);
213             }
214             catch  (Exception e) {}      
215          }
216          if (this.forceShutdown)
217             break;
218       }
219    }
220 
221    /**
222     * Create a comma separated list of file names. 
223     * @param infos
224     * @param max Max file names to collect
225     * @return
226     */
227    public String toString(FileInfo[] infos, int max) {
228       StringBuffer sb = new StringBuffer();
229       if (max <= 0) max = infos.length;
230       if (max > infos.length) max = infos.length;
231       for (int i=0; i<max; i++) {
232          if (i>0) sb.append(",");
233          sb.append(infos[i].getRelativeName());
234       }
235       return sb.toString();
236    }
237    
238    /**
239     * Publish file to xmlBlaster. 
240     * @return An empty string if nothing was sent, is never null
241     * @throws XmlBlasterException
242     */
243    private FileInfo[] doPublish() throws XmlBlasterException {
244       if (log.isLoggable(Level.FINER)) 
245          log.finer(ME+": doPublish");
246       Set entries = this.directoryManager.getEntries();
247       if (entries == null || entries.size() < 1)
248          return new FileInfo[0];
249       FileInfo[] infos = (FileInfo[])entries.toArray(new FileInfo[entries.size()]);
250       for (int i=0; i < infos.length; i++) {
251          if (this.maximumFileSize <= 0L || infos[i].getSize() <= this.maximumFileSize) {
252             if (infos[i].getSize() > Integer.MAX_VALUE) {
253                log.severe(ME+": doPublish: sizes bigger than '" + Integer.MAX_VALUE + "' are currently not implemented");
254             }
255             else {
256                byte[] content = this.directoryManager.getContent(infos[i]);
257                MsgUnit msgUnit = new MsgUnit(this.publishKey, content, this.publishQos);
258                msgUnit.getQosData().addClientProperty("_fileName", infos[i].getRelativeName());
259                msgUnit.getQosData().addClientProperty("_fileDate", infos[i].getTimestamp());
260                this.access.publish(msgUnit);
261                if (log.isLoggable(Level.FINE)) 
262                   log.fine(ME+": Successfully published file " + infos[i].getRelativeName() + " with size=" +infos[i].getSize());
263             }
264 
265             while (true) { // must repeat until it works or until shut down
266                try {
267                   boolean success = true;
268                   this.directoryManager.deleteOrMoveEntry(infos[i].getName(), success);
269                   break;
270                }
271                catch (XmlBlasterException ex) {
272                   log.severe(ME+": Moving " + infos[i].getName() + " failed, we try again without further publishing (please fix manually): " + ex.getMessage());
273                   try {
274                      Thread.sleep(this.pollInterval);
275                   }
276                   catch (Exception e){}
277                }
278                if (this.forceShutdown)
279                   break;
280             }
281          }
282          else { // delete or move to 'discarded'
283             log.warning(ME+": doPublish: the file '" + infos[i].getName() + "' is too long (" + infos[i].getSize() + "'): I will remove it without publishing");
284             boolean success = false;
285             try {
286                this.directoryManager.deleteOrMoveEntry(infos[i].getName(), success);
287             }
288             catch (XmlBlasterException ex) {
289                log.warning(ME+": doPublish: could not handle file '" + infos[i].getName() + "' which was too big: check file and directories permissions and fix it manually: I will continue working anyway. " + ex.getMessage());
290             }
291          }
292       }
293       return infos;
294    }
295    
296    /**
297     * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object)
298     */
299    public void timeout(Object userData) {
300       try {
301          if (log.isLoggable(Level.FINER)) 
302             log.finer(ME+": timeout");
303          publish();
304       }
305       catch (Throwable ex) {
306          ex.printStackTrace();
307          log.severe(ME+": timeout: " + ex.getMessage());
308       }
309       finally {
310          if (this.pollInterval >= 0)
311             this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null);
312       }
313    }
314 
315    public void activate() throws Exception {
316       if (!this.isActive) {
317          this.isActive = true;
318          if (this.pollInterval >= 0)
319             this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null);
320       }
321    }
322 
323    /* (non-Javadoc)
324     * @see org.xmlBlaster.util.admin.I_AdminService#deActivate()
325     */
326    public void deActivate() {
327       timeout.removeTimeoutListener(this.timeoutHandle);
328       this.timeoutHandle = null;
329       this.isActive = false;
330    }
331 
332    /* (non-Javadoc)
333     * @see org.xmlBlaster.util.admin.I_AdminService#isActive()
334     */
335    public boolean isActive() {
336       return this.isActive;
337    }
338    
339    public String triggerScan() {
340       try {
341          //this.timeoutHandle = timeout.addTimeoutListener(this, 0, null);
342          // Hack: I need to call it twice to be effective, why? (Marcel 2006-01)
343          for (int i=0; i<2; i++) {
344             FileInfo[] infos = doPublish();
345             if (infos.length > 0) {
346                return "Published matching files '" + toString(infos, 10) + "'";
347             }
348          }
349          if (this.delaySinceLastFileChange > 0)
350             return "No matching file found to publish, note that it may take delaySinceLastFileChange=" + this.delaySinceLastFileChange + " millis until the file is sent.";
351          else
352             return "No matching file found to publish.";
353       } catch (XmlBlasterException e) {
354          throw new IllegalArgumentException(e.getMessage());
355       }
356    }
357 
358    /**
359     * @return Returns the directoryName.
360     */
361    public String getDirectoryName() {
362       return this.directoryName;
363    }
364 
365    /**
366     * @param directoryName The directoryName to set.
367     */
368    public void setDirectoryName(String directoryName) {
369       this.directoryName = directoryName;
370       reCreateDirectoryManager();
371    }
372 
373    /**
374     * @return Returns the fileFilter.
375     */
376    public String getFileFilter() {
377       return this.fileFilter;
378    }
379 
380    /**
381     * @param fileFilter The fileFilter to set.
382     */
383    public void setFileFilter(String fileFilter) {
384       this.fileFilter = fileFilter;
385       reCreateDirectoryManager();
386    }
387 
388    /**
389     * @return Returns the filterType.
390     */
391    public String getFilterType() {
392       return this.filterType;
393    }
394 
395    /**
396     * @param filterType The filterType to set.
397     */
398    public void setFilterType(String filterType) {
399       this.filterType = filterType;
400       reCreateDirectoryManager();
401    }
402 
403    /**
404     * @return Returns the maximumFileSize.
405     */
406    public long getMaximumFileSize() {
407       return this.maximumFileSize;
408    }
409 
410    /**
411     * @param maximumFileSize The maximumFileSize to set.
412     */
413    public void setMaximumFileSize(long maximumFileSize) {
414       this.maximumFileSize = maximumFileSize;
415    }
416 
417    /**
418     * @return Returns the pollInterval.
419     */
420    public long getPollInterval() {
421       return this.pollInterval;
422    }
423 
424    /**
425     * @param pollInterval The pollInterval to set.
426     */
427    public void setPollInterval(long pollInterval) {
428       this.pollInterval = pollInterval;
429       if (this.pollInterval < 0)
430          deActivate();
431    }
432 
433    /**
434     * @return Returns the copyOnMove.
435     */
436    public boolean isCopyOnMove() {
437       return this.copyOnMove;
438    }
439 
440    /**
441     * @param copyOnMove The copyOnMove to set.
442     */
443    public void setCopyOnMove(boolean copyOnMove) {
444       this.copyOnMove = copyOnMove;
445       reCreateDirectoryManager();
446    }
447 
448    /**
449     * @return Returns the delaySinceLastFileChange.
450     */
451    public long getDelaySinceLastFileChange() {
452       return this.delaySinceLastFileChange;
453    }
454 
455    /**
456     * @param delaySinceLastFileChange The delaySinceLastFileChange to set.
457     */
458    public void setDelaySinceLastFileChange(long delaySinceLastFileChange) {
459       this.delaySinceLastFileChange = delaySinceLastFileChange;
460       reCreateDirectoryManager();
461    }
462 
463    /**
464     * @return Returns the discarded.
465     */
466    public String getDiscarded() {
467       return this.discarded;
468    }
469 
470    /**
471     * @param discarded The discarded to set.
472     */
473    public void setDiscarded(String discarded) {
474       this.discarded = discarded;
475       reCreateDirectoryManager();
476    }
477 
478    /**
479     * @return Returns the lockExtention.
480     */
481    public String getLockExtention() {
482       return this.lockExtention;
483    }
484 
485    /**
486     * @param lockExtention The lockExtention to set.
487     */
488    public void setLockExtention(String lockExtention) {
489       this.lockExtention = lockExtention;
490       reCreateDirectoryManager();
491    }
492 
493    /**
494     * @return Returns the sent.
495     */
496    public String getSent() {
497       return this.sent;
498    }
499 
500    /**
501     * @param sent The sent to set.
502     */
503    public void setSent(String sent) {
504       this.sent = sent;
505       reCreateDirectoryManager();
506    }
507 }


syntax highlighted by Code2HTML, v. 0.9.1