strix

A simple web UI for motion
git clone https://www.brianlane.com/git/strix
Log | Files | Refs | LICENSE

events.py (11673B)


      1 # events.py
      2 #
      3 # Copyright (C) 2017 Brian C. Lane
      4 #
      5 # This program is free software; you can redistribute it and/or modify
      6 # it under the terms of the GNU General Public License as published by
      7 # the Free Software Foundation; either version 2 of the License, or
      8 # (at your option) any later version.
      9 #
     10 # This program is distributed in the hope that it will be useful,
     11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 # GNU General Public License for more details.
     14 #
     15 # You should have received a copy of the GNU General Public License
     16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 from datetime import datetime, timedelta
     18 from glob import glob
     19 import json
     20 import multiprocessing as mp
     21 import os
     22 import re
     23 import shutil
     24 import tempfile
     25 import threading
     26 
     27 import structlog
     28 
     29 class EventCacheClass:
     30     def __init__(self):
     31         self._log = None
     32         self._base_dir = "/invalid/path/for/expire"
     33         self._last_check = datetime.now()
     34         self._check_cache = 60
     35         self._keep_days = 9999
     36         self._lock = threading.Lock()
     37         self._cache = {}
     38 
     39     def cleanup_dq(self):
     40         """
     41         Cleanup any delete_queue subdirectories that may be leftover from previous run
     42         """
     43 
     44         def dth_fn(dq_dirs):
     45             for dq in dq_dirs:
     46                 if not dq.startswith(self._base_dir):
     47                     raise RuntimeError(f"Invalid dq path: {dq}")
     48 
     49                 shutil.rmtree(dq, ignore_errors=True)
     50 
     51         base = os.path.join(self._base_dir, "delete_queue")
     52         dq_dirs = [os.path.join(base, dq) for dq in os.listdir(base)]
     53 
     54         # Start a thread to do the actual delete in the background
     55         dth = mp.Process(name="delete-thread",
     56                             target=dth_fn,
     57                             args=(dq_dirs,))
     58         dth.start()
     59 
     60     def get(self, key):
     61         with self._lock:
     62             return self._cache[key]
     63 
     64     def set(self, key, value):
     65         with self._lock:
     66             # Convert start/end to datetime object
     67             if "start" in value and type(value["start"]) == type(""):
     68                 value["start"] = datetime.fromisoformat(value["start"])
     69             if "end" in value and type(value["end"]) == type(""):
     70                 value["end"] = datetime.fromisoformat(value["end"])
     71 
     72             # Add missing title (old cache will not have this new field)
     73             if "title" not in value:
     74                 value["title"] = value["start"].strftime("%a %b %d %I:%M:%S %p"),
     75 
     76             self._cache[key] = value
     77 
     78             # This can potentially remove the key just added if it is an old event
     79             self._expire_events()
     80 
     81             # Return True if it was added to the cache
     82             return key in self._cache
     83 
     84     def events(self, camera=None, reverse=False):
     85         """
     86         Return a sorted list of the events
     87         """
     88         if not camera:
     89             return sorted(self._cache.keys(), reverse=reverse)
     90 
     91         cp = self._base_dir + "/" + camera
     92         # limit results to the selected camera
     93         return sorted((p for p in self._cache.keys() if p.startswith(cp)), reverse=reverse)
     94 
     95     def base_dir(self, base_dir):
     96         with self._lock:
     97             self._base_dir = base_dir
     98 
     99     def logger(self, logger):
    100         with self._lock:
    101             self._log = logger
    102 
    103     def keep(self, days):
    104         with self._lock:
    105             self._keep_days = days
    106 
    107     def check_cache(self, minutes):
    108         with self._lock:
    109             self._check_cache = minutes
    110 
    111     def reset_check(self):
    112         with self._lock:
    113             self._last_check = datetime(1985, 10, 26, 1, 22, 0)
    114 
    115     def log_info(self, *args):
    116         if self._log:
    117             self._log.info(*args)
    118 
    119     def log_error(self, *args):
    120         if self._log:
    121             self._log.error(*args)
    122 
    123     def _expire_events(self):
    124         start = datetime.now()
    125 
    126         if start - self._last_check < timedelta(minutes=self._check_cache):
    127             return
    128         self._last_check = datetime.now()
    129 
    130         self.log_info("Checking cache...")
    131 
    132         remove = {}
    133         for e in self._cache:
    134             if self._cache[e]["start"] < datetime.now() - timedelta(days=self._keep_days):
    135                 if "event_path" in self._cache[e] \
    136                    and self._cache[e]["event_path"].startswith(self._base_dir):
    137                     daypath = os.path.dirname(self._cache[e]["event_path"].rstrip("/"))
    138                     if daypath in remove:
    139                         remove[daypath].append(e)
    140                     else:
    141                         remove[daypath] = [e]
    142 
    143         self.log_info(f"Done checking cache in {datetime.now()-start}")
    144 
    145         if len(remove) == 0:
    146             return
    147 
    148         # The result of the above is a dict (remove) with daily lists of events to be
    149         # removed. NOTE that this may not be ALL the day's events so it needs to move
    150         # them individually, but needs to use the Camera and date to prevent collisions
    151         # with other cameras while waiting for the delete to run in the background.
    152 
    153         # Create the temporary delete_queue directory
    154         delete_queue = tempfile.mkdtemp(dir=os.path.join(self._base_dir, "delete_queue"))
    155 
    156         # Move each day's directory to the temporary delete_queue directory
    157         for daypath in remove:
    158             # All paths should have a Camera* component
    159             cm = re.search("(Camera\d+)", daypath)
    160             if not cm:
    161                 self.log_error(f"Camera* missing from path {daypath}")
    162 
    163             if cm and os.path.exists(daypath):
    164                 # Make a directory for the day's events
    165                 daydir = os.path.basename(daypath)
    166                 dqdir = os.path.join(delete_queue, cm.group(), daydir)
    167                 if not os.path.exists(dqdir):
    168                     os.makedirs(dqdir)
    169 
    170                 # Move the expired events into the delete_queue/Camera*/YYYY-MM-DD/ directory
    171                 for e in remove[daypath]:
    172                     self.log_info(f"MOVE: {e} -> {dqdir}")
    173                     shutil.move(e, dqdir)
    174 
    175             # Remove the events from the cache
    176             self.log_info(f"Removing {len(remove[daypath])} events from {daypath}")
    177             for e in remove[daypath]:
    178                 del self._cache[e]
    179 
    180         self.log_info(f"Expire of {len(remove)} directories took: {datetime.now()-start}")
    181 
    182         def dth_fn(delete_queue):
    183             shutil.rmtree(delete_queue, ignore_errors=True)
    184 
    185         # Start a thread to do the actual delete in the background
    186         dth = mp.Process(name="delete-thread",
    187                             target=dth_fn,
    188                             args=(delete_queue,))
    189         dth.start()
    190 
    191 
# Singleton: the one module-level EventCacheClass instance used by the
# functions in this module.
EventCache = EventCacheClass()
    194 
    195 
    196 def preload_cache(log, base_dir):
    197     log.info("Pre-loading event cache...")
    198 
    199     total = timedelta()
    200     for camera in sorted(c for c in os.listdir(base_dir) if c.startswith("Camera")):
    201         start = datetime.now()
    202 
    203         # YYYY-MM-DD/HH-MM-SS is the format of the event directories.
    204         glob_path="%s/%s/????-??-??/??-??-??" % (base_dir, camera)
    205         for event_path in sorted(glob(glob_path), reverse=True):
    206             _ = event_details(log, event_path)
    207         log.info(f"{camera} event cache loaded in {datetime.now()-start} seconds")
    208         total += datetime.now()-start
    209     log.info(f"Event cache loaded in {total} seconds")
    210 
    211     # Next event will check for expired entries
    212     EventCache.reset_check()
    213 
    214 
    215 def path_to_dt(path):
    216     # Use the last 2 elements of the path to construct a Datatime
    217     (date, time) = path.split("/")[-2:]
    218     time = time.replace("-", ":")
    219     dt_str = "{0} {1}".format(date, time)
    220     return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
    221 
    222 def image_to_dt(event_date, image):
    223     """ Convert an event date (YYYY-MM-DD) and image HH-MM-SS-FF
    224 
    225     returns a datetime object
    226     """
    227     # Trim off .jpg and the frame count
    228     image_time = image.rsplit('-', 1)[0]
    229     return datetime.strptime(event_date+"/"+image_time, "%Y-%m-%d/%H-%M-%S")
    230 
    231 
    232 def event_details(log, event_path):
    233     # Check the cache for the details
    234     try:
    235         return EventCache.get(event_path)
    236     except KeyError:
    237         pass
    238 
    239     # Try the file cache next
    240     try:
    241         if os.path.exists(event_path+"/.details.json"):
    242             with open(event_path+"/.details.json") as f:
    243                 details = json.load(f)
    244 
    245             # Adding to the cache can potentially expire old events
    246             ok = EventCache.set(event_path, details)
    247             if ok:
    248                 return details
    249             else:
    250                 return None
    251     except json.decoder.JSONDecodeError:
    252         log.error("Error reading .details.json from %s", event_path)
    253 
    254     (camera_name, event_date, event_time) = event_path.rsplit("/", 3)[-3:]
    255 
    256     # Grab the camera, date, and time and build the URL path
    257     url = "motion/"+"/".join([camera_name, event_date, event_time])
    258 
    259     # Get the list of images, skipping thumbnail.jpg
    260     images = []
    261     for i in sorted(glob(event_path+"/*.jpg")):
    262         if "thumbnail.jpg" in i:
    263             continue
    264         images.append(os.path.basename(i))
    265 
    266     if os.path.exists(event_path+"/thumbnail.jpg"):
    267         thumbnail = url+"/thumbnail.jpg"
    268     elif images:
    269         thumbnail = images[len(images)//4]
    270     else:
    271         thumbnail = "images/missing.jpg"
    272 
    273     if images:
    274         start_time = image_to_dt(event_date, images[0])
    275         end_time   = image_to_dt(event_date, images[-1])
    276     else:
    277         # XXX How to handle an empty directory?
    278         start_time = datetime.now()
    279         end_time = datetime.now()
    280 
    281     # Find the videos, if they exist
    282     video = []
    283     for pth in [event_path, event_path+"/debug"]:
    284         for ext in ["m4v", "webm", "mp4", "ogg"]:
    285             if os.path.exists(pth+"/video."+ext):
    286                 video.append(url+"/video."+ext)
    287                 break
    288         else:
    289             video.append("images/missing.jpg")
    290 
    291     is_saved = os.path.exists(event_path+"/.saved")
    292 
    293     details = {
    294         "start":        start_time,
    295         "end":          end_time,
    296         "title":        start_time.strftime("%a %b %d %I:%M:%S %p"),
    297         "video":        video[0],
    298         "debug_video":  video[1],
    299         "thumbnail":    thumbnail,
    300         "images":       [],
    301         "saved":        is_saved,
    302         "event_path":   event_path,
    303     }
    304 
    305     # Adding to the cache can potentially expire it if it was an old event
    306     ok = EventCache.set(event_path, details)
    307     if not ok:
    308         return None
    309 
    310     with open(event_path+"/.details.json", "w") as f:
    311         json.dump(details, f, default=str)
    312     return details
    313 
    314 
    315 def camera_events(log, base_dir, camera, start, end, offset, limit):
    316     # Newest to oldest, limited by offset and limit
    317     skipped = 0
    318     added = 0
    319     events = []
    320     for event_path in EventCache.events(camera=camera, reverse=True):
    321         dt = path_to_dt(event_path)
    322         if dt < start or dt > end:
    323             continue
    324         if skipped < offset:
    325             skipped += 1
    326             continue
    327 
    328         details = event_details(log, event_path)
    329         if details is not None:
    330             events.insert(0, details)
    331 
    332         added += 1
    333         if limit > 0 and added >= limit:
    334             break
    335 
    336     return events
    337 
    338 def queue_events(log, queue_rx):
    339     """
    340     Loop, reading new event paths from the Pipe (the queue mp thread is at the other end)
    341     and adding their details to the EventCache
    342     """
    343     while True:
    344         try:
    345             if not queue_rx.poll(10):
    346                 continue
    347 
    348             event_path = queue_rx.recv()
    349         except EOFError:
    350             break
    351 
    352         _ = event_details(log, event_path)