events.py (12576B)
# events.py
#
# Copyright (C) 2017 Brian C. Lane
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime, timedelta
from glob import glob
import json
import multiprocessing as mp
import os
from pathlib import Path
import re
import shutil
import tempfile
import threading

# Nothing in this module references structlog (loggers are passed in by the
# callers), so a missing install must not prevent importing the event cache.
try:
    import structlog
except ImportError:  # pragma: no cover
    structlog = None

# Format used for the human readable event title
_TITLE_FORMAT = "%a %b %d %I:%M:%S %p"

# All event paths are expected to contain a Camera<number> component
_CAMERA_RE = re.compile(r"(Camera\d+)")


def _remove_trees(base_dir, paths):
    """
    Delete every directory tree in paths. Used as the background delete process target.

    This is a module level function (not a closure) so that it can be pickled
    when multiprocessing uses the spawn or forkserver start methods.

    Raises RuntimeError if a path does not start with base_dir.
    """
    for path in paths:
        if not path.startswith(base_dir):
            raise RuntimeError(f"Invalid dq path: {path}")

        shutil.rmtree(path, ignore_errors=True)


class EventCacheClass:
    """
    In-memory cache of event details, keyed by the event's directory path.

    Access to the cache is serialized with a lock. Adding an entry can trigger
    the expiration of events older than the configured number of days.
    """
    def __init__(self):
        self._log = None
        self._base_dir = "/invalid/path/for/expire"
        self._last_check = datetime.now()
        self._check_cache = 60
        self._keep_days = 9999
        self._lock = threading.Lock()
        self._cache = {}

    def cleanup_dq(self):
        """
        Cleanup any delete_queue subdirectories that may be leftover from previous run

        Does nothing if the delete_queue directory does not exist or is empty.
        """
        base = os.path.join(self._base_dir, "delete_queue")
        if not os.path.isdir(base):
            return

        dq_dirs = [os.path.join(base, dq) for dq in os.listdir(base)]
        if not dq_dirs:
            return

        # Start a process to do the actual delete in the background
        dth = mp.Process(name="delete-thread",
                         target=_remove_trees,
                         args=(self._base_dir, dq_dirs))
        dth.start()

    def get(self, key):
        """
        Return the cached details for key, raises KeyError if it is not cached
        """
        with self._lock:
            return self._cache[key]

    def set(self, key, value):
        """
        Add the details dict (value) to the cache under key

        String "start" and "end" entries are converted to datetime objects, and
        a missing "title" is created from "start". value is modified in place.

        Returns True if the entry is in the cache afterwards, False if it was
        immediately expired.
        """
        with self._lock:
            # Convert start/end to datetime object
            for field in ("start", "end"):
                if field in value and isinstance(value[field], str):
                    value[field] = datetime.fromisoformat(value[field])

            # Add missing title (old cache will not have this new field)
            if "title" not in value:
                value["title"] = value["start"].strftime(_TITLE_FORMAT)

            self._cache[key] = value

            # This can potentially remove the key just added if it is an old event
            self._expire_events()

            # Return True if it was added to the cache
            return key in self._cache

    def events(self, camera=None, reverse=False):
        """
        Return a sorted list of the events

        If camera is set only the events below base_dir/camera/ are returned.
        """
        with self._lock:
            if not camera:
                return sorted(self._cache, reverse=reverse)

            # The trailing / keeps Camera1 from also matching Camera10, Camera11, ...
            cp = self._base_dir + "/" + camera.rstrip("/") + "/"
            # limit results to the selected camera
            return sorted((p for p in self._cache if p.startswith(cp)), reverse=reverse)

    def base_dir(self, base_dir):
        """Set the base directory that holds the Camera* directories"""
        with self._lock:
            self._base_dir = base_dir

    def logger(self, logger):
        """Set the logger used by log_info and log_error"""
        with self._lock:
            self._log = logger

    def keep(self, days):
        """Set the number of days to keep events before they are expired"""
        with self._lock:
            self._keep_days = days

    def check_cache(self, minutes):
        """Set the minimum number of minutes between checks for expired events"""
        with self._lock:
            self._check_cache = minutes

    def reset_check(self):
        """Move the last check far into the past so the next set() runs the expire check"""
        with self._lock:
            self._last_check = datetime(1985, 10, 26, 1, 22, 0)

    def log_info(self, *args):
        """Log at info level, if a logger has been set"""
        if self._log:
            self._log.info(*args)

    def log_error(self, *args):
        """Log at error level, if a logger has been set"""
        if self._log:
            self._log.error(*args)

    def _expire_events(self):
        """
        Remove events older than the keep time from the cache and from disk

        Only runs if at least check_cache minutes have passed since the last check.
        Expired event directories are moved into a temporary directory under
        base_dir/delete_queue/ which is then deleted by a background process.

        This is called from set() with the lock already held.
        """
        start = datetime.now()

        if start - self._last_check < timedelta(minutes=self._check_cache):
            return
        self._last_check = datetime.now()

        self.log_info("Checking cache...")

        # Group the expired events by their day directory
        cutoff = datetime.now() - timedelta(days=self._keep_days)
        remove = {}
        for e, details in self._cache.items():
            if details["start"] >= cutoff:
                continue

            # Only events with a path under the base directory can be expired
            event_path = details.get("event_path")
            if event_path is None or not event_path.startswith(self._base_dir):
                continue

            daypath = os.path.dirname(event_path.rstrip("/"))
            remove.setdefault(daypath, []).append(e)

        self.log_info(f"Done checking cache in {datetime.now()-start}")

        if not remove:
            return

        # The result of the above is a dict (remove) with daily lists of events to be
        # removed. NOTE that this may not be ALL the day's events so it needs to move
        # them individually, but needs to use the Camera and date to prevent collisions
        # with other cameras while waiting for the delete to run in the background.

        # Create the temporary delete_queue directory
        dq_base = os.path.join(self._base_dir, "delete_queue")
        os.makedirs(dq_base, exist_ok=True)
        delete_queue = tempfile.mkdtemp(dir=dq_base)

        # Move each day's directory to the temporary delete_queue directory
        for daypath, expired in remove.items():
            # All paths should have a Camera* component
            cm = _CAMERA_RE.search(daypath)
            if not cm:
                self.log_error(f"Camera* missing from path {daypath}")

            if cm and os.path.exists(daypath):
                # Make a directory for the day's events
                daydir = os.path.basename(daypath)
                dqdir = os.path.join(delete_queue, cm.group(), daydir)
                os.makedirs(dqdir, exist_ok=True)

                # Move the expired events into the delete_queue/Camera*/YYYY-MM-DD/ directory
                for e in expired:
                    self.log_info(f"MOVE: {e} -> {dqdir}")
                    try:
                        shutil.move(e, dqdir)
                    except OSError as err:
                        # One event that cannot be moved (eg. already deleted) should
                        # not stop the rest of the events from being expired.
                        self.log_error(f"Failed to move {e} to {dqdir}: {err}")

            # Remove the events from the cache
            self.log_info(f"Removing {len(expired)} events from {daypath}")
            for e in expired:
                del self._cache[e]

        self.log_info(f"Expire of {len(remove)} directories took: {datetime.now()-start}")

        # Start a process to do the actual delete in the background
        dth = mp.Process(name="delete-thread",
                         target=shutil.rmtree,
                         args=(delete_queue,),
                         kwargs={"ignore_errors": True})
        dth.start()


# Singleton
EventCache = EventCacheClass()


def preload_cache(log, base_dir):
    """
    Load the details of all the existing events under base_dir/Camera*/ into the EventCache

    Directories that do not match the event directory pattern are assumed to be
    unprocessed events, and a new entry is created for them in base_dir/queue/
    """
    log.info("Pre-loading event cache...")

    total = timedelta()
    for camera in sorted(c for c in os.listdir(base_dir) if c.startswith("Camera")):
        start = datetime.now()

        # YYYY-MM-DD/HH-MM-SS is the format of the event directories.
        glob_path = f"{base_dir}/{camera}/????-??-??/??-??-??"
        all_camera_events = sorted(glob(glob_path), reverse=True)
        for event_path in all_camera_events:
            event_details(log, event_path)
        log.info(f"{camera} event cache loaded in {datetime.now()-start} seconds")
        total += datetime.now() - start

        # Check for unprocessed directories that don't fit the pattern
        # These are likely events that were never processed due to a restart
        glob_all_path = f"{base_dir}/{camera}/????-??-??/*"
        diff_dirs = set(glob(glob_all_path)) - set(all_camera_events)

        # Write new queue entries for these unprocessed directories
        # /strix/media/queue/Camera%t_%Y-%m-%d_%v
        # Remove the base dir, Replace the / with _, and touch a file
        for d in sorted(diff_dirs):
            event = d.removeprefix(base_dir).lstrip("/").replace("/", "_")
            td = Path(os.path.join(base_dir, "queue", event))
            log.debug(f"Recreating unprocessed event - {td}")
            td.touch()

    log.info(f"Event cache loaded in {total} seconds")

    # Next event will check for expired entries
    EventCache.reset_check()


def path_to_dt(path):
    """
    Convert an event path ending in YYYY-MM-DD/HH-MM-SS

    returns a datetime object
    """
    # Use the last 2 elements of the path to construct a datetime
    (date, time) = path.split("/")[-2:]
    return datetime.strptime(f"{date} {time}", "%Y-%m-%d %H-%M-%S")


def image_to_dt(event_date, image):
    """ Convert an event date (YYYY-MM-DD) and image HH-MM-SS-FF

    returns a datetime object
    """
    # Trim off .jpg and the frame count
    image_time = image.rsplit('-', 1)[0]
    return datetime.strptime(event_date+"/"+image_time, "%Y-%m-%d/%H-%M-%S")


def event_details(log, event_path):
    """
    Return a dict with the details of the event stored in the event_path directory

    The EventCache is checked first, then the event's .details.json file. If
    neither has the details they are built from the contents of the directory,
    added to the cache, and written to .details.json

    Returns None if the event was expired when it was added to the cache.
    """
    # Check the cache for the details
    try:
        return EventCache.get(event_path)
    except KeyError:
        pass

    # Try the file cache next
    details_file = event_path+"/.details.json"
    try:
        if os.path.exists(details_file):
            with open(details_file) as f:
                details = json.load(f)

            # Adding to the cache can potentially expire old events
            ok = EventCache.set(event_path, details)
            return details if ok else None
    except json.decoder.JSONDecodeError:
        # Fall through and rebuild the details from the directory contents
        log.error("Error reading .details.json from %s", event_path)

    (camera_name, event_date, event_time) = event_path.rsplit("/", 3)[-3:]

    # Grab the camera, date, and time and build the URL path
    url = "motion/"+"/".join([camera_name, event_date, event_time])

    # Get the list of images, skipping thumbnail.jpg
    images = [os.path.basename(i) for i in sorted(glob(event_path+"/*.jpg"))
              if "thumbnail.jpg" not in i]

    if os.path.exists(event_path+"/thumbnail.jpg"):
        thumbnail = url+"/thumbnail.jpg"
    elif images:
        # No thumbnail, use the 25% position image
        thumbnail = url+"/"+images[len(images)//4]
    else:
        thumbnail = "images/missing.jpg"

    if images:
        start_time = image_to_dt(event_date, images[0])
        end_time = image_to_dt(event_date, images[-1])
    else:
        # XXX How to handle an empty directory?
        start_time = datetime.now()
        end_time = datetime.now()

    # Find the videos, if they exist
    # NOTE(review): the URL for the video found in debug/ does not include
    # the debug/ component - confirm that this is what the server expects.
    video = []
    for pth in [event_path, event_path+"/debug"]:
        for ext in ["m4v", "webm", "mp4", "ogg"]:
            if os.path.exists(pth+"/video."+ext):
                video.append(url+"/video."+ext)
                break
        else:
            video.append("images/missing.jpg")

    is_saved = os.path.exists(event_path+"/.saved")

    details = {
        "start":        start_time,
        "end":          end_time,
        "title":        start_time.strftime(_TITLE_FORMAT),
        "video":        video[0],
        "debug_video":  video[1],
        "thumbnail":    thumbnail,
        "images":       [],
        "saved":        is_saved,
        "event_path":   event_path,
    }

    # Adding to the cache can potentially expire it if it was an old event
    ok = EventCache.set(event_path, details)
    if not ok:
        return None

    # default=str writes the datetime objects as strings
    with open(details_file, "w") as f:
        json.dump(details, f, default=str)
    return details


def camera_events(log, base_dir, camera, start, end, offset, limit):
    """
    Return a list of the details of the camera's events between start and end

    The events are selected newest to oldest, skipping the first offset events
    and stopping after limit events (a limit of 0 or less means no limit). The
    returned list is ordered oldest to newest.
    """
    # Newest to oldest, limited by offset and limit
    skipped = 0
    added = 0
    events = []
    for event_path in EventCache.events(camera=camera, reverse=True):
        dt = path_to_dt(event_path)
        if dt < start or dt > end:
            continue
        if skipped < offset:
            skipped += 1
            continue

        details = event_details(log, event_path)
        if details is not None:
            events.append(details)

        # Expired events (details is None) still count against the limit
        added += 1
        if limit > 0 and added >= limit:
            break

    # They were collected newest first, return them oldest first
    events.reverse()
    return events


def queue_events(log, queue_rx):
    """
    Loop, reading new event paths from the Pipe (the queue mp thread is at the other end)
    and adding their details to the EventCache
    """
    while True:
        try:
            # Wait up to 10 seconds for a new event path
            if not queue_rx.poll(10):
                continue

            event_path = queue_rx.recv()
        except EOFError:
            # The other end of the Pipe was closed
            break

        event_details(log, event_path)