Source code for core.video_synchronization

import multiprocessing as mp
import pickle
import random
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Optional, Union, Dict, Any

import ffmpeg
import imageio as iio
import numpy as np
import pandas as pd
import scipy

from .marker_detection import DeeplabcutInterface, ManualAnnotation
from .plotting import AlignmentPlotIndividual, LEDMarkerPlot
from .utils import Coordinates, convert_to_path
from .video_metadata import VideoMetadata


[docs]class TimeseriesTemplate(ABC): @property @abstractmethod def template_attribute_string(self) -> str: pass
[docs] def adjust_template_timeseries_to_fps(self, fps: int) -> List[Tuple[np.ndarray, int]]: """ Adjust template to framerate. Parameters ---------- fps: int The framerate, to which the template will be adjusted. Returns ------- fps_adjusted_templates: list of tuple[np.ndarray, int] List of all possible timeseries that could be observed, given the framerate and the resolution of the template (1 ms) as tuple of template as np.ndarray and offset in ms. """ template_timeseries = getattr(self, self.template_attribute_string) fps_adjusted_templates = [] framerate = fps / 1000 max_frames = int(template_timeseries.shape[0] * framerate) max_offset = 1000 // fps for offset_in_ms in range(max_offset): image_timestamps = np.linspace( 0 + offset_in_ms, template_timeseries.shape[0] + offset_in_ms, max_frames, dtype="int", ) while image_timestamps[-1] >= template_timeseries.shape[0]: image_timestamps = image_timestamps[:-1] adjusted_template = template_timeseries[image_timestamps].copy() fps_adjusted_templates.append((adjusted_template, offset_in_ms)) return fps_adjusted_templates
[docs]class MotifTemplate(TimeseriesTemplate): """ Class to store single motif. Attributes __________ led_on_time_in_ms: int Duration of one on-peak. on_off_period_length_in_ms: int Period between two on-peaks. motif_duration_in_ms: int Total duration of the motif. template_timeseries: np.ndarray Array of binary 0 / 1 values representing a single motif with a resolution of 1 ms. Methods _______ adjust_template_timeseries_to_fps Adjust template to framerate. """ @property def template_attribute_string(self) -> str: return "template_timeseries" def __init__( self, led_on_time_in_ms: int, on_off_period_length_in_ms: int, motif_duration_in_ms: int, ): """ Constructor for class MotifTemplate. Parameters ---------- led_on_time_in_ms: int Duration of one on-peak. on_off_period_length_in_ms: int Period between two on-peaks. motif_duration_in_ms: int Total duration of the motif. """ self.led_on_time_in_ms = led_on_time_in_ms self.on_off_period_length_in_ms = on_off_period_length_in_ms self.motif_duration_in_ms = motif_duration_in_ms self.template_timeseries = self._compute_template_timeseries() def _compute_template_timeseries(self) -> np.ndarray: led_on_off_period = np.zeros(self.on_off_period_length_in_ms, dtype="float") led_on_off_period[1: self.led_on_time_in_ms + 1] = 1 full_repetitions = self.motif_duration_in_ms // self.on_off_period_length_in_ms remaining_ms = self.motif_duration_in_ms % self.on_off_period_length_in_ms motif_template = np.concatenate([led_on_off_period] * (full_repetitions + 1)) adjusted_end_index = ( self.on_off_period_length_in_ms * full_repetitions + remaining_ms ) return motif_template[:adjusted_end_index]
[docs]class MultiMotifTemplate(TimeseriesTemplate): """ Class to combine single motives and store the resulting multi_motif. Attributes __________ multi_motif_template: np.ndarray Array of binary 0 / 1 values representing a multi_motif_template with a resolution of 1 ms. motif_templates: list of MotifTemplates List of single motifs. Methods _______ add_motif_template(motif_template) Append a motif to the already existing multi_motif. adjust_template_timeseries_to_fps Adjust template to framerate. See Also ________ construct_template_motif Construct template from dictionary. """ @property def template_attribute_string(self) -> str: return "multi_motif_template" def __init__(self) -> None: """ Constructor for class MultiMotifTemplate. """ self.multi_motif_template = None self.motif_templates = []
[docs] def add_motif_template(self, motif_template: MotifTemplate) -> None: """ Append a motif to the already existing multi_motif. """ self.motif_templates.append(motif_template) self.multi_motif_template = self._update_session_template()
def _update_session_template(self) -> np.ndarray: individual_motif_template_timeseries = [ elem.template_timeseries for elem in self.motif_templates ] return np.concatenate(individual_motif_template_timeseries)
[docs]def construct_template_motif( blinking_patterns_metadata: Dict ) -> Union[MotifTemplate, MultiMotifTemplate]: """ Construct template from dictionary. Parameters ---------- blinking_patterns_metadata: Dictionaries with patterns as values, that specify arguments for MotifTemplate as key-value pairs: "led_on_time_in_ms", "on_off_period_length_in_ms", "motif_duration_in_ms". Returns ------- template_motif: MotifTemplate or MultiMotifTemplate The constructed template. See Also ________ MultiMotifTemplate Class to combine single motives and store the resulting multi_motif. MotifTemplate Class to store single motif. Examples ________ >>> from core.video_synchronization import construct_template_motif >>> led_pattern = \ ... {0: {'led_on_time_in_ms': 50, ... 'on_off_period_length_in_ms': 100, ... 'motif_duration_in_ms': 3000}, ... 1: {'led_on_time_in_ms': 50, ... 'on_off_period_length_in_ms': 1000, ... 'motif_duration_in_ms': 2000}} >>> template_motif = construct_template_motif(led_pattern) """ motif_templates = [] for pattern_idx, parameters in blinking_patterns_metadata.items(): motif_templates.append( MotifTemplate( led_on_time_in_ms=parameters["led_on_time_in_ms"], on_off_period_length_in_ms=parameters["on_off_period_length_in_ms"], motif_duration_in_ms=parameters["motif_duration_in_ms"], ) ) if len(motif_templates) < 1: raise ValueError( "Could not construct a blinking pattern template. Please validate your config files!" ) elif len(motif_templates) == 1: template_motif = motif_templates[0] else: template_motif = MultiMotifTemplate() for template in motif_templates: template_motif.add_motif_template(motif_template=template) return template_motif
def _find_closest_timestamp_index( original_timestamps: np.ndarray, timestamp: float ) -> int: return np.abs(original_timestamps - timestamp).argmin() def _find_frame_idxs_closest_to_target_timestamps( target_timestamps: np.ndarray, original_timestamps: np.ndarray ) -> List[int]: frame_indices_closest_to_target_timestamps = [] for timestamp in target_timestamps: closest_frame_index = _find_closest_timestamp_index( original_timestamps=original_timestamps, timestamp=timestamp ) frame_indices_closest_to_target_timestamps.append(closest_frame_index) return frame_indices_closest_to_target_timestamps def _get_ms_interval_per_frame(fps: int) -> float: return 1000 / fps def _adjust_frame_idxs_for_synchronization_shift( unadjusted_frame_idxs: List[int], start_idx: int ) -> List[int]: adjusted_frame_idxs = np.asarray(unadjusted_frame_idxs) + start_idx return list(adjusted_frame_idxs) def _compute_fps_adjusted_frame_count( original_n_frames: int, original_fps: int, target_fps: int ) -> int: target_ms_per_frame = _get_ms_interval_per_frame(fps=target_fps) original_ms_per_frame = _get_ms_interval_per_frame(fps=original_fps) return int((original_n_frames * original_ms_per_frame) / target_ms_per_frame) def _compute_timestamps( n_frames: int, fps: int, offset: float = 0.0 ) -> np.ndarray: ms_per_frame = _get_ms_interval_per_frame(fps=fps) timestamps = np.arange(n_frames * ms_per_frame, step=ms_per_frame) return timestamps + offset def _znorm(x, epsilon): return (x - np.mean(x)) / max(np.std(x, ddof=0), epsilon) def _adjust_start_idx_and_offset( start_frame_idx: int, offset: int, fps: int ) -> Tuple[int, float]: original_ms_per_frame = _get_ms_interval_per_frame(fps=fps) n_frames_to_add = int(offset / original_ms_per_frame) adjusted_start_frame_idx = start_frame_idx + n_frames_to_add remaining_offset = offset - n_frames_to_add * original_ms_per_frame return adjusted_start_frame_idx, remaining_offset def _get_start_end_indices_from_center_coord_and_length( center_px: int, length: int ) -> Tuple[int, int]: start_index = center_px - (length // 2) end_index = center_px + (length - (length // 2)) return start_index, end_index def _load_synchro(filepath: Path) -> Tuple[Coordinates, Any, Any, Any]: with open(filepath, "rb") as file: synchro_object = pickle.load(file) return Coordinates(synchro_object["led_center_coordinates"][0], synchro_object["led_center_coordinates"][1]), \ synchro_object["offset_adjusted_start_idx"], synchro_object["remaining_offset"], synchro_object[ "alignment_error"] def _cumsum(x, kahan=0): assert isinstance(kahan, int) and kahan >= 0 y = np.empty(len(x) + 1, dtype=x.dtype) y[0] = 0 np.cumsum(x, out=y[1:]) if kahan: r = x - np.diff(y) if np.max(np.abs(r)): y += np.cumsum(r, kahan - 1) return y def _fft_zdist(q: np.ndarray, s: np.ndarray, epsilon: float): """ Copyright 2020 NVIDIA Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http: // www.apache.org / licenses / LICENSE - 2.0 This function and the local functions called in this function where taken from https://github.com/NVIDIA/rapidAligner.git. Changes were made in a way, that this function is able to use numpy instead of cupy. """ alignment, kahan = 10_000, 0 m, q = len(q), _znorm(q, epsilon) n = (len(s) + alignment - 1) // alignment * alignment is_ = np.zeros(n, dtype=s.dtype) is_[: len(s)] = s delta = n - len(s) x, y = _cumsum(is_, kahan), _cumsum(is_ ** 2, kahan) x = x[+m:] - x[:-m] y = y[+m:] - y[:-m] z = np.sqrt(np.maximum(y / m - np.square(x / m), 0)) e = np.zeros(n, dtype=q.dtype) e[:m] = q r = np.fft.irfft(np.fft.rfft(e).conj() * np.fft.rfft(is_), n=n) np.seterr(divide="ignore") # toggle output np.seterr(invalid="ignore") # toggle output f = np.where(z > 0, 2 * (m - r[: -m + 1] / z), m * np.ones_like(z)) np.seterr(divide="warn") np.seterr(invalid="warn") return f[: len(s) - m + 1] def _run_cpu_aligner(query: np.ndarray, subject: np.ndarray) -> np.ndarray: return _fft_zdist(q=query, s=subject, epsilon=1e-6) def _split_into_ram_digestable_parts(idxs_of_frames_to_sample: List[int], max_ram_digestible_frames: int ) -> List[List[int]]: frame_idxs_to_sample = [] while len(idxs_of_frames_to_sample) > max_ram_digestible_frames: frame_idxs_to_sample.append( idxs_of_frames_to_sample[:max_ram_digestible_frames] ) idxs_of_frames_to_sample = idxs_of_frames_to_sample[ max_ram_digestible_frames: ] frame_idxs_to_sample.append(idxs_of_frames_to_sample) return frame_idxs_to_sample def _delete_individual_video_parts(filepaths_of_video_parts: List[Path] ) -> None: for filepath in filepaths_of_video_parts: filepath.unlink()
[docs]class Synchronizer(ABC): """ Class to synchronize videos. Run alignment between constructed MotifTemplate and detected led blinking pattern. Select frames based on timestemps to match framerate, potentially interpolate and add frames. Start marker detection or write video and save the adjusted files. Parameters __________ video_metadata: VideoMetadata Metadata from video. output_directory: Path or str Directory, in which the files created during the synchronisation are saved. synchro_metadata: Dict Metadata for synchronisation containing SYNCHRO_METADATA_KEYS. Attributes __________ video_metadata: VideoMetadata Metadata from video. use_rapid_aligner: bool If True, synchro pattern alignment will be based on GPU, if False, on CPU. rapid_aligner_path: Path Path to locally installed clone of the rapid_aligner package to use GPU for pattern synchronisation. use_gpu: str Whether to restrict the usage of GPU for DLC analyses. led_box_size: int Pixel range around predicted synchro marker position to calculate mean pixel intensity for blinking pattern from. output_directory: Path or str Directory, in which the files created during the synchronisation are saved. synchro_metadata: Dict Metadata for synchronisation containing SYNCHRO_METADATA_KEYS. led_timeseries_for_cross_video_validation: np.ndarray Mean pixel intensity at predicted synchro marker position adjusted to target fps. led_detection: LEDMarkerPlot Plot to verify correct prediction of LED marker. led_timeseries: np.ndarray Extracted mean pixel intensity at predicted synchro marker position. template_blinking_motif: MotifTemplate or MultiMotifTemplate Constructed template from blinking pattern metadata. Methods _______ run_synchronization(synchronize_only, overwrite_DLC_analysis_and_synchro, verbose) Run alignment between template and detected led blinking pattern. Select frames to adjust framerate. Start marker detection or write video. Examples ________ >>> from core.video_synchronization import RecordingVideoDownSynchronizer >>> from core.video_metadata import VideoMetadata >>> from core.utils import read_config, SYNCHRO_METADATA_KEYS >>> from pathlib import Path >>> project_config = read_config("test_data/project_config.yaml") >>> video_filepath = Path("test_data/Server_structure/VGlut2-flp/September2022/206_F2-63/220922_OTE/220922_206_F2-63_OTE_Side2.mp4") >>> recording_config_dict = read_config("test_data/Server_structure/Calibrations/220922/recording_config_220922.yaml") >>> synchro_metadata = {key:project_config[key] for key in SYNCHRO_METADATA_KEYS}) >>> video = VideoMetadata(video_filepath=video_filepath, recording_config_dict=recording_config_dict, ... project_config_dict=project_config, tag = "recording") >>> synchronizer_object = RecordingVideoDownSynchronizer(video_metadata=video, ... output_directory=video_filepath.parent, synchro_metadata=synchro_metadata) >>> marker_detection_filepath, _ = synchronizer_object.run_synchronization() """ @property def target_fps(self) -> int: return self.video_metadata.target_fps @abstractmethod def _adjust_video_to_target_fps_and_run_marker_detection( self, target_fps: int, start_idx: int, offset: float, overwrite_DLC_analysis_and_synchro: bool, synchronize_only: bool ) -> Tuple[Optional[Path], Optional[Path]]: pass @abstractmethod def _create_h5_filepath(self, tag: str = "_rawfps_unsynchronized", filtered: bool = False) -> Path: pass def __init__( self, video_metadata: VideoMetadata, output_directory: Union[Path or str], synchro_metadata: Dict, ) -> None: """ Construct all necessary attributes for class Synchronizer. Parameters ---------- video_metadata: VideoMetadata Metadata from video. output_directory: Path or str Directory, in which the files created during the synchronisation are saved. synchro_metadata: Dict Metadata for synchronisation containing SYNCHRO_METADATA_KEYS. See Also ________ core.utils.SYNCHRO_METADATA_KEYS core.utils.KEYS_TO_CHECK_PROJECT Notes _____ The keys in synchro_metadata are from project config. For explanation of these attributes, we refer to KEYS_TO_CHECK_PROJECT. """ self.led_timeseries_for_cross_video_validation, self.led_detection = None, None self.led_timeseries, self.template_blinking_motif = None, None self.video_metadata = video_metadata self.use_rapid_aligner = bool(synchro_metadata["rapid_aligner_path"]) if self.use_rapid_aligner: self.rapid_aligner_path = convert_to_path(synchro_metadata["rapid_aligner_path"]) self.output_directory = convert_to_path(output_directory) self.use_gpu = synchro_metadata["use_gpu"] self.led_box_size = synchro_metadata["led_box_size"] self.synchro_metadata = synchro_metadata
[docs] def run_synchronization( self, synchronize_only: bool=False, overwrite_DLC_analysis_and_synchro: bool = False, verbose: bool = True ) -> Tuple[Optional[Path], Optional[Path]]: """ Run alignment between template and detected led blinking pattern. Select frames to adjust framerate. Start marker detection or write video. Parameters ---------- synchronize_only: bool, default False To be used by calibration videos only. If True, then only synchronized videos are created and no marker detection is run. overwrite_DLC_analysis_and_synchro: bool, default False If True (default False), then pre-existing DLC files and synchronisations will be overwritten during analysis. verbose: bool, default True If True (default), then the number of synchronized frames is printed. Returns ------- marker_detection_filepath: Path, optional The path to the synchronised marker prediction file. synchronized_video_filepath: Path, optional The path to the synchronised video file. """ self.template_blinking_motif = construct_template_motif( blinking_patterns_metadata=self.video_metadata.led_pattern) preexisting_output_file = self._get_preexisting_output_filepath() synchro_file = self._get_synchro_filepath() if not overwrite_DLC_analysis_and_synchro and preexisting_output_file.exists(): marker_detection_filepath, synchronized_video_filepath = None, None if preexisting_output_file.suffix == ".h5": marker_detection_filepath, synchronized_video_filepath = preexisting_output_file, None elif preexisting_output_file.suffix == ".mp4": synchronized_video_filepath, marker_detection_filepath = preexisting_output_file, None else: if not overwrite_DLC_analysis_and_synchro and synchro_file.exists(): led_center_coordinates, offset_adjusted_start_idx, remaining_offset, alignment_error = _load_synchro( filepath=synchro_file) else: led_center_coordinates = self._get_led_center_coordinates() self.led_timeseries = self._extract_led_pixel_intensities(led_center_coords=led_center_coordinates) offset_adjusted_start_idx, remaining_offset, alignment_error = self._find_best_match_of_template( template=self.template_blinking_motif, start_time=self.synchro_metadata["start_pattern_match_ms"], end_time=self.synchro_metadata["end_pattern_match_ms"]) if alignment_error > self.synchro_metadata["synchro_error_threshold"]: led_center_coordinates, offset_adjusted_start_idx, remaining_offset, alignment_error = self._handle_synchro_fails() self._plot_led_marker(led_center_coordinates=led_center_coordinates) self.led_timeseries_for_cross_video_validation = \ self._adjust_led_timeseries_for_cross_validation(start_idx=offset_adjusted_start_idx, offset=remaining_offset) self._save_synchro(filepath=synchro_file, led_center_coordinates=led_center_coordinates, offset_adjusted_start_idx=offset_adjusted_start_idx, remaining_offset=remaining_offset, alignment_error=alignment_error) marker_detection_filepath, synchronized_video_filepath = \ self._adjust_video_to_target_fps_and_run_marker_detection(target_fps=self.target_fps, start_idx=offset_adjusted_start_idx, offset=remaining_offset, synchronize_only=synchronize_only, overwrite_DLC_analysis_and_synchro=overwrite_DLC_analysis_and_synchro) self.video_metadata.framenum_synchronized, self.video_metadata.duration_synchronized = \ self._get_framenumber_of_synchronized_files(synchronize_only=synchronize_only, marker_detection_filepath=marker_detection_filepath, synchronized_video_filepath=synchronized_video_filepath, verbose=verbose) return marker_detection_filepath, synchronized_video_filepath
def _get_preexisting_output_filepath(self) -> Path: output_file = None if type(self) == CharucoVideoSynchronizer: output_file = self._construct_video_filepath() elif type(self) == RecordingVideoUpSynchronizer: output_file = self._create_h5_filepath(tag=f"_upsampled{self.target_fps}fps_synchronized", filtered=self.synchro_metadata['use_2D_filter']) elif type(self) == RecordingVideoDownSynchronizer: output_file = self._create_h5_filepath(tag=f"_downsampled{self.target_fps}fps_synchronized", filtered=self.synchro_metadata['use_2D_filter']) return output_file def _get_synchro_filepath(self) -> Path: return self.output_directory.joinpath( f"synchro_{self.video_metadata.recording_date}_{self.video_metadata.cam_id}.p") def _get_led_center_coordinates(self) -> Coordinates: temp_folder = self.output_directory.joinpath("temp") Path.mkdir(temp_folder, exist_ok=True) if self.video_metadata.led_extraction_type == "DLC": video_filepath_out = temp_folder.joinpath( f"{self.video_metadata.recording_date}_{self.video_metadata.cam_id}_LED_detection_samples.mp4" ) if self.video_metadata.calibration: dlc_filepath_out = temp_folder.joinpath( f"{self.video_metadata.recording_date}_{self.video_metadata.cam_id}_LED_detection_predictions.h5" ) else: dlc_filepath_out = temp_folder.joinpath( f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}_" f"{self.video_metadata.paradigm}_{self.video_metadata.cam_id}.h5" ) num_frames_to_pick = self.synchro_metadata['num_frames_to_pick'] if num_frames_to_pick > self.video_metadata.framenum: num_frames_to_pick = int(self.video_metadata.framenum / 2) sample_frame_idxs = random.sample(range(self.video_metadata.framenum), num_frames_to_pick, ) selected_frames = [] for idx in sample_frame_idxs: selected_frames.append( iio.v3.imread(self.video_metadata.filepath, index=idx) ) video_array = np.asarray(selected_frames) iio.v3.imwrite(str(video_filepath_out), video_array, fps=1, macro_block_size=1) dlc_interface = DeeplabcutInterface( object_to_analyse=video_filepath_out, output_directory=temp_folder, marker_detection_directory=self.video_metadata.led_extraction_filepath, ) dlc_filepath_out = dlc_interface.analyze_objects(filepath=dlc_filepath_out, filtering=False, use_gpu=self.use_gpu) for file in temp_folder.iterdir(): if file.suffix == ".pickle": dlc_created_picklefile = file dlc_created_picklefile.unlink() df = pd.read_hdf(dlc_filepath_out) x_key, y_key, likelihood_key = ( [key for key in df.keys() if self.synchro_metadata["synchro_marker"] in key and "x" in key], [key for key in df.keys() if self.synchro_metadata["synchro_marker"] in key and "y" in key], [key for key in df.keys() if self.synchro_metadata["synchro_marker"] in key and "likelihood" in key], ) x = int(df.loc[df[likelihood_key].idxmax(), x_key].values) y = int(df.loc[df[likelihood_key].idxmax(), y_key].values) video_filepath_out.unlink() dlc_filepath_out.unlink() elif self.video_metadata.led_extraction_type == "manual": config_filepath = self.video_metadata.led_extraction_filepath if self.video_metadata.calibration: manual_filepath_out = temp_folder.joinpath( f"{self.video_metadata.recording_date}_{self.video_metadata.cam_id}_LED_detection_predictions.h5" ) else: manual_filepath_out = temp_folder.joinpath( f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}_{self.video_metadata.paradigm}_{self.video_metadata.cam_id}.h5" ) manual_interface = ManualAnnotation( object_to_analyse=self.video_metadata.filepath, output_directory=self.output_directory, marker_detection_directory=config_filepath, ) manual_interface.analyze_objects(filepath=manual_filepath_out, labels=[self.synchro_metadata["synchro_marker"]], only_first_frame=True) df = pd.read_hdf(manual_filepath_out) x_key, y_key = ( [key for key in df.keys() if self.synchro_metadata["synchro_marker"] in key and "x" in key], [key for key in df.keys() if self.synchro_metadata["synchro_marker"] in key and "y" in key], ) x = int(df.loc[0, x_key].values) y = int(df.loc[0, y_key].values) manual_filepath_out.unlink() else: raise ValueError("For LED extraction only DLC and manual are supported!") temp_folder.rmdir() return Coordinates(y_or_row=y, x_or_column=x) def _extract_led_pixel_intensities( self, led_center_coords: Coordinates ) -> np.ndarray: box_row_indices = _get_start_end_indices_from_center_coord_and_length( center_px=led_center_coords.row, length=self.led_box_size ) box_col_indices = _get_start_end_indices_from_center_coord_and_length( center_px=led_center_coords.column, length=self.led_box_size ) mean_pixel_intensities = self._calculate_mean_pixel_intensities( box_row_indices=box_row_indices, box_col_indices=box_col_indices ) return np.asarray(mean_pixel_intensities) def _calculate_mean_pixel_intensities( self, box_row_indices: Tuple[int, int], box_col_indices: Tuple[int, int] ) -> List[float]: mean_pixel_intensities = [] for frame in iio.v3.imiter(self.video_metadata.filepath): box_mean_intensity = frame[ box_row_indices[0]: box_row_indices[1], box_col_indices[0]: box_col_indices[1], ].mean() mean_pixel_intensities.append(box_mean_intensity) return mean_pixel_intensities def _find_best_match_of_template( self, template: Union[MotifTemplate, MultiMotifTemplate], start_time: int = 0, end_time: int = -1, ) -> Tuple[int, float, Any]: adjusted_motif_timeseries = template.adjust_template_timeseries_to_fps( fps=self.video_metadata.fps ) start_frame_idx = self._get_frame_index_closest_to_time(time=start_time) try: end_frame_idx = self._get_frame_index_closest_to_time(time=end_time) except ValueError: end_frame_idx = -1 # if the given end_time is larger than the length of the corresponding video, # its set to the last frame of the video ( best_match_start_frame_idx, best_match_offset, alignment_error, ) = self._get_start_index_and_offset_of_best_match( adjusted_templates=adjusted_motif_timeseries, start_frame_idx=start_frame_idx, end_frame_idx=end_frame_idx, ) offset_adjusted_start_idx, remaining_offset = _adjust_start_idx_and_offset( start_frame_idx=best_match_start_frame_idx, offset=best_match_offset, fps=self.video_metadata.fps, ) if self.video_metadata.fps > self.video_metadata.target_fps: tag = f"_downsampled{self.video_metadata.target_fps}" else: tag = f"_upsampled{self.video_metadata.target_fps}" if self.video_metadata.calibration: filename_individual = f"{self.video_metadata.recording_date}_{self.video_metadata.cam_id}" \ f"_charuco_synchronization_individual{tag}" else: filename_individual = f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}" \ f"_{self.video_metadata.paradigm}_{self.video_metadata.cam_id}" \ f"_synchronization_individual{tag}" self.synchronization_individual = AlignmentPlotIndividual( template=adjusted_motif_timeseries[best_match_offset][0], led_timeseries=self.led_timeseries[best_match_start_frame_idx:], filename=filename_individual, cam_id=self.video_metadata.cam_id, output_directory=self.output_directory, led_box_size=self.led_box_size, alignment_error=alignment_error, ) self.synchronization_individual.create_plot(plot=False, save=True) return offset_adjusted_start_idx, remaining_offset, alignment_error def _get_frame_index_closest_to_time(self, time: int) -> int: time_error_message = ( f"The specified time: {time} is invalid! Both times have to be an integer " "larger than -1, where -1 represents the very last timestamp in the video " "and every other integer (e.g.: 1000) the time in ms. Please be aware, that " '"start_time" has to be larger than "end_time" (with end_time == -1 as only ' "exception) and must be smaller or equal to the total video recording time." ) if time == -1: return -1 else: if time < 0: raise ValueError(time_error_message) framerate = 1000 / self.video_metadata.fps closest_frame_idx = round(time / framerate) if closest_frame_idx >= self.led_timeseries.shape[0]: raise ValueError(time_error_message) return closest_frame_idx def _get_start_index_and_offset_of_best_match( self, adjusted_templates: List[Tuple[np.ndarray, int]], start_frame_idx: int, end_frame_idx: int, ) -> Tuple[Any, int, Any]: lowest_sum_of_squared_error_per_template = [] for template_timeseries, _ in adjusted_templates: alignment_results = self._run_alignment( query=template_timeseries, subject=self.led_timeseries[start_frame_idx:end_frame_idx], ) lowest_sum_of_squared_error_per_template.append(alignment_results.min()) lowest_sum_of_squared_error_per_template = np.asarray( lowest_sum_of_squared_error_per_template ) best_matching_template_index = int( lowest_sum_of_squared_error_per_template.argmin() ) best_alignment_results = self._run_alignment( query=adjusted_templates[best_matching_template_index][0], subject=self.led_timeseries[start_frame_idx:end_frame_idx], ) start_index = best_alignment_results.argmin() return start_index, best_matching_template_index, best_alignment_results.min() def _run_alignment(self, query: np.ndarray, subject: np.ndarray) -> np.ndarray: if self.use_rapid_aligner: alignment_results = self._run_rapid_aligner(query=query, subject=subject) else: alignment_results = _run_cpu_aligner(query=query, subject=subject) return alignment_results def _run_rapid_aligner(self, query: np.ndarray, subject: np.ndarray) -> np.ndarray: import sys sys.path.append(str(self.rapid_aligner_path)) import cupy as cp import rapidAligner as ra subject_timeseries_gpu = cp.asarray(subject) query_timeseries_gpu = cp.asarray(query) alignment_results = ra.ED.zdist( query_timeseries_gpu, subject_timeseries_gpu, mode="fft" ) return alignment_results.get() def _adjust_led_timeseries_for_cross_validation( self, start_idx: int, offset: float ) -> np.ndarray: adjusted_led_timeseries = self.led_timeseries[start_idx:].copy() if self.video_metadata.fps >= self.video_metadata.target_fps: adjusted_led_timeseries = self._downsample_led_timeseries( timeseries=adjusted_led_timeseries, offset=offset ) else: adjusted_led_timeseries = self._upsample_led_timeseries( timeseries=adjusted_led_timeseries, offset=offset ) return adjusted_led_timeseries def _downsample_led_timeseries( self, timeseries: np.ndarray, offset: float ) -> np.ndarray: n_frames_after_downsampling = _compute_fps_adjusted_frame_count( original_n_frames=timeseries.shape[0], original_fps=self.video_metadata.fps, target_fps=self.video_metadata.target_fps, ) original_timestamps = _compute_timestamps( n_frames=timeseries.shape[0], fps=self.video_metadata.fps, offset=offset ) target_timestamps = _compute_timestamps( n_frames=n_frames_after_downsampling, fps=self.video_metadata.target_fps ) frame_idxs_best_matching_timestamps = ( _find_frame_idxs_closest_to_target_timestamps( target_timestamps=target_timestamps, original_timestamps=original_timestamps, ) ) return timeseries[frame_idxs_best_matching_timestamps] def _upsample_led_timeseries( self, timeseries: np.ndarray, offset: float ) -> np.ndarray: len_frame_in_ms = 1 / self.video_metadata.fps * 1000 len_targetframe_in_ms = 1 / self.target_fps * 1000 index_in_ms = np.arange( 0, timeseries.shape[0] * len_frame_in_ms, len_frame_in_ms ) new_indices = np.arange( offset, (timeseries.shape[0] - 1) * len_frame_in_ms, len_targetframe_in_ms ) d = scipy.interpolate.interp1d(index_in_ms, timeseries) upsampled_timeseries = d(new_indices) return upsampled_timeseries def _handle_synchro_fails(self) -> Tuple[Coordinates, Union[int, Any], Union[float, Any], Union[int, Any]]: print(f"Synchronisation failed. Using method " f"{self.synchro_metadata['handle_synchro_fails']} now instead!") if self.synchro_metadata["handle_synchro_fails"] == "repeat": led_center_coordinates = self._get_led_center_coordinates() self.led_timeseries = self._extract_led_pixel_intensities(led_center_coords=led_center_coordinates) offset_adjusted_start_idx, remaining_offset, alignment_error = self._find_best_match_of_template( template=self.template_blinking_motif, start_time=self.synchro_metadata["start_pattern_match_ms"], end_time=self.synchro_metadata["end_pattern_match_ms"]) elif self.synchro_metadata["handle_synchro_fails"] == "default": alignment_error = 0 led_center_coordinates = Coordinates(0, 0) default_offset = self.synchro_metadata["default_offset_ms"] offset_in_framenum = self.video_metadata.fps*default_offset/1000 not_frame_matching_offset = offset_in_framenum % 1 remaining_offset = not_frame_matching_offset * 1000/self.video_metadata.fps offset_adjusted_start_idx = int(offset_in_framenum) elif self.synchro_metadata["handle_synchro_fails"] == "manual": self.video_metadata.led_extraction_type = "manual" led_center_coordinates = self._get_led_center_coordinates() self.led_timeseries = self._extract_led_pixel_intensities(led_center_coords=led_center_coordinates) offset_adjusted_start_idx, remaining_offset, alignment_error = self._find_best_match_of_template( template=self.template_blinking_motif, start_time=self.synchro_metadata["start_pattern_match_ms"], end_time=self.synchro_metadata["end_pattern_match_ms"]) elif self.synchro_metadata["handle_synchro_fails"] == "error": raise ValueError( "Could not synchronize the video. \n" "Make sure, that you chose the right synchronization pattern, \n" "that the LED is visible during the pattern\n" "and that you chose a proper alignment threshold!") else: raise ValueError( "project_config key handle_synchro_fails has to be in ['error', 'manual', 'repeat', 'default']") return led_center_coordinates, offset_adjusted_start_idx, remaining_offset, alignment_error def _plot_led_marker(self, led_center_coordinates: Coordinates) -> None: if self.video_metadata.calibration: led_plot_filename = f"{self.video_metadata.recording_date}_{self.video_metadata.cam_id}" \ f"_charuco_LED_marker" else: led_plot_filename = f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}" \ f"_{self.video_metadata.paradigm}_{self.video_metadata.cam_id}_LED_marker" self.led_detection = LEDMarkerPlot( image=iio.v3.imread(self.video_metadata.filepath, index=0), led_center_coordinates=led_center_coordinates, box_size=self.led_box_size, cam_id=self.video_metadata.cam_id, filename=led_plot_filename, output_directory=self.output_directory, ) self.led_detection.create_plot(plot=False, save=True) def _save_synchro(self, filepath: Path, led_center_coordinates: Coordinates, offset_adjusted_start_idx: int, remaining_offset: int, alignment_error: float) -> None: time = datetime.now().strftime("%Y%m%d%H%M%S") synchro_dict = {"filepath": str(self.video_metadata.filepath), "fps": self.video_metadata.fps, "led_center_coordinates": (led_center_coordinates.y, led_center_coordinates.x), "offset_adjusted_start_idx": offset_adjusted_start_idx, "remaining_offset": remaining_offset, "alignment_error": alignment_error, "time": time, "scorer": self.video_metadata.led_extraction_filepath, "synchro_marker": self.synchro_metadata["synchro_marker"], "led_box_size": self.led_box_size, "method": self.video_metadata.led_extraction_type, "pattern": self.video_metadata.led_pattern} with open(filepath, "wb") as file: pickle.dump(synchro_dict, file) def _get_framenumber_of_synchronized_files(self, synchronize_only: bool, marker_detection_filepath: Path, synchronized_video_filepath: Path, verbose: bool = True ) -> Tuple[Any, Any]: if synchronize_only: framenum_synchronized = iio.v2.get_reader(synchronized_video_filepath).count_frames() else: framenum_synchronized = pd.read_hdf(marker_detection_filepath).shape[0] duration_synchronized = (framenum_synchronized / self.video_metadata.target_fps) if verbose: print(f"{self.video_metadata.cam_id} Frames after synchronization: {framenum_synchronized}") return framenum_synchronized, duration_synchronized def _downsample_video( self, start_idx: int, offset: float, target_fps: int = 30, overwrite_video: bool = False, ) -> Path: if not overwrite_video: preexisting_filepath_downsampled_video = self._construct_video_filepath() if preexisting_filepath_downsampled_video.exists(): return preexisting_filepath_downsampled_video frame_idxs_to_sample = self._get_sampling_frame_idxs(start_idx=start_idx, offset=offset, target_fps=target_fps) sampling_frame_idxs_per_part = _split_into_ram_digestable_parts( idxs_of_frames_to_sample=frame_idxs_to_sample, max_ram_digestible_frames=self.video_metadata.max_ram_digestible_frames, ) if len(frame_idxs_to_sample) > 1: filepaths_all_video_parts = self._initiate_iterative_writing_of_individual_video_parts( frame_idxs_to_sample=sampling_frame_idxs_per_part) filepath_downsampled_video = self._concatenate_individual_video_parts_on_disk( filepaths_of_video_parts=filepaths_all_video_parts) _delete_individual_video_parts(filepaths_of_video_parts=filepaths_all_video_parts) elif len(frame_idxs_to_sample) == 1: filepath_downsampled_video = self._write_video_to_disk(idxs_of_frames_to_sample=frame_idxs_to_sample[0], target_fps=target_fps, ) else: print ("Can't synchronize video. Check, whether the file is broken. Unsynchronized video was returned.") filepath_downsampled_video = self.video_metadata.filepath return filepath_downsampled_video def _get_sampling_frame_idxs( self, start_idx: int, offset: float, target_fps: int ) -> List[int]: original_n_frames = self.video_metadata.framenum - start_idx n_frames_after_downsampling = _compute_fps_adjusted_frame_count( original_n_frames=original_n_frames, original_fps=self.video_metadata.fps, target_fps=target_fps, ) original_timestamps = _compute_timestamps( n_frames=original_n_frames, fps=self.video_metadata.fps, offset=offset ) target_timestamps = _compute_timestamps( n_frames=n_frames_after_downsampling, fps=target_fps ) frame_idxs_best_matching_timestamps = ( _find_frame_idxs_closest_to_target_timestamps( target_timestamps=target_timestamps, original_timestamps=original_timestamps, ) ) sampling_frame_idxs = _adjust_frame_idxs_for_synchronization_shift( unadjusted_frame_idxs=frame_idxs_best_matching_timestamps, start_idx=start_idx, ) return sampling_frame_idxs def _initiate_iterative_writing_of_individual_video_parts( self, frame_idxs_to_sample: List[List[int]] ) -> List[Path]: if self.video_metadata.max_cpu_cores_to_pool > 1: available_cpus = mp.cpu_count() if available_cpus > self.video_metadata.max_cpu_cores_to_pool: num_processes = self.video_metadata.max_cpu_cores_to_pool else: num_processes = available_cpus with mp.Pool(num_processes) as p: filepaths_to_all_video_parts = p.map( self._write_videoslice_to_disk_for_multiprocessing, enumerate(frame_idxs_to_sample) ) else: filepaths_to_all_video_parts = [] for idx, idxs_of_frames_to_sample in enumerate(frame_idxs_to_sample): part_id = str(idx).zfill(3) filepath_video_part = self._write_video_to_disk( idxs_of_frames_to_sample=idxs_of_frames_to_sample, target_fps=self.target_fps, part_id=part_id, ) filepaths_to_all_video_parts.append(filepath_video_part) return filepaths_to_all_video_parts def _write_videoslice_to_disk_for_multiprocessing( self, idx_and_idxs_of_frames_to_sample: Tuple ) -> Path: idx, idxs_of_frames_to_sample = idx_and_idxs_of_frames_to_sample part_id = str(idx).zfill(3) filepath_video_part = self._write_video_to_disk( idxs_of_frames_to_sample=idxs_of_frames_to_sample, target_fps=self.target_fps, part_id=part_id, ) return filepath_video_part def _write_video_to_disk( self, idxs_of_frames_to_sample: Union[int, List[int]], target_fps: int, part_id: Optional[str] = None, ) -> Path: selected_frames = [] for i, frame in enumerate(iio.v3.imiter(self.video_metadata.filepath)): if i > idxs_of_frames_to_sample[-1]: break if i in idxs_of_frames_to_sample: selected_frames.append(frame) video_array = np.asarray(selected_frames) filepath_out = self._construct_video_filepath(part_id=part_id) iio.v3.imwrite(filepath_out, video_array, fps=target_fps, macro_block_size=1) self.synchronized_object_filepath = filepath_out return filepath_out def _construct_video_filepath(self, part_id: Optional[int] = None) -> Path: if self.video_metadata.calibration: if part_id is None: filepath = self.output_directory.joinpath( f"{self.video_metadata.recording_date}_{self.video_metadata.cam_id}" f"_synchronized_downsampled{self.target_fps}fps.mp4" ) else: filepath = self.output_directory.joinpath( f"{self.video_metadata.recording_date}_{self.video_metadata.cam_id}" f"_synchronized_part_{part_id}.mp4" ) else: if part_id is None: filepath = self.output_directory.joinpath( f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}_" f"{self.video_metadata.paradigm}_{self.video_metadata.cam_id}_synchronized_downsampled" f"{self.target_fps}fps.mp4" ) else: filepath = self.output_directory.joinpath( f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}_" f"{self.video_metadata.paradigm}_{self.video_metadata.cam_id}_synchronized_part_{part_id}.mp4" ) return filepath def _concatenate_individual_video_parts_on_disk( self, filepaths_of_video_parts: List[Path] ) -> Path: video_part_streams = [ ffmpeg.input(str(filepath)) for filepath in filepaths_of_video_parts ] filepath_concatenated_video = self._construct_video_filepath(part_id=None) if len(video_part_streams) >= 2: concatenated_video = ffmpeg.concat( video_part_streams[0], video_part_streams[1] ) if len(video_part_streams) >= 3: for part_stream in video_part_streams[2:]: concatenated_video = ffmpeg.concat(concatenated_video, part_stream) output_stream = ffmpeg.output( concatenated_video, filename=str(filepath_concatenated_video) ) else: output_stream = ffmpeg.output( video_part_streams[0], filename=str(filepath_concatenated_video) ) output_stream.run(overwrite_output=True, quiet=True) return filepath_concatenated_video
[docs]class CharucoVideoSynchronizer(Synchronizer): def _create_h5_filepath(self, tag: str = "_rawfps_unsynchronized", filtered: bool = False) -> Path: pass def _adjust_video_to_target_fps_and_run_marker_detection( self, target_fps: int, start_idx: int, offset: float, overwrite_DLC_analysis_and_synchro: bool, synchronize_only: bool = True, ) -> Tuple[None, Path]: return None, self._downsample_video( start_idx=start_idx, offset=offset, target_fps=self.target_fps, overwrite_video=overwrite_DLC_analysis_and_synchro, )
[docs]class RecordingVideoSynchronizer(Synchronizer): def _adjust_video_to_target_fps_and_run_marker_detection(self, target_fps: int, start_idx: int, offset: float, overwrite_DLC_analysis_and_synchro: bool, synchronize_only: bool) -> Tuple[ Optional[Path], Optional[Path]]: pass def _run_deep_lab_cut_for_marker_detection( self, video_filepath: Path, overwrite_DLC_analysis: bool = False ) -> Path: output_filepath = self._create_h5_filepath() if (overwrite_DLC_analysis) or (not output_filepath.exists()): config_filepath = self.video_metadata.processing_filepath dlc_interface = DeeplabcutInterface( object_to_analyse=video_filepath, output_directory=self.output_directory, marker_detection_directory=config_filepath, ) if self.use_gpu=="prevent": use_gpu = "prevent" elif self.use_gpu == "": use_gpu = "" else: use_gpu = "full" dlc_ending = dlc_interface.analyze_objects(filepath=output_filepath, filtering=True, use_gpu=use_gpu) return output_filepath def _run_manual_marker_detection( self, video_filepath: Path, overwrite_DLC_analysis: bool = False ) -> Path: output_filepath = self._create_h5_filepath() if overwrite_DLC_analysis or (not output_filepath.exists()): config_filepath = self.video_metadata.processing_filepath manual_interface = ManualAnnotation( object_to_analyse=video_filepath, output_directory=self.output_directory, marker_detection_directory=config_filepath, ) manual_interface.analyze_objects(filepath=output_filepath.with_suffix(".h5")) return output_filepath def _create_h5_filepath( self, tag: str = "_rawfps_unsynchronized", filtered: bool = False ) -> Path: if filtered: h5_filepath = self.output_directory.joinpath( f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}_" f"{self.video_metadata.paradigm}_{self.video_metadata.cam_id}{tag}_filtered.h5" ) else: h5_filepath = self.output_directory.joinpath( f"{self.video_metadata.mouse_id}_{self.video_metadata.recording_date}_" f"{self.video_metadata.paradigm}_{self.video_metadata.cam_id}{tag}.h5" ) return h5_filepath
[docs]class RecordingVideoDownSynchronizer(RecordingVideoSynchronizer): def _adjust_video_to_target_fps_and_run_marker_detection( self, start_idx: int, offset: float, target_fps: int = 30, overwrite_DLC_analysis_and_synchro: bool = False, synchronize_only: bool = False ) -> Tuple[Path, None]: downsynchronized_filepath = self._create_h5_filepath(tag=f"_temp") if self.video_metadata.processing_type == "DLC": full_h5_filepath = self._run_deep_lab_cut_for_marker_detection( video_filepath=self.video_metadata.filepath, overwrite_DLC_analysis=overwrite_DLC_analysis_and_synchro ) if self.synchro_metadata["use_2D_filter"]: full_h5_filepath = self._create_h5_filepath(filtered=True) elif self.video_metadata.processing_type == "manual": full_h5_filepath = self._run_manual_marker_detection( video_filepath=self.video_metadata.filepath, overwrite_DLC_analysis=overwrite_DLC_analysis_and_synchro ) else: raise ValueError("For processing only DLC and manual are supported!") df = pd.read_hdf(full_h5_filepath) frame_idxs_to_sample = self._get_sampling_frame_idxs(start_idx=start_idx, offset=offset, target_fps=target_fps) new_df = df.loc[frame_idxs_to_sample, :] new_df.to_hdf(downsynchronized_filepath, "key") return downsynchronized_filepath, None
[docs]class RecordingVideoUpSynchronizer(RecordingVideoSynchronizer): def _adjust_video_to_target_fps_and_run_marker_detection( self, target_fps: int, start_idx: int, offset: float, overwrite_DLC_analysis_and_synchro: bool, synchronize_only: bool = False, ) -> Tuple[Path, None]: upsynchronized_filepath = self._create_h5_filepath(tag=f"_temp") if (overwrite_DLC_analysis_and_synchro) or (not upsynchronized_filepath.exists()): if self.video_metadata.processing_type == "DLC": full_h5_filepath = self._run_deep_lab_cut_for_marker_detection( video_filepath=self.video_metadata.filepath, overwrite_DLC_analysis=overwrite_DLC_analysis_and_synchro ) if self.synchro_metadata["use_2D_filter"]: full_h5_filepath = self._create_h5_filepath(filtered=True) elif self.video_metadata.processing_type == "manual": full_h5_filepath = self._run_manual_marker_detection( video_filepath=self.video_metadata.filepath, overwrite_DLC_analysis=overwrite_DLC_analysis_and_synchro ) else: raise ValueError("For processing only DLC and manual are supported!") df = pd.read_hdf(full_h5_filepath) len_frame_in_ms = 1 / self.video_metadata.fps * 1000 len_targetframe_in_ms = 1 / self.target_fps * 1000 total_offset_in_ms = start_idx * len_frame_in_ms + offset recording_length = (df.shape[0] - 1) * len_frame_in_ms index_in_ms = df.index * len_frame_in_ms new_indices = np.arange( total_offset_in_ms, recording_length, len_targetframe_in_ms ) d = scipy.interpolate.interp1d(index_in_ms, df, axis=0) upsampled = d(new_indices) new_df = pd.DataFrame(upsampled, columns=df.columns) new_df.to_hdf(str(upsynchronized_filepath), "key") return upsynchronized_filepath, None