strix

A simple web UI for motion
git clone https://www.brianlane.com/git/strix
Log | Files | Refs | LICENSE

events.py (12576B)


      1 # events.py
      2 #
      3 # Copyright (C) 2017 Brian C. Lane
      4 #
      5 # This program is free software; you can redistribute it and/or modify
      6 # it under the terms of the GNU General Public License as published by
      7 # the Free Software Foundation; either version 2 of the License, or
      8 # (at your option) any later version.
      9 #
     10 # This program is distributed in the hope that it will be useful,
     11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 # GNU General Public License for more details.
     14 #
     15 # You should have received a copy of the GNU General Public License
     16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 from datetime import datetime, timedelta
     18 from glob import glob
     19 import json
     20 import multiprocessing as mp
     21 import os
     22 from pathlib import Path
     23 import re
     24 import shutil
     25 import tempfile
     26 import threading
     27 
     28 import structlog
     29 
     30 class EventCacheClass:
     31     def __init__(self):
     32         self._log = None
     33         self._base_dir = "/invalid/path/for/expire"
     34         self._last_check = datetime.now()
     35         self._check_cache = 60
     36         self._keep_days = 9999
     37         self._lock = threading.Lock()
     38         self._cache = {}
     39 
     40     def cleanup_dq(self):
     41         """
     42         Cleanup any delete_queue subdirectories that may be leftover from previous run
     43         """
     44 
     45         def dth_fn(dq_dirs):
     46             for dq in dq_dirs:
     47                 if not dq.startswith(self._base_dir):
     48                     raise RuntimeError(f"Invalid dq path: {dq}")
     49 
     50                 shutil.rmtree(dq, ignore_errors=True)
     51 
     52         base = os.path.join(self._base_dir, "delete_queue")
     53         dq_dirs = [os.path.join(base, dq) for dq in os.listdir(base)]
     54 
     55         # Start a thread to do the actual delete in the background
     56         dth = mp.Process(name="delete-thread",
     57                             target=dth_fn,
     58                             args=(dq_dirs,))
     59         dth.start()
     60 
     61     def get(self, key):
     62         with self._lock:
     63             return self._cache[key]
     64 
     65     def set(self, key, value):
     66         with self._lock:
     67             # Convert start/end to datetime object
     68             if "start" in value and type(value["start"]) == type(""):
     69                 value["start"] = datetime.fromisoformat(value["start"])
     70             if "end" in value and type(value["end"]) == type(""):
     71                 value["end"] = datetime.fromisoformat(value["end"])
     72 
     73             # Add missing title (old cache will not have this new field)
     74             if "title" not in value:
     75                 value["title"] = value["start"].strftime("%a %b %d %I:%M:%S %p"),
     76 
     77             self._cache[key] = value
     78 
     79             # This can potentially remove the key just added if it is an old event
     80             self._expire_events()
     81 
     82             # Return True if it was added to the cache
     83             return key in self._cache
     84 
     85     def events(self, camera=None, reverse=False):
     86         """
     87         Return a sorted list of the events
     88         """
     89         if not camera:
     90             return sorted(self._cache.keys(), reverse=reverse)
     91 
     92         cp = self._base_dir + "/" + camera
     93         # limit results to the selected camera
     94         return sorted((p for p in self._cache.keys() if p.startswith(cp)), reverse=reverse)
     95 
     96     def base_dir(self, base_dir):
     97         with self._lock:
     98             self._base_dir = base_dir
     99 
    100     def logger(self, logger):
    101         with self._lock:
    102             self._log = logger
    103 
    104     def keep(self, days):
    105         with self._lock:
    106             self._keep_days = days
    107 
    108     def check_cache(self, minutes):
    109         with self._lock:
    110             self._check_cache = minutes
    111 
    112     def reset_check(self):
    113         with self._lock:
    114             self._last_check = datetime(1985, 10, 26, 1, 22, 0)
    115 
    116     def log_info(self, *args):
    117         if self._log:
    118             self._log.info(*args)
    119 
    120     def log_error(self, *args):
    121         if self._log:
    122             self._log.error(*args)
    123 
    124     def _expire_events(self):
    125         start = datetime.now()
    126 
    127         if start - self._last_check < timedelta(minutes=self._check_cache):
    128             return
    129         self._last_check = datetime.now()
    130 
    131         self.log_info("Checking cache...")
    132 
    133         remove = {}
    134         for e in self._cache:
    135             if self._cache[e]["start"] < datetime.now() - timedelta(days=self._keep_days):
    136                 if "event_path" in self._cache[e] \
    137                    and self._cache[e]["event_path"].startswith(self._base_dir):
    138                     daypath = os.path.dirname(self._cache[e]["event_path"].rstrip("/"))
    139                     if daypath in remove:
    140                         remove[daypath].append(e)
    141                     else:
    142                         remove[daypath] = [e]
    143 
    144         self.log_info(f"Done checking cache in {datetime.now()-start}")
    145 
    146         if len(remove) == 0:
    147             return
    148 
    149         # The result of the above is a dict (remove) with daily lists of events to be
    150         # removed. NOTE that this may not be ALL the day's events so it needs to move
    151         # them individually, but needs to use the Camera and date to prevent collisions
    152         # with other cameras while waiting for the delete to run in the background.
    153 
    154         # Create the temporary delete_queue directory
    155         delete_queue = tempfile.mkdtemp(dir=os.path.join(self._base_dir, "delete_queue"))
    156 
    157         # Move each day's directory to the temporary delete_queue directory
    158         for daypath in remove:
    159             # All paths should have a Camera* component
    160             cm = re.search("(Camera\d+)", daypath)
    161             if not cm:
    162                 self.log_error(f"Camera* missing from path {daypath}")
    163 
    164             if cm and os.path.exists(daypath):
    165                 # Make a directory for the day's events
    166                 daydir = os.path.basename(daypath)
    167                 dqdir = os.path.join(delete_queue, cm.group(), daydir)
    168                 if not os.path.exists(dqdir):
    169                     os.makedirs(dqdir)
    170 
    171                 # Move the expired events into the delete_queue/Camera*/YYYY-MM-DD/ directory
    172                 for e in remove[daypath]:
    173                     self.log_info(f"MOVE: {e} -> {dqdir}")
    174                     shutil.move(e, dqdir)
    175 
    176             # Remove the events from the cache
    177             self.log_info(f"Removing {len(remove[daypath])} events from {daypath}")
    178             for e in remove[daypath]:
    179                 del self._cache[e]
    180 
    181         self.log_info(f"Expire of {len(remove)} directories took: {datetime.now()-start}")
    182 
    183         def dth_fn(delete_queue):
    184             shutil.rmtree(delete_queue, ignore_errors=True)
    185 
    186         # Start a thread to do the actual delete in the background
    187         dth = mp.Process(name="delete-thread",
    188                             target=dth_fn,
    189                             args=(delete_queue,))
    190         dth.start()
    191 
    192 
# Singleton
# The one cache instance used by the functions in this module
EventCache = EventCacheClass()
    195 
    196 
    197 def preload_cache(log, base_dir):
    198     log.info("Pre-loading event cache...")
    199 
    200     total = timedelta()
    201     for camera in sorted(c for c in os.listdir(base_dir) if c.startswith("Camera")):
    202         start = datetime.now()
    203 
    204         # YYYY-MM-DD/HH-MM-SS is the format of the event directories.
    205         glob_path="%s/%s/????-??-??/??-??-??" % (base_dir, camera)
    206         all_camera_events = sorted(glob(glob_path), reverse=True)
    207         for event_path in all_camera_events:
    208             _ = event_details(log, event_path)
    209         log.info(f"{camera} event cache loaded in {datetime.now()-start} seconds")
    210         total += datetime.now()-start
    211 
    212         # Check for unprocessed directories that don't fit the pattern
    213         # These are likely events that were never processed due to a restart
    214         glob_all_path="%s/%s/????-??-??/*" % (base_dir, camera)
    215         all_dirs = sorted(glob(glob_all_path), reverse=True)
    216         diff_dirs = set(all_dirs) - set(all_camera_events)
    217 
    218         # Write new queue entries for these unprocessed directories
    219         # /strix/media/queue/Camera%t_%Y-%m-%d_%v
    220         # Remove the base dir, Replace the / with _, and touch a file
    221         for d in diff_dirs:
    222             event = d.removeprefix(base_dir).lstrip("/").replace("/", "_")
    223             td = Path(os.path.join(base_dir, "queue", event))
    224             log.debug(f"Recreating unprocessed event - {td}")
    225             td.touch()
    226 
    227     log.info(f"Event cache loaded in {total} seconds")
    228 
    229     # Next event will check for expired entries
    230     EventCache.reset_check()
    231 
    232 
    233 def path_to_dt(path):
    234     # Use the last 2 elements of the path to construct a Datatime
    235     (date, time) = path.split("/")[-2:]
    236     time = time.replace("-", ":")
    237     dt_str = "{0} {1}".format(date, time)
    238     return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
    239 
    240 def image_to_dt(event_date, image):
    241     """ Convert an event date (YYYY-MM-DD) and image HH-MM-SS-FF
    242 
    243     returns a datetime object
    244     """
    245     # Trim off .jpg and the frame count
    246     image_time = image.rsplit('-', 1)[0]
    247     return datetime.strptime(event_date+"/"+image_time, "%Y-%m-%d/%H-%M-%S")
    248 
    249 
    250 def event_details(log, event_path):
    251     # Check the cache for the details
    252     try:
    253         return EventCache.get(event_path)
    254     except KeyError:
    255         pass
    256 
    257     # Try the file cache next
    258     try:
    259         if os.path.exists(event_path+"/.details.json"):
    260             with open(event_path+"/.details.json") as f:
    261                 details = json.load(f)
    262 
    263             # Adding to the cache can potentially expire old events
    264             ok = EventCache.set(event_path, details)
    265             if ok:
    266                 return details
    267             else:
    268                 return None
    269     except json.decoder.JSONDecodeError:
    270         log.error("Error reading .details.json from %s", event_path)
    271 
    272     (camera_name, event_date, event_time) = event_path.rsplit("/", 3)[-3:]
    273 
    274     # Grab the camera, date, and time and build the URL path
    275     url = "motion/"+"/".join([camera_name, event_date, event_time])
    276 
    277     # Get the list of images, skipping thumbnail.jpg
    278     images = []
    279     for i in sorted(glob(event_path+"/*.jpg")):
    280         if "thumbnail.jpg" in i:
    281             continue
    282         images.append(os.path.basename(i))
    283 
    284     if os.path.exists(event_path+"/thumbnail.jpg"):
    285         thumbnail = url+"/thumbnail.jpg"
    286     elif images:
    287         # No thumbnail, use the 25% position image
    288         thumbnail = url+"/"+images[len(images)//4]
    289     else:
    290         thumbnail = "images/missing.jpg"
    291 
    292     if images:
    293         start_time = image_to_dt(event_date, images[0])
    294         end_time   = image_to_dt(event_date, images[-1])
    295     else:
    296         # XXX How to handle an empty directory?
    297         start_time = datetime.now()
    298         end_time = datetime.now()
    299 
    300     # Find the videos, if they exist
    301     video = []
    302     for pth in [event_path, event_path+"/debug"]:
    303         for ext in ["m4v", "webm", "mp4", "ogg"]:
    304             if os.path.exists(pth+"/video."+ext):
    305                 video.append(url+"/video."+ext)
    306                 break
    307         else:
    308             video.append("images/missing.jpg")
    309 
    310     is_saved = os.path.exists(event_path+"/.saved")
    311 
    312     details = {
    313         "start":        start_time,
    314         "end":          end_time,
    315         "title":        start_time.strftime("%a %b %d %I:%M:%S %p"),
    316         "video":        video[0],
    317         "debug_video":  video[1],
    318         "thumbnail":    thumbnail,
    319         "images":       [],
    320         "saved":        is_saved,
    321         "event_path":   event_path,
    322     }
    323 
    324     # Adding to the cache can potentially expire it if it was an old event
    325     ok = EventCache.set(event_path, details)
    326     if not ok:
    327         return None
    328 
    329     with open(event_path+"/.details.json", "w") as f:
    330         json.dump(details, f, default=str)
    331     return details
    332 
    333 
    334 def camera_events(log, base_dir, camera, start, end, offset, limit):
    335     # Newest to oldest, limited by offset and limit
    336     skipped = 0
    337     added = 0
    338     events = []
    339     for event_path in EventCache.events(camera=camera, reverse=True):
    340         dt = path_to_dt(event_path)
    341         if dt < start or dt > end:
    342             continue
    343         if skipped < offset:
    344             skipped += 1
    345             continue
    346 
    347         details = event_details(log, event_path)
    348         if details is not None:
    349             events.insert(0, details)
    350 
    351         added += 1
    352         if limit > 0 and added >= limit:
    353             break
    354 
    355     return events
    356 
    357 def queue_events(log, queue_rx):
    358     """
    359     Loop, reading new event paths from the Pipe (the queue mp thread is at the other end)
    360     and adding their details to the EventCache
    361     """
    362     while True:
    363         try:
    364             if not queue_rx.poll(10):
    365                 continue
    366 
    367             event_path = queue_rx.recv()
    368         except EOFError:
    369             break
    370 
    371         _ = event_details(log, event_path)