#!/usr/bin/env python3
"""
Nighttime ROI Calibrator

Monitors for pump light at night and recalculates ROI1 center to compensate
for camera drift.

The service waits for the configured night window, samples the brightness of
a search area around the seeded ROI1 center to build a baseline, waits for a
brightness spike (the pump light switching on), and then writes the centroid
of the bright region to a JSON calibration file.
"""

import json
import logging
import os
import time
import urllib.request
from collections import deque
from datetime import datetime, time as dt_time

import cv2
import numpy as np

_LOG_FILE = '/home/al/logs/roi_calibrator.log'
_SEED_FILE = '/home/al/python/roi_seed.json'
_CALIBRATION_FILE = '/home/al/python/roi_calibration.json'


def _build_log_handlers(log_file):
    """Return ``(handlers, error)`` for logging to *log_file* and the console.

    If the log directory cannot be created or the file cannot be opened,
    only the console handler is returned and ``error`` holds the OSError so
    the caller can report it once logging is configured.
    """
    handlers = [logging.StreamHandler()]
    try:
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        handlers.insert(0, logging.FileHandler(log_file))
    except OSError as e:
        return handlers, e
    return handlers, None


_log_handlers, _log_file_error = _build_log_handlers(_LOG_FILE)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
if _log_file_error is not None:
    logging.warning(
        f"Could not open log file {_LOG_FILE}: {_log_file_error}; "
        "logging to console only"
    )


class ROICalibrator:
    """Detects the pump light at night and records a corrected ROI1 center."""

    # Frame size assumed when clamping the search area (640x480 stream).
    FRAME_WIDTH = 640
    FRAME_HEIGHT = 480

    # Center of the default ROI1 rectangle (249, 244, 6, 6) = (249+3, 244+3).
    DEFAULT_ROI1_CENTER = (252, 247)

    def __init__(self):
        self.stream_url = 'http://localhost:5000/video_feed'
        self.calibration_file = _CALIBRATION_FILE
        self.seed_file = _SEED_FILE

        # Night time window (11:30 PM to 6:00 AM)
        self.night_start = dt_time(23, 30)  # 11:30 PM
        self.night_end = dt_time(6, 0)      # 6:00 AM

        # Load current ROI1 position (or use default)
        self.roi1_center = self.load_roi1_seed()

        # Create a larger search area around ROI1
        self.search_padding = 15  # pixels to expand search area in each direction
        self.search_area = self.calculate_search_area()

        # Brightness detection parameters
        self.sample_interval = 2                 # seconds between brightness samples
        self.baseline_window = deque(maxlen=30)  # 30 samples, ~60 seconds of baseline
        self.baseline_timeout = 600              # give up on the baseline after 10 minutes
        self.brightness_spike_threshold = 15     # brightness increase threshold
        self.stabilization_wait = 10             # seconds to wait after detection
        self.detection_timeout = 3600            # seconds to wait for the pump (1 hour)

        # State tracking
        self.last_calibration = self.load_last_calibration_time()
        self.calibration_cooldown = 3600  # 1 hour between calibrations

        logging.info("ROI Calibrator initialized")
        logging.info(f"Current ROI1 center: {self.roi1_center}")
        logging.info(f"Search area: {self.search_area}")
        logging.info(f"Night window: {self.night_start} to {self.night_end}")

    def load_roi1_seed(self):
        """Load the current/default ROI1 center position.

        Returns:
            A two-element list ``[x, y]`` of ints. The default center is
            returned when the seed file is missing, unreadable, or does not
            contain a usable ``roi1_center`` pair.
        """
        default = list(self.DEFAULT_ROI1_CENTER)
        try:
            with open(self.seed_file, 'r') as f:
                data = json.load(f)
            # Unpacking rejects anything that is not exactly a pair; int()
            # guarantees the values can be used as array slice bounds later.
            x, y = data.get('roi1_center', default)
            center = [int(x), int(y)]
        except FileNotFoundError:
            return default
        except (OSError, ValueError, TypeError, AttributeError) as e:
            logging.warning(f"Could not load seed file: {e}, using default")
            return default
        logging.info(f"Loaded ROI1 seed: {center}")
        return center

    def calculate_search_area(self):
        """Calculate the expanded search area around ROI1.

        Returns:
            ``(x, y, width, height)`` of the padded area, clamped to the
            assumed frame size.
        """
        x_center, y_center = self.roi1_center
        x_min = max(0, x_center - self.search_padding)
        y_min = max(0, y_center - self.search_padding)
        x_max = min(self.FRAME_WIDTH, x_center + self.search_padding)
        y_max = min(self.FRAME_HEIGHT, y_center + self.search_padding)
        return (x_min, y_min, x_max - x_min, y_max - y_min)

    def load_last_calibration_time(self):
        """Load the timestamp of the last calibration.

        Returns:
            The stored ``timestamp`` as a float, or 0 when the calibration
            file is missing, unreadable, or holds no usable timestamp.
        """
        try:
            with open(self.calibration_file, 'r') as f:
                data = json.load(f)
            return float(data.get('timestamp', 0))
        except FileNotFoundError:
            return 0
        except (OSError, ValueError, TypeError, AttributeError) as e:
            logging.warning(f"Could not read last calibration time: {e}")
            return 0

    def is_nighttime(self, now=None):
        """Check whether a time of day falls within the night window.

        Args:
            now: Optional ``datetime.time`` to test; defaults to the current
                local time.

        Returns:
            True if *now* lies inside ``[night_start, night_end]``, handling
            windows that wrap past midnight.
        """
        if now is None:
            now = datetime.now().time()
        if self.night_start < self.night_end:
            # Window contained within a single day
            return self.night_start <= now <= self.night_end
        # Window crosses midnight (e.g. 11:30 PM to 6:00 AM)
        return now >= self.night_start or now <= self.night_end

    def get_frame_from_stream(self):
        """Grab a single frame from the video stream.

        Reads the stream until one complete JPEG (SOI ``FFD8`` ... EOI
        ``FFD9``) has been received and decodes it.

        Returns:
            The decoded frame, or None if no complete frame arrived within
            10 seconds, the stream ended, or any error occurred.
        """
        try:
            # The context manager closes the connection on every exit path.
            with urllib.request.urlopen(self.stream_url, timeout=5) as stream:
                buffer = bytearray()
                deadline = time.time() + 10
                while time.time() < deadline:
                    chunk = stream.read(1024)
                    if not chunk:
                        # EOF: further reads would spin until the deadline.
                        logging.error("Error getting frame: stream ended before a complete frame")
                        return None
                    buffer.extend(chunk)

                    start = buffer.find(b'\xff\xd8')
                    if start == -1:
                        # No frame start yet; keep only the last byte in case
                        # the marker is split across two reads.
                        del buffer[:-1]
                        continue
                    # Look for the end marker only after the start marker so
                    # the tail of a previous frame is never mistaken for it.
                    end = buffer.find(b'\xff\xd9', start + 2)
                    if end != -1:
                        jpg = bytes(buffer[start:end + 2])
                        return cv2.imdecode(
                            np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR
                        )
            return None
        except Exception as e:
            # Best-effort grab: callers treat None as "no frame this time".
            logging.error(f"Error getting frame: {e}")
            return None

    def get_search_area_brightness(self, frame):
        """Calculate the average brightness of the search area in *frame*."""
        x, y, w, h = self.search_area
        search_region = frame[y:y+h, x:x+w]
        return np.mean(search_region)

    def find_brightness_centroid(self, frame):
        """Find the centroid of the brightest region in the search area.

        Args:
            frame: Image passed to ``cv2.cvtColor`` with ``COLOR_BGR2GRAY``.

        Returns:
            ``(x, y)`` in full frame coordinates, or the current ROI1 center
            when no bright region is found.
        """
        x, y, w, h = self.search_area
        search_region = frame[y:y+h, x:x+w]

        # Convert to grayscale
        gray = cv2.cvtColor(search_region, cv2.COLOR_BGR2GRAY)

        # Find the brightest pixels (top 5% or above 200 brightness).
        # THRESH_BINARY keeps pixels strictly greater than the threshold, so
        # it is capped at 254: otherwise a saturated light (95th percentile
        # of 255) would produce an empty mask.
        threshold = min(max(200, np.percentile(gray, 95)), 254)
        _, bright_mask = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)

        # Calculate moments to find centroid
        moments = cv2.moments(bright_mask)
        if moments['m00'] > 0:
            # Centroid relative to search area
            cx = int(moments['m10'] / moments['m00'])
            cy = int(moments['m01'] / moments['m00'])

            # Convert to full frame coordinates
            cx_full = x + cx
            cy_full = y + cy

            logging.info(f"Found brightness centroid at ({cx_full}, {cy_full})")
            return (cx_full, cy_full)

        # If no bright region found, return current center
        logging.warning("Could not find brightness centroid, returning current center")
        return tuple(self.roi1_center)

    def save_calibration(self, new_center):
        """Save the new ROI1 center to the calibration file.

        The data is written to a temporary file and then moved over the
        calibration file, so a failed write never leaves a truncated file.

        Args:
            new_center: ``(x, y)`` of the newly computed ROI1 center.

        Returns:
            True on success, False if the file could not be written.
        """
        tmp_path = f"{self.calibration_file}.tmp"
        try:
            data = {
                # int() so NumPy integer types do not break JSON encoding.
                'roi1_center': [int(v) for v in new_center],
                'timestamp': time.time(),
                'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'old_center': self.roi1_center
            }
            with open(tmp_path, 'w') as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, self.calibration_file)
        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Failed to save calibration: {e}")
            return False
        logging.info(f"Saved calibration: old={self.roi1_center}, new={new_center}")
        return True

    def _establish_baseline(self):
        """Collect a full window of brightness samples.

        Returns:
            The mean baseline brightness, or None if the night window ended
            or the baseline timeout expired before enough frames arrived.
        """
        samples_needed = self.baseline_window.maxlen
        self.baseline_window.clear()
        deadline = time.time() + self.baseline_timeout

        while len(self.baseline_window) < samples_needed:
            if not self.is_nighttime():
                logging.info("Exited night window, aborting calibration")
                return None
            if time.time() >= deadline:
                logging.warning("Calibration timeout - could not establish baseline")
                return None

            frame = self.get_frame_from_stream()
            if frame is not None:
                brightness = self.get_search_area_brightness(frame)
                self.baseline_window.append(brightness)
                logging.info(
                    f"Baseline sample {len(self.baseline_window)}/{samples_needed}: "
                    f"{brightness:.1f}"
                )
            time.sleep(self.sample_interval)

        return np.mean(self.baseline_window)

    def _calibrate_after_detection(self):
        """Wait for the light to stabilize, then compute and save the center.

        Returns:
            True/False for a saved/failed calibration, or None when no
            stabilized frame could be grabbed and detection should continue.
        """
        logging.info(f"Phase 3: Waiting {self.stabilization_wait} seconds for stabilization...")
        time.sleep(self.stabilization_wait)

        logging.info("Phase 4: Calculating brightness centroid...")
        stabilized_frame = self.get_frame_from_stream()
        if stabilized_frame is None:
            logging.warning("Could not grab stabilized frame, continuing detection")
            return None
        new_center = self.find_brightness_centroid(stabilized_frame)

        logging.info("Phase 5: Saving calibration...")
        if self.save_calibration(new_center):
            logging.info("Calibration completed successfully!")
            return True
        logging.error("Failed to save calibration")
        return False

    def run_calibration_cycle(self):
        """Run one calibration cycle.

        1. Establish baseline brightness
        2. Detect pump activation (brightness spike)
        3. Wait for stabilization
        4. Calculate new center
        5. Save calibration

        Returns:
            True if a calibration was saved, False otherwise.
        """
        logging.info("Starting calibration cycle...")

        # Phase 1: Establish baseline
        logging.info("Phase 1: Establishing baseline brightness...")
        baseline_avg = self._establish_baseline()
        if baseline_avg is None:
            return False
        logging.info(f"Baseline established: {baseline_avg:.1f}")

        # Phase 2: Wait for pump activation
        logging.info("Phase 2: Waiting for pump activation...")
        deadline = time.time() + self.detection_timeout

        while time.time() < deadline:
            if not self.is_nighttime():
                logging.info("Exited night window, aborting calibration")
                return False

            frame = self.get_frame_from_stream()
            if frame is not None:
                current_brightness = self.get_search_area_brightness(frame)

                # Check for brightness spike
                if current_brightness - baseline_avg > self.brightness_spike_threshold:
                    logging.info(
                        f"PUMP DETECTED! Brightness: {current_brightness:.1f} "
                        f"(baseline: {baseline_avg:.1f})"
                    )
                    outcome = self._calibrate_after_detection()
                    if outcome is not None:
                        return outcome
                    # The spike sample is deliberately kept out of the
                    # baseline so a retry still sees the pump as a spike.
                else:
                    # Update rolling baseline so slow ambient changes are
                    # not mistaken for the pump.
                    self.baseline_window.append(current_brightness)
                    baseline_avg = np.mean(self.baseline_window)

            time.sleep(self.sample_interval)

        logging.warning("Calibration timeout - no pump activation detected")
        return False

    def run(self):
        """Main loop - waits for night and performs calibration. Never returns."""
        logging.info("ROI Calibrator service started")

        while True:
            try:
                # Check if it's nighttime
                if not self.is_nighttime():
                    now = datetime.now().time()
                    logging.info(f"Not nighttime (current: {now}), sleeping...")
                    time.sleep(300)  # Check every 5 minutes
                    continue

                # Check if we've calibrated recently
                time_since_last = time.time() - self.last_calibration
                if time_since_last < self.calibration_cooldown:
                    logging.info(f"Recently calibrated ({time_since_last/60:.1f} min ago), sleeping...")
                    time.sleep(300)
                    continue

                # Run calibration
                logging.info("=== Starting nighttime calibration ===")
                if self.run_calibration_cycle():
                    self.last_calibration = time.time()
                    logging.info("Calibration successful, sleeping for cooldown period")
                    time.sleep(self.calibration_cooldown)
                else:
                    logging.warning("Calibration failed, will retry in 30 minutes")
                    time.sleep(1800)

            except Exception as e:
                # Top-level boundary: keep the service alive and retry later.
                logging.exception(f"Error in calibration loop: {e}")
                time.sleep(300)


def _ensure_seed_file(seed_file):
    """Create the seed file with the default ROI1 center if it is missing."""
    if os.path.exists(seed_file):
        return
    logging.info("Creating ROI seed file with default position")
    seed_data = {
        'roi1_center': [252, 247],  # center of (249, 244, 6, 6)
        'created': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'note': 'This is the initial ROI1 center position. Modify this if you need to reset the calibration baseline.'
    }
    os.makedirs(os.path.dirname(seed_file), exist_ok=True)
    with open(seed_file, 'w') as f:
        json.dump(seed_data, f, indent=2)


if __name__ == "__main__":
    # Create the seed file if it doesn't exist
    _ensure_seed_file(_SEED_FILE)

    calibrator = ROICalibrator()
    calibrator.run()