init
This commit is contained in:
146
main.py
Normal file
146
main.py
Normal file
@@ -0,0 +1,146 @@
|
||||
import cv2
|
||||
from numpy import extract
|
||||
import pytesseract
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
import json
|
||||
|
||||
|
||||
CROP = (
|
||||
10, # x
|
||||
10, # y
|
||||
32, # h
|
||||
120, # w
|
||||
)
|
||||
|
||||
RX = r"\d+"
|
||||
|
||||
RESULTS_DIR = "results.json"
|
||||
VIDEOS_DIR = "videos"
|
||||
|
||||
|
||||
def get_log_level(level):
|
||||
level = level.strip().upper()
|
||||
if level.isdigit():
|
||||
return int(level)
|
||||
if lvl := logging.getLevelNamesMapping().get(level):
|
||||
return lvl
|
||||
raise Exception("invalid log level")
|
||||
|
||||
|
||||
def setup_logger(level):
|
||||
logging.basicConfig(
|
||||
level=level,
|
||||
format="%(asctime)s [%(levelname).3s] <%(filename)s:%(funcName)s> %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
|
||||
|
||||
def extract_count_at_frame(cap, idx):
|
||||
logging.debug(f"getting info at frame idx={idx}")
|
||||
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
|
||||
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
raise Exception(f"failed to select frame idx={idx}")
|
||||
|
||||
fh, fw, _ = frame.shape
|
||||
|
||||
(x, y, h, w) = CROP
|
||||
cropped = frame[x:h, fw - w : fw - y]
|
||||
|
||||
gray = cv2.cvtColor(cropped, cv2.COLOR_RGB2GRAY)
|
||||
|
||||
text = pytesseract.image_to_string(gray, config="--psm 6") # "psm 6" = assume a block of text
|
||||
matches = re.findall(RX, text)
|
||||
if not matches:
|
||||
return -1
|
||||
return int(matches[-1])
|
||||
|
||||
|
||||
def get_pivot(start, end):
|
||||
return int((end - start) / 2) + start
|
||||
|
||||
|
||||
def extract_timestamps(video_path):
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
if not cap.isOpened():
|
||||
raise Exception("failed to open file")
|
||||
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - int(fps)
|
||||
|
||||
c_start = extract_count_at_frame(cap, 0)
|
||||
c_end = extract_count_at_frame(cap, total_frames)
|
||||
if c_end <= c_start:
|
||||
logging.info(f"no additional deaths in this video start={c_start} end={c_end}")
|
||||
return []
|
||||
|
||||
timeframes = []
|
||||
|
||||
start = 0
|
||||
end = total_frames
|
||||
last_count = c_start
|
||||
while True:
|
||||
pivot = get_pivot(start, end)
|
||||
count = extract_count_at_frame(cap, pivot)
|
||||
if count < c_start or count > c_end:
|
||||
logging.warning(f"faulty value found count={count} c_start={c_start} c_end={c_end} frame={pivot}")
|
||||
start += 1
|
||||
continue
|
||||
logging.debug(f"s={start} e={end} p={pivot} c={count}")
|
||||
if end - start < fps:
|
||||
logging.debug(f"found window s={start} e={end} p={pivot} c={count}")
|
||||
timeframes.append(start / fps)
|
||||
last_count += 1
|
||||
if len(timeframes) == c_end - c_start:
|
||||
break
|
||||
start = end + fps
|
||||
end = total_frames
|
||||
elif count <= last_count:
|
||||
start = pivot
|
||||
else:
|
||||
end = pivot
|
||||
|
||||
cap.release()
|
||||
|
||||
return timeframes
|
||||
|
||||
|
||||
def format_timestamp(secs):
|
||||
mins = int(secs / 60)
|
||||
secs = int(secs % 60)
|
||||
return f"{mins:0<2}:{secs:0<2}"
|
||||
|
||||
|
||||
def load_results():
|
||||
if not os.path.exists(RESULTS_DIR):
|
||||
return {}
|
||||
with open(RESULTS_DIR, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def store_results(results):
|
||||
with open(RESULTS_DIR, "w", encoding="utf-8") as f:
|
||||
json.dump(results, f, indent=2)
|
||||
|
||||
|
||||
def main():
|
||||
setup_logger(get_log_level("info"))
|
||||
|
||||
results = load_results()
|
||||
|
||||
for f in os.listdir(VIDEOS_DIR):
|
||||
if f in results:
|
||||
logging.info(f"already analyzed, skipping file={f}")
|
||||
continue
|
||||
logging.info(f"extracting timestamps ... file={f}")
|
||||
timestamps = extract_timestamps(os.path.join(VIDEOS_DIR, f))
|
||||
results[f] = [format_timestamp(t) for t in timestamps]
|
||||
store_results(results)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user