queue.py (6610B)
# queue.py
#
# Copyright (C) 2017 Brian C. Lane
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from glob import glob
import json
import multiprocessing as mp
import os
import shutil
import subprocess
import time

from PIL import Image
import structlog

from . import logger

THUMBNAIL_SIZE = (640, 480)

# More than 5 minute events have a timelapse created.
# NOTE(review): presumably 5 frames/second * 60 seconds * 5 minutes, matching
# the 5 fps input framerate passed to ffmpeg below -- confirm against the
# camera's capture rate.
TIMELAPSE_MIN = 5 * 60 * 5


def max_cores() -> int:
    """Return half the number of CPUs reported by multiprocessing, but at least 1."""
    return max(1, mp.cpu_count() // 2)


def GetImageDescriptions(path):
    """
    Extract EXIF ImageDescription for all the images in the directory

    Runs ``exiftool -json -q -ImageDescription <path>`` and returns the list
    of decoded JSON entries that contain an "ImageDescription" key.

    An empty list is returned when exiftool exits with a non-zero status or
    when its output cannot be decoded as JSON (for example, no output at
    all). Failure to launch exiftool (OSError) is not handled here and
    propagates to the caller.
    """
    ## Run exiftool on the files
    cmd = ["exiftool", "-json", "-q", "-ImageDescription", path]
    try:
        out = subprocess.check_output(cmd)
        entries = json.loads(out)
    except (subprocess.CalledProcessError, ValueError):
        # json.JSONDecodeError is a ValueError subclass
        return []

    return [d for d in entries if "ImageDescription" in d]


def DescriptionDict(desc):
    """
    Split the motion info into dict entries

    <changed>-<noise>-<width>-<height>-<X>-<Y>

    Returns a dict with integer "changed", "noise", "width", "height", "x",
    "y" and "area" (width * height) entries. If desc is not a string made of
    exactly six "-" separated integers, every entry is 0.
    """
    try:
        changed, noise, width, height, x, y = (int(v) for v in desc.split("-"))
    except (ValueError, AttributeError):
        # ValueError: wrong number of fields or a non-integer field.
        # AttributeError: desc is not a string (no .split method).
        changed = noise = width = height = x = y = 0

    return {
        "changed": changed,
        "noise": noise,
        "width": width,
        "height": height,
        "x": x,
        "y": y,
        "area": width * height,
    }


def BestThumbnail(path):
    """
    Make a best guess at the image to use for a thumbnail

    Use the one with the most changes. When several images share the highest
    "changed" value the first one reported by GetImageDescriptions wins.

    Returns the "SourceFile" value of the selected image.
    Raises IndexError if no image in path has an ImageDescription.
    """
    images = [
        {"name": i["SourceFile"], "description": DescriptionDict(i["ImageDescription"])}
        for i in GetImageDescriptions(path)
    ]
    if not images:
        raise IndexError(f"No images with an ImageDescription found in {path}")

    # max() returns the first maximal item, the same element a stable
    # descending sort would place first.
    best = max(images, key=lambda i: i["description"]["changed"])
    return best["name"]


## Handle watching the queue and dispatching movie creation and directory moving

def process_event(log: structlog.BoundLogger, base_dir: str, event: str, queue_tx) -> None:
    """
    Turn the images of one queued event into videos and a thumbnail, then
    rename the event directory after its first image.

    log is used for all status and error reporting.
    base_dir is the directory the event path is relative to.
    event is the queue entry name; replacing "_" with the path separator
    gives the event directory below base_dir.
    queue_tx only needs a send() method; it is called with the destination
    path once the move step has completed without raising.

    Failures in the individual steps are logged and do not stop the later
    steps. A missing event directory or a failure to create the debug
    directory aborts processing.
    """
    log.info(event_path=event, base_dir=base_dir)

    # The actual path is the event with _ replaced by /
    event_path = os.path.join(base_dir, event.replace("_", os.path.sep))
    if not os.path.isdir(event_path):
        log.error("event_path doesn't exist", event_path=event_path)
        return

    debug_path = os.path.join(event_path, "debug")
    try:
        os.mkdir(debug_path, mode=0o755)
    except Exception as e:
        log.error("Failed to create debug directory", exception=str(e))
        return

    # Move the debug images into ./debug/
    try:
        for debug_img in glob(os.path.join(event_path, "*m.jpg")):
            shutil.move(debug_img, debug_path)
    except Exception as e:
        log.debug("Failed to move debug images into ./debug/", exception=str(e))

    # ffmpeg only honours the last -vf given for an output, so every video
    # filter has to go into a single comma separated filter chain.
    video_filters = ["scale=1280:-2"]

    # Make a timelapse for events that are too long
    if len(glob(f"{event_path}/*jpg")) > TIMELAPSE_MIN:
        video_filters.append("setpts=0.0625*PTS")

    ffmpeg_cmd = ["ffmpeg", "-f", "image2", "-pattern_type", "glob", "-framerate", "5",
                  "-i", "*.jpg", "-vf", ",".join(video_filters),
                  "-c:v", "h264", "-b:v", "2M", "video.m4v"]
    log.debug("ffmpeg cmdline", ffmpeg_cmd=ffmpeg_cmd)

    # Make a movie out of the jpg images with ffmpeg
    try:
        subprocess.run(ffmpeg_cmd, cwd=event_path, check=True)
    except Exception as e:
        log.error("Failed to create video", exception=str(e))

    # Make a movie out of the debug jpg images with ffmpeg
    try:
        subprocess.run(ffmpeg_cmd, cwd=debug_path, check=True)
    except Exception as e:
        log.error("Failed to create debug video", exception=str(e))

    try:
        # Get the image with the highest change value
        thumbnail = BestThumbnail(event_path)
        im = Image.open(thumbnail)
        # im.size will get the actual size of the image
        im.thumbnail(THUMBNAIL_SIZE)
        im.save(os.path.join(event_path, "thumbnail.jpg"), "JPEG")
    except Exception as e:
        log.error("Failed to create thumbnail", exception=str(e))

    # Move the directory to its final location
    try:
        # Use the time of the first image
        images = sorted(glob(os.path.join(event_path, "*-*-*-*.jpg")))
        first_jpg = os.path.split(images[0])[1]
        first_time = first_jpg.rsplit("-", 1)[0]
        event_path_base = os.path.split(event_path)[0]
        dest_path = os.path.join(event_path_base, first_time)
        if not os.path.exists(dest_path):
            os.rename(event_path, dest_path)
            log.info("Moved event to final location", dest_path=dest_path)
        else:
            log.warning("Destination already exists, event not moved",
                        event_path=event_path, dest_path=dest_path)

        # Tell the event thread/process about the new path
        queue_tx.send(dest_path)
    except Exception as e:
        log.error("Moving to destination failed", event_path=event_path, exception=str(e))


def monitor_queue(logging_queue, base_dir, quit, max_threads, queue_tx):
    """
    Watch <base_dir>/queue/ and start a process_event worker for each entry.

    logging_queue is handed to logger.log() to obtain the logger.
    quit needs an is_set() method; the loop runs until it returns True.
    max_threads is the maximum number of worker processes alive at once
    (see max_cores for the half-the-cpus policy mentioned by the original
    author).
    queue_tx is passed through to process_event.

    The queue directory is polled every 5 seconds. Each entry is deleted
    before its worker is started, and its file name is used as the event
    name. On quit, all running workers are joined before returning.
    """
    threads = []
    log = logger.log(logging_queue)

    queue_path = os.path.abspath(os.path.join(base_dir, "queue/"))
    log.info("Started queue monitor", queue_path=queue_path)
    while not quit.is_set():
        time.sleep(5)
        # Remove any threads from the list that have finished
        threads = [t for t in threads if t.is_alive()]

        for event_file in glob(os.path.join(queue_path, "*")):
            # Limit the number of processes to max_threads; remaining
            # entries stay queued until the next poll.
            if len(threads) >= max_threads:
                break

            os.unlink(event_file)
            event = os.path.split(event_file)[-1]
            thread = mp.Process(target=process_event, args=(log, base_dir, event, queue_tx))
            threads.append(thread)
            thread.start()

    log.info("monitor_queue waiting for threads to finish")
    for t in threads:
        t.join()

    log.info("monitor_queue is quitting")