Purfe

Learn, create and have fun with Python

How to monitor file system for changes with watchdog

With class abstraction
This is a follow-up to my previous post on monitoring the file system for changes using Python and the cross-platform watchdog library. In that post, I mentioned that it would be nice to abstract the code into separate classes to make it more reusable. Here it is.

import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
class MyWatch:
    """Watch a directory and forward file system events to an event handler.

    The wrapped watchdog ``Observer`` is the object that watches for file
    system changes and dispatches each event to the scheduled handler.
    """

    def __init__(self, watch_directory='.', go_recursively=True, event_handler=None):
        """Create the observer and remember what to watch.

        Args:
            watch_directory: Path of the directory to monitor.
            go_recursively: Whether sub-directories are monitored as well.
            event_handler: Handler that receives the events. When ``None``,
                ``run`` creates a default ``Handler()``.
        """
        self.observer = Observer()
        self.watch_directory = watch_directory
        self.go_recursively = go_recursively
        self.event_handler = event_handler

    def run(self):
        """Start watching and block until the observer stops or Ctrl+C is hit.

        The observer is always stopped and joined on the way out, including
        when an unexpected exception escapes the wait loop.
        """
        handler = self.event_handler
        if handler is None:
            handler = Handler()
        self.observer.schedule(
            handler, self.watch_directory, recursive=self.go_recursively
        )
        self.observer.start()
        try:
            # Polling with a timed join (instead of sleeping forever) lets the
            # loop end by itself if the observer dies.
            while self.observer.is_alive():
                self.observer.join(1)
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to end the watch; shut down quietly.
            pass
        finally:
            self.observer.stop()
            self.observer.join()

class Handler(PatternMatchingEventHandler):
    """Print a message for every matching file system event.

    The handler is notified when something happens to a path that matches
    ``h_patterns`` (and does not match ``h_ignore_patterns``) inside the
    directory it has been scheduled on.
    """

    def __init__(self, h_patterns='', h_ignore_patterns="", h_ignore_directories=False, h_case_sensitive=True):
        """Configure which events the handler reacts to.

        Args:
            h_patterns: File patterns to handle: a list of glob patterns, or
                a single pattern string. An empty string means all files.
            h_ignore_patterns: Patterns to skip, in the same forms. An empty
                string means nothing is ignored.
            h_ignore_directories: True to be notified for regular files only
                (not for directories).
            h_case_sensitive: Whether pattern matching is case sensitive.
        """
        # The values are kept exactly as the caller supplied them.
        self.h_patterns = h_patterns
        self.h_ignore_patterns = h_ignore_patterns
        self.h_ignore_directories = h_ignore_directories
        self.h_case_sensitive = h_case_sensitive
        # watchdog documents `patterns`/`ignore_patterns` as a list of
        # patterns or None; a bare string would be treated as a collection of
        # single characters (and '' as "no pattern at all"), so normalize.
        super().__init__(
            patterns=self._as_pattern_list(h_patterns),
            ignore_patterns=self._as_pattern_list(h_ignore_patterns),
            ignore_directories=h_ignore_directories,
            case_sensitive=h_case_sensitive,
        )

    @staticmethod
    def _as_pattern_list(patterns):
        """Turn a pattern string into the list-or-None form watchdog expects."""
        if isinstance(patterns, str):
            return [patterns] if patterns else None
        return patterns

    def on_created(self, event):
        """Report a newly created file or directory."""
        print(f"{event.src_path} has been created!")

    def on_deleted(self, event):
        """Report a deleted file or directory."""
        print(f"Someone deleted {event.src_path}!")

    def on_modified(self, event):
        """Report a modified file or directory."""
        print(f"{event.src_path} has been modified")

    def on_moved(self, event):
        """Report a file or directory moved from src_path to dest_path."""
        print(f"{event.src_path} moved to {event.dest_path}")

if __name__ == "__main__":
    # Script entry point: build a watcher with its defaults and block in run().
    MyWatch().run()

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top