import cv2 import pytesseract import logging import re import os import json pytesseract.pytesseract.tesseract_cmd = 'C:\\Program Files\\Tesseract-OCR\\tesseract.exe' CROP = ( 5, # x 5, # y 32, # h 120, # w ) RX = r"\d+" RESULTS_DIR = "results.json" VIDEOS_DIR = "videos" VIDEO_EXTENSIONS = [".webm", ".mp4"] def setup_logger(level): logging.basicConfig( level=level, format="%(asctime)s [%(levelname).3s] <%(filename)s:%(funcName)s> %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) def extract_count_at_frame(cap, idx): logging.debug(f"getting info at frame idx={idx}") cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ret, frame = cap.read() if not ret: raise Exception(f"failed to select frame idx={idx}") fh, fw, _ = frame.shape (x, y, h, w) = CROP cropped = frame[x:h, fw - w: fw - y] gray = cv2.cvtColor(cropped, cv2.COLOR_RGB2GRAY) # "psm 6" = assume a block of text text = pytesseract.image_to_string(gray, config="--psm 6") matches = re.findall(RX, text) if not matches: return -1 return int(matches[-1]) def get_pivot(start, end): return int((end - start) / 2) + start def extract_timestamps(video_path): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("failed to open file") fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - int(fps) c_start = extract_count_at_frame(cap, 0) c_end = extract_count_at_frame(cap, total_frames) if c_end <= c_start: logging.info( f"no additional deaths in this video start={c_start} end={c_end}") return [] timeframes = [] start = 0 end = total_frames last_count = c_start while True: pivot = get_pivot(start, end) count = extract_count_at_frame(cap, pivot) if count < 100 or count < c_start or count > c_end: logging.warning( f"faulty value found count={count} c_start={c_start} c_end={c_end} frame={pivot}") start += fps continue logging.info(f"s={start} e={end} p={pivot} c={count}") if end - start < fps: logging.info( f"found window s={start} e={end} p={pivot} c={count}") timeframes.append(start / fps) last_count += 1 if len(timeframes) == c_end - c_start: break start = end + fps end = total_frames elif count <= last_count: start = pivot else: end = pivot cap.release() return timeframes def format_timestamp(secs): mins = int(secs / 60) secs = int(secs % 60) return f"{mins:0>2}:{secs:0>2}" def load_results(): if not os.path.exists(RESULTS_DIR): return {} with open(RESULTS_DIR, "r", encoding="utf-8") as f: return json.load(f) def store_results(results): with open(RESULTS_DIR, "w", encoding="utf-8") as f: json.dump(results, f, indent=2, ensure_ascii=False) def main(): setup_logger("INFO") results = load_results() for f in os.listdir(VIDEOS_DIR): _, ext = os.path.splitext(f) if ext not in VIDEO_EXTENSIONS: logging.info(f"skipping: file extension file={f}") continue if f in results: logging.info(f"skipping: already analyzed file={f}") continue logging.info(f"extracting timestamps ... file={f}") timestamps = extract_timestamps(os.path.join(VIDEOS_DIR, f)) results[f] = [format_timestamp(t) for t in timestamps] store_results(results) if __name__ == "__main__": main()