# events.py
#
# Copyright (C) 2017 Brian C. Lane
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime, timedelta
from glob import glob
import json
import multiprocessing as mp
import os
import re
import shutil
import tempfile
import threading

# structlog is not referenced by name anywhere in this module (loggers are
# handed in by the caller), so a missing package must not stop the module
# from being imported.
try:
    import structlog  # noqa: F401
except ImportError:
    structlog = None


def _remove_trees(paths, required_prefix=None):
    """
    Delete every directory tree in paths (target of the background process)

    This lives at module level so that multiprocessing can hand it to the
    child process whatever the start method is; a nested function cannot be
    pickled for the spawn and forkserver start methods.

    When required_prefix is set, a path that does not start with it raises
    RuntimeError instead of being deleted.
    """
    for path in paths:
        if required_prefix is not None and not path.startswith(required_prefix):
            raise RuntimeError(f"Invalid dq path: {path}")

        shutil.rmtree(path, ignore_errors=True)


def _start_delete_process(paths, required_prefix=None):
    """
    Start a separate process that removes paths and return it without waiting
    """
    dth = mp.Process(name="delete-thread",
                     target=_remove_trees,
                     args=(list(paths), required_prefix))
    dth.start()
    return dth


class EventCacheClass:
    """
    In-memory cache of event details, keyed by the event directory path

    Access to the cache and its settings is serialized with a threading.Lock.
    Adding an entry with set() may also expire old events, which moves their
    directories into <base_dir>/delete_queue/ and deletes them in a
    background process.
    """
    def __init__(self):
        self._log = None
        # Placeholder until base_dir() is called
        self._base_dir = "/invalid/path/for/expire"
        self._last_check = datetime.now()
        # Minutes between expiration checks
        self._check_cache = 60
        # Events that started more than this many days ago are expired
        self._keep_days = 9999
        self._lock = threading.Lock()
        self._cache = {}

    def cleanup_dq(self):
        """
        Cleanup any delete_queue subdirectories that may be leftover from previous run

        The removal runs in a background process. If the delete_queue
        directory does not exist, or is empty, there is nothing to do.
        """
        base = os.path.join(self._base_dir, "delete_queue")
        try:
            names = os.listdir(base)
        except FileNotFoundError:
            return

        dq_dirs = [os.path.join(base, dq) for dq in names]
        if not dq_dirs:
            return

        # Do the actual delete in the background, refusing anything that is
        # not underneath the base directory
        _start_delete_process(dq_dirs, self._base_dir)

    def get(self, key):
        """
        Return the cached details for key, raises KeyError if it is not cached
        """
        with self._lock:
            return self._cache[key]

    def set(self, key, value):
        """
        Add the details dict value to the cache under key

        String "start" and "end" values are converted to datetime objects and
        a missing "title" is generated from "start". value is modified in place.

        Returns True if the entry is still in the cache afterwards, False if
        the expiration check that runs as part of the call removed it.
        """
        with self._lock:
            # Convert start/end to datetime object
            for field in ("start", "end"):
                if isinstance(value.get(field), str):
                    value[field] = datetime.fromisoformat(value[field])

            # Add missing title (old cache will not have this new field)
            # NOTE: no trailing comma here, the title has to be a string, not a tuple
            if "title" not in value:
                value["title"] = value["start"].strftime("%a %b %d %I:%M:%S %p")

            self._cache[key] = value

            # This can potentially remove the key just added if it is an old event
            self._expire_events()

            # Return True if it was added to the cache
            return key in self._cache

    def events(self, camera=None, reverse=False):
        """
        Return a sorted list of the events

        When camera is set only the keys underneath <base_dir>/<camera>/ are
        returned. The prefix ends with a "/" so that Camera1 does not also
        select Camera10, Camera11, ...
        """
        # Snapshot under the lock so another thread adding or expiring
        # events cannot change the dict while it is being walked
        with self._lock:
            keys = list(self._cache)
            base_dir = self._base_dir

        if camera:
            # limit results to the selected camera
            prefix = base_dir + "/" + camera.rstrip("/") + "/"
            keys = [k for k in keys if k.startswith(prefix)]

        return sorted(keys, reverse=reverse)

    def base_dir(self, base_dir):
        """Set the directory that holds the Camera* directories and delete_queue"""
        with self._lock:
            self._base_dir = base_dir

    def logger(self, logger):
        """Set the logger used by log_info and log_error"""
        with self._lock:
            self._log = logger

    def keep(self, days):
        """Set the number of days to keep events before they are expired"""
        with self._lock:
            self._keep_days = days

    def check_cache(self, minutes):
        """Set the minimum number of minutes between expiration checks"""
        with self._lock:
            self._check_cache = minutes

    def reset_check(self):
        """Move the last check time far into the past so the next set() runs a check"""
        with self._lock:
            self._last_check = datetime(1985, 10, 26, 1, 22, 0)

    def log_info(self, *args):
        """Log at info level, does nothing until a logger has been set"""
        if self._log:
            self._log.info(*args)

    def log_error(self, *args):
        """Log at error level, does nothing until a logger has been set"""
        if self._log:
            self._log.error(*args)

    def _expire_events(self):
        """
        Remove events older than the keep time from the cache and from disk

        Called from set() with the lock already held. Does nothing unless at
        least check_cache minutes have passed since the previous check.

        Expired event directories are moved into a new temporary directory
        under <base_dir>/delete_queue/ which is then deleted by a background
        process. Only entries with an "event_path" under the base directory
        are considered.
        """
        start = datetime.now()

        if start - self._last_check < timedelta(minutes=self._check_cache):
            return
        self._last_check = datetime.now()

        self.log_info("Checking cache...")

        # Group the expired cache keys by the day directory they live in
        cutoff = datetime.now() - timedelta(days=self._keep_days)
        remove = {}
        for key, details in self._cache.items():
            if not details["start"] < cutoff:
                continue
            if "event_path" not in details:
                continue
            event_path = details["event_path"]
            if not event_path.startswith(self._base_dir):
                continue

            daypath = os.path.dirname(event_path.rstrip("/"))
            remove.setdefault(daypath, []).append(key)

        self.log_info(f"Done checking cache in {datetime.now()-start}")

        if not remove:
            return

        # The result of the above is a dict (remove) with daily lists of events to be
        # removed. NOTE that this may not be ALL the day's events so it needs to move
        # them individually, but needs to use the Camera and date to prevent collisions
        # with other cameras while waiting for the delete to run in the background.

        # Create the temporary delete_queue directory, making sure its parent exists
        dq_base = os.path.join(self._base_dir, "delete_queue")
        os.makedirs(dq_base, exist_ok=True)
        delete_queue = tempfile.mkdtemp(dir=dq_base)

        # Move each day's events to the temporary delete_queue directory
        for daypath, expired in remove.items():
            # All paths should have a Camera* component
            cm = re.search(r"(Camera\d+)", daypath)
            if not cm:
                self.log_error(f"Camera* missing from path {daypath}")

            if cm and os.path.exists(daypath):
                # Make a directory for the day's events
                daydir = os.path.basename(daypath)
                dqdir = os.path.join(delete_queue, cm.group(), daydir)
                os.makedirs(dqdir, exist_ok=True)

                # Move the expired events into the delete_queue/Camera*/YYYY-MM-DD/ directory
                for e in expired:
                    self.log_info(f"MOVE: {e} -> {dqdir}")
                    try:
                        shutil.move(e, dqdir)
                    except OSError as err:
                        # One event that cannot be moved must not stop the
                        # rest of the expired events from being handled
                        self.log_error(f"MOVE failed: {e} -> {dqdir}: {err}")

            # Remove the events from the cache
            self.log_info(f"Removing {len(expired)} events from {daypath}")
            for e in expired:
                del self._cache[e]

        self.log_info(f"Expire of {len(remove)} directories took: {datetime.now()-start}")

        # Do the actual delete in the background
        _start_delete_process([delete_queue])


# Singleton
EventCache = EventCacheClass()


def preload_cache(log, base_dir):
    """
    Load the details of every event under base_dir/Camera*/ into the EventCache

    The time taken is logged for each camera and for the whole run.
    """
    log.info("Pre-loading event cache...")

    total = timedelta()
    for camera in sorted(c for c in os.listdir(base_dir) if c.startswith("Camera")):
        start = datetime.now()

        # YYYY-MM-DD/HH-MM-SS is the format of the event directories.
        glob_path = f"{base_dir}/{camera}/????-??-??/??-??-??"
        for event_path in sorted(glob(glob_path), reverse=True):
            event_details(log, event_path)

        elapsed = datetime.now() - start
        log.info(f"{camera} event cache loaded in {elapsed} seconds")
        total += elapsed
    log.info(f"Event cache loaded in {total} seconds")

    # Next event will check for expired entries
    EventCache.reset_check()


def path_to_dt(path):
    """
    Convert an event path ending in YYYY-MM-DD/HH-MM-SS

    returns a datetime object
    """
    # Use the last 2 elements of the path to construct a datetime
    (date, time) = path.split("/")[-2:]
    return datetime.strptime(f"{date} {time.replace('-', ':')}", "%Y-%m-%d %H:%M:%S")


def image_to_dt(event_date, image):
    """ Convert an event date (YYYY-MM-DD) and image HH-MM-SS-FF

        returns a datetime object
    """
    # Trim off .jpg and the frame count
    image_time = image.rsplit('-', 1)[0]
    return datetime.strptime(event_date+"/"+image_time, "%Y-%m-%d/%H-%M-%S")


def event_details(log, event_path):
    """
    Return the details dict for the event stored in the event_path directory

    The EventCache is checked first, then the event's .details.json file.
    If neither has the details they are built from the directory contents,
    added to the cache, and written to .details.json

    Returns None if adding the event to the cache expired it.
    """
    # Check the cache for the details
    try:
        return EventCache.get(event_path)
    except KeyError:
        pass

    # Try the file cache next
    details_file = event_path+"/.details.json"
    try:
        if os.path.exists(details_file):
            with open(details_file) as f:
                details = json.load(f)

            # Adding to the cache can potentially expire old events
            ok = EventCache.set(event_path, details)
            return details if ok else None
    except json.decoder.JSONDecodeError:
        # Fall through and rebuild the details from the directory
        log.error("Error reading .details.json from %s", event_path)

    (camera_name, event_date, event_time) = event_path.rsplit("/", 3)[-3:]

    # Grab the camera, date, and time and build the URL path
    url = "motion/"+"/".join([camera_name, event_date, event_time])

    # Get the list of images, skipping thumbnail.jpg
    images = [os.path.basename(i)
              for i in sorted(glob(event_path+"/*.jpg"))
              if "thumbnail.jpg" not in i]

    if os.path.exists(event_path+"/thumbnail.jpg"):
        thumbnail = url+"/thumbnail.jpg"
    elif images:
        # Use an image a quarter of the way into the event, as a path under
        # the event's URL like the thumbnail.jpg and video entries
        thumbnail = url+"/"+images[len(images)//4]
    else:
        thumbnail = "images/missing.jpg"

    if images:
        start_time = image_to_dt(event_date, images[0])
        end_time = image_to_dt(event_date, images[-1])
    else:
        # XXX How to handle an empty directory?
        start_time = datetime.now()
        end_time = datetime.now()

    # Find the videos, if they exist
    # One entry for the event directory and one for its debug subdirectory
    video = []
    for pth in [event_path, event_path+"/debug"]:
        for ext in ["m4v", "webm", "mp4", "ogg"]:
            if os.path.exists(pth+"/video."+ext):
                video.append(url+"/video."+ext)
                break
        else:
            video.append("images/missing.jpg")

    is_saved = os.path.exists(event_path+"/.saved")

    details = {
        "start":        start_time,
        "end":          end_time,
        "title":        start_time.strftime("%a %b %d %I:%M:%S %p"),
        "video":        video[0],
        "debug_video":  video[1],
        "thumbnail":    thumbnail,
        "images":       [],
        "saved":        is_saved,
        "event_path":   event_path,
    }

    # Adding to the cache can potentially expire it if it was an old event
    ok = EventCache.set(event_path, details)
    if not ok:
        return None

    # default=str writes the datetime objects as strings
    with open(details_file, "w") as f:
        json.dump(details, f, default=str)
    return details


def camera_events(log, base_dir, camera, start, end, offset, limit):
    """
    Return the details of a camera's events between start and end

    The cache is walked newest to oldest, the first offset matching events
    are skipped, and at most limit events are processed (a limit of 0 or
    less means no limit). The returned list is ordered oldest to newest.

    An event whose details come back as None is left out of the result but
    still counts towards limit.
    """
    # Newest to oldest, limited by offset and limit
    skipped = 0
    added = 0
    events = []
    for event_path in EventCache.events(camera=camera, reverse=True):
        dt = path_to_dt(event_path)
        if dt < start or dt > end:
            continue
        if skipped < offset:
            skipped += 1
            continue

        details = event_details(log, event_path)
        if details is not None:
            events.append(details)

        added += 1
        if limit > 0 and added >= limit:
            break

    # Collected newest first, callers get oldest first
    events.reverse()
    return events


def queue_events(log, queue_rx):
    """
    Loop, reading new event paths from the Pipe (the queue mp thread is at the other end)
    and adding their details to the EventCache

    Returns when the other end of the Pipe has been closed.
    """
    while True:
        try:
            # Wait up to 10 seconds for something to read
            if not queue_rx.poll(10):
                continue

            event_path = queue_rx.recv()
        except EOFError:
            break

        event_details(log, event_path)