#!/usr/bin/env python
"""Directory synchronizer for Plone FileSystemStorage.

Designed to sync Plone FileSystemStorage directories with a remote mirror
that is mounted locally through CurlFtpFS.  It offers two modes:

* a real-time INotify watcher (``daemon``) that automatically propagates
  any change to the mirror;
* a batch re-synchronizer (``atom``) that checks every file to make sure
  the mirror is up to date.

The mode is selected from the program name (``fssyncd`` / ``fssync``) or
from the first command-line argument (``daemon`` / ``atom``).
"""

from pyinotify import WatchManager, Notifier, ThreadedNotifier, EventsCodes, ProcessEvent
try:
    from commands import getstatusoutput as go
except ImportError:  # the ``commands`` module only exists on Python 2
    from subprocess import getstatusoutput as go
try:
    from pipes import quote
except ImportError:  # ``pipes`` is gone from recent Python 3 releases
    from shlex import quote
from os import makedirs, stat, walk
from os.path import isdir, isfile, dirname, exists
from time import strftime, time, sleep
from traceback import format_exc
from shutil import copy2 as cp
from sys import argv, exit

# Only files whose name ends with one of these suffixes are synchronized.
ends = ['_image', '_file', '_image.rdf', '_file.rdf']
base = '/hdb/zope/instance/var/'
fss1 = base + 'agescimo_fss/'
fss2 = base + 'modena1_fss/'
dest = '/media/agescimo/FileSystemStorage/'
mountpoint = '/media/agescimo'  # Curlftpfs mount point
timeout = 30  # Timeout to force reconnection (a mountpoint listing)

# Time of the last forced mountpoint listing (see reconnect()).
last = time()

#######################
# INotify-based real-time synchronizer (FSSyncd)
#######################

# A message is printed / written when its level is <= these thresholds.
verbosity = 10
loglevel = 10


class logging:
    """Callable logging aid that prints to stdout and appends to a file.

    ``path`` and ``mode`` are passed unchanged to ``open()`` every time a
    message is written.
    """

    def __init__(self, path, mode):
        self.path = path
        self.mode = mode

    def __call__(self, words, level=0):
        """Emit ``words`` (a string or a list of lines) at ``level``.

        The first line is prefixed with a timestamp; each following line is
        written on its own row behind a ``|||`` marker.
        """
        if isinstance(words, str):
            words = words.split('\n')
        msg = strftime('%X %x') + ' ' + words[0] + '\n'
        msg += ''.join(['\t|||\t ' + line + '\n' for line in words[1:]])
        if verbosity >= level:
            print(msg)
        if loglevel >= level:
            # Close the handle explicitly instead of relying on garbage
            # collection to flush the write.
            out = open(self.path, self.mode)
            try:
                out.write(msg)
            finally:
                out.close()


log = logging('/var/log/fssyncd.log', 'a')


def proc(s):
    """Return True if the file name ``s`` should be synchronized."""
    for e in ends:
        if s.endswith(e):
            return True
    return False


def reconnect():
    """Force a curlftpfs reconnection.

    It simply lists the mounted path: after a long time the connection
    drops, and copying/removing something under the mountpoint would
    result in an error.
    """
    global last
    if time() > last + timeout:
        go('ls %s;ls %s' % (mountpoint, mountpoint))
        # NOTE(review): the original indentation was lost; ``last`` is
        # assumed to be refreshed only when a listing was performed, which
        # lists at least as often as the alternative reading.
        last = time()


# Following tutorial: http://pyinotify.sourceforge.net/
class PTmp(ProcessEvent):
    """INotify event handler that mirrors changes from ``base`` to ``dest``."""

    def process_IN_CREATE(self, event, desc='Create'):
        """Run when a file/dir has been created or modified.

        ``desc`` is only used as the label of the log entry.
        """
        C = event.path + '/' + event.name
        # Only the leading ``base`` prefix is rewritten.
        to = C.replace(base, dest, 1)
        # NOTE(review): presumably gives the writer time to finish - confirm.
        sleep(3)
        reconnect()
        if isfile(C) and proc(C):
            if not exists(dirname(to)):
                makedirs(dirname(to))
            cp(C, to)
            log("%s: %s, %s" % (desc, C, to))

    def process_IN_DELETE(self, event):
        """Run when a file/dir has been deleted."""
        D = event.path + '/' + event.name
        rm = D.replace(base, dest, 1)
        reconnect()
        if exists(rm):
            # Quote the path so that spaces or shell metacharacters in a
            # file name cannot alter the command.
            o = go('rm -r %s' % quote(rm))[1]
            log('Remove: %s, %s (%s)' % (D, rm, o))

    def process_IN_MODIFY(self, event):
        """Redirect to process_IN_CREATE (same work, different log label)."""
        self.process_IN_CREATE(event, desc='Update')


def daemon():
    """Watch both storage directories and mirror events until interrupted."""
    wm = WatchManager()
    mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE | EventsCodes.IN_MODIFY  # watched events
    notifier = Notifier(wm, PTmp())
    wdd1 = wm.add_watch(fss1, mask, rec=True, auto_add=True)
    # I have 2 filesystemstorage dirs to sync with remote mirror
    wdd2 = wm.add_watch(fss2, mask, rec=True, auto_add=True)
    while True:  # loop forever
        try:
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
        except KeyboardInterrupt:
            # destroy the inotify's instance on this interrupt (stop monitoring)
            notifier.stop()
            break
        except Exception:
            # Keep the daemon alive on handler errors, but record them.
            err = format_exc()
            log('EXCEPTION: ' + err)


# Pad argv so that argv[1] can always be inspected.
argv += ['', '']
if 'fssyncd' in argv[0] or argv[1] == 'daemon':
    daemon()
    exit(0)

###############
# Batch Re-Synchronizer (Atom FSSync)
###############

_STATUS_PATH = '/var/log/fssync.status'

log = logging('/var/log/atom-fssync.log', 'a')
transfer = 0  # bytes copied so far by the batch run
itime = time()  # start time of the batch run
fses = [(fss1, fss1.replace(base, dest)), (fss2, fss2.replace(base, dest))]

# Source paths recorded by a previous run; copy() skips them.
done = set()
try:
    _status = open(_STATUS_PATH, 'r')
    try:
        done = set(_status.read().splitlines())
    finally:
        _status.close()
except IOError:
    # No readable status file: start with nothing marked as done.
    pass
commit = open(_STATUS_PATH, 'a')


def _throughput():
    """Return (Kbytes transferred, Kbytes per second) for the batch run."""
    kb = transfer / 1024.0  # ``transfer`` accumulates st_size, i.e. bytes
    elapsed = time() - itime
    if elapsed <= 0:
        return kb, 0.0
    return kb, kb / elapsed


def copy(f, d):
    """Copy ``f`` to ``d`` if needed and return the number of bytes copied.

    Returns 0 when ``f`` is not a synchronizable file, is already listed in
    ``done``, or when the destination is considered up to date.
    """
    if f in done or not (isfile(f) and proc(f)):
        return 0
    fi = stat(f)
    if exists(d):
        # NOTE(review): source and destination st_ctime are compared as in
        # the original; confirm this is the intended freshness test.
        if stat(d).st_ctime == fi.st_ctime:
            return 0
        action = 'Update'
    else:
        action = 'Create'
    if not exists(dirname(d)):
        makedirs(dirname(d))
    cp(f, d)
    log('%s: %s, %s' % (action, f, d))
    return fi.st_size


def sync(fs):
    """Walk ``fs[0]`` and copy every outdated file below it to ``fs[1]``."""
    global transfer
    for root, dirs, files in walk(fs[0]):
        for name in files:
            f = root + '/' + name
            d = f.replace(fs[0], fs[1], 1)
            add = copy(f, d)
            if add != 0:
                transfer += add
                kb, rate = _throughput()
                print('Transferred: %.2f Kbytes, %.1f Kbytes/sec' % (kb, rate))
                # NOTE(review): the original indentation was lost; it is
                # assumed that only transferred files are recorded.
                commit.write(f + '\n')


def atom():
    """Synchronize every configured storage, then finalize the status file."""
    for fs in fses:
        print('Now synchronizing:  %s %s' % (fs[0], fs[1]))
        sync(fs)
    commit.close()
    if transfer == 0:
        # Nothing was transferred: truncate the status file.
        open(_STATUS_PATH, 'w').close()


if 'fssync' in argv[0] or argv[1] == 'atom':
    atom()
    kb, rate = _throughput()
    log('Total: %.2f Kbytes, %.1f Kbytes/sec' % (kb, rate))
    exit(0)