Source code for core.triangulation_calibration_module

import itertools as it
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Tuple, Dict, Union, Optional, OrderedDict, Any, Set

import aniposelib as ap_lib
import cv2
import numpy as np
import pandas as pd
from moviepy.editor import VideoClip
from numpy import ndarray
from scipy.spatial.transform import Rotation

from .angles_and_distances import (
    add_reprojection_errors_of_all_calibration_validation_markers,
    set_distances_and_angles_for_evaluation,
    load_distances_from_ground_truth,
    add_errors_between_computed_and_ground_truth_distances_for_different_references,
    add_errors_between_computed_and_ground_truth_angles, get_xyz_distance_in_triangulation_space,
)
from .marker_detection import ManualAnnotation, DeeplabcutInterface
from .plotting import (
    AlignmentPlotCrossvalidation,
    PredictionsPlot,
    CalibrationValidationPlot,
    TriangulationVisualization,
    RotationVisualization,
)
from .utils import (
    convert_to_path,
    create_calibration_key,
    read_config,
    check_keys,
    get_multi_index,
    get_3D_df_keys,
    get_3D_array,
    KEYS_TO_CHECK_PROJECT,
    KEYS_TO_CHECK_RECORDING,
    KEYS_TO_CHECK_CAMERA_PROJECT,
    STANDARD_ATTRIBUTES_TRIANGULATION,
    STANDARD_ATTRIBUTES_CALIBRATION,
    SYNCHRO_METADATA_KEYS
)
from .video_interface import VideoInterface
from .video_metadata import VideoMetadata
from .video_synchronization import (
    RecordingVideoDownSynchronizer,
    RecordingVideoUpSynchronizer,
    CharucoVideoSynchronizer,
)


def _get_metadata_from_configs(recording_config_filepath: Path, project_config_filepath: Path) -> \
        Tuple[dict, dict]:
    """
    Read the project and the recording config and verify that they are complete.

    Parameters
    ----------
    recording_config_filepath: Path
        Filepath to the recording_config file.
    project_config_filepath: Path
        Filepath to the project_config file.

    Returns
    -------
    recording_config_dict: dict
        Content of the recording config as returned by read_config.
    project_config_dict: dict
        Content of the project config as returned by read_config.

    Raises
    ------
    KeyError
        If check_keys reports missing entries for KEYS_TO_CHECK_PROJECT in the
        project config, for KEYS_TO_CHECK_RECORDING in the recording config, or
        if one of the camera-specific entries (KEYS_TO_CHECK_CAMERA_PROJECT)
        lacks information for one of the "valid_cam_ids".
    """
    project_config_dict = read_config(path=project_config_filepath)
    recording_config_dict = read_config(path=recording_config_filepath)

    # 1) general keys of the project config
    missing_in_project = check_keys(dictionary=project_config_dict,
                                    list_of_keys=KEYS_TO_CHECK_PROJECT)
    if missing_in_project:
        message = (f"Missing metadata information in the project_config_file"
                   f" {project_config_filepath} for {missing_in_project}.")
        raise KeyError(message)

    # 2) general keys of the recording config
    missing_in_recording = check_keys(dictionary=recording_config_dict,
                                      list_of_keys=KEYS_TO_CHECK_RECORDING)
    if missing_in_recording:
        message = (f"Missing information for {missing_in_recording} "
                   f"in the config_file {recording_config_filepath}!")
        raise KeyError(message)

    # 3) every camera-specific section has to contain an entry for each valid cam id
    for camera_section in KEYS_TO_CHECK_CAMERA_PROJECT:
        cams_without_entry = check_keys(
            dictionary=project_config_dict[camera_section],
            list_of_keys=project_config_dict["valid_cam_ids"],
        )
        if not cams_without_entry:
            continue
        message = (f"Missing information {camera_section} for cam {cams_without_entry} "
                   f"in the config_file {project_config_filepath}!")
        raise KeyError(message)

    return recording_config_dict, project_config_dict


def _validate_metadata(metadata_from_videos: Dict,
                       attributes_to_check: List[str]) -> \
        Tuple[Any, ...]:
    """
    Ensure that all videos agree on the given metadata attributes.

    Parameters
    ----------
    metadata_from_videos: dict
        Metadata objects of all videos; only the values are inspected.
    attributes_to_check: list of str
        Names of the attributes, that have to be identical for all videos.

    Returns
    -------
    tuple
        The common value of each attribute, in the order of
        attributes_to_check.

    Raises
    ------
    ValueError
        If at least one attribute has more than one distinct value.
    """
    unique_values_per_attribute = [
        {getattr(metadata, attribute_name) for metadata in metadata_from_videos.values()}
        for attribute_name in attributes_to_check
    ]

    if any(len(unique_values) > 1 for unique_values in unique_values_per_attribute):
        raise ValueError(
            f"The filenames of the calibration_validation images "
            f"give different metadata! Reasons could be:\n"
            f"  - image belongs to another calibration\n"
            f"  - image filename is valid, but wrong\n"
            f"You should run the filename_checker before to avoid such Errors!"
        )

    common_values = [list(unique_values)[0] for unique_values in unique_values_per_attribute]
    return tuple(common_values)


def _exclude_by_framenum(metadata_from_videos: Dict[str, VideoMetadata], allowed_num_diverging_frames: int) -> List[Any]:
    """
    Mark videos, whose synchronized framenumber diverges too much from the median.

    Parameters
    ----------
    metadata_from_videos: {str: VideoMetadata}
        Metadata objects of all videos. For each excluded video the attribute
        exclusion_state is set to "exclude".
    allowed_num_diverging_frames: int
        Maximal tolerated difference between framenum_synchronized of a video
        and the median over all videos.

    Returns
    -------
    videos_to_exclude: list
        The cam_id of every excluded video.
    """
    all_videos = list(metadata_from_videos.values())
    median_framenum = np.median([video.framenum_synchronized for video in all_videos])
    lower_bound = median_framenum - allowed_num_diverging_frames
    upper_bound = median_framenum + allowed_num_diverging_frames

    videos_to_exclude = []
    for video in all_videos:
        framenum = video.framenum_synchronized
        diverges = framenum < lower_bound or framenum > upper_bound
        if not diverges:
            continue
        video.exclusion_state = "exclude"
        videos_to_exclude.append(video.cam_id)

    if videos_to_exclude:
        print(f"{videos_to_exclude} were excluded!")
    return videos_to_exclude


def _create_output_directory(project_config_filepath: Path) -> Path:
    """
    Create the next free "unnamed_calibration_<n>" directory.

    The directory is created next to the project config. <n> is one larger
    than the highest index found among the existing entries, whose name starts
    with "unnamed_calibration_" (the index is read from the stem of the entry).

    Parameters
    ----------
    project_config_filepath: Path
        Filepath to the project_config file; its parent directory is searched
        and receives the new directory.

    Returns
    -------
    output_directory: Path
        Path of the newly created directory.
    """
    prefix = "unnamed_calibration_"
    parent_directory = project_config_filepath.parent

    highest_idx = 0
    for entry in parent_directory.iterdir():
        if not entry.name.startswith(prefix):
            continue
        try:
            idx = int(entry.stem[len(prefix):])
        except ValueError:
            # Entries like "unnamed_calibration_notes.txt" carry no index;
            # they must not abort the search for the next free index.
            continue
        highest_idx = max(highest_idx, idx)

    output_directory = parent_directory.joinpath(f"{prefix}{highest_idx + 1}")
    if not output_directory.exists():
        output_directory.mkdir()
    return output_directory


def _check_output_directory(project_config_filepath: Path,
                            output_directory: Optional[Path] = None) -> Path:
    """
    Return a usable output directory.

    Parameters
    ----------
    project_config_filepath: Path
        Filepath to the project_config file. Only used, if a new directory has
        to be created via _create_output_directory.
    output_directory: Path, optional
        Requested output directory.

    Returns
    -------
    output_directory: Path
        The requested directory, if it was given and exists. Otherwise the
        directory returned by _create_output_directory.
    """
    if output_directory is not None and output_directory.exists():
        return output_directory
    return _create_output_directory(project_config_filepath=project_config_filepath)


def _create_video_objects(
        directory: Path,
        recording_config_dict: Dict,
        project_config_dict: Dict,
        videometadata_tag: str,
        output_directory: Path,
        filetypes: List[str],
        filename_tag: str = "",
        recreate_undistorted_plots: bool = True,
) -> Tuple[Dict, Dict]:
    """
    Create VideoMetadata and VideoInterface objects for the files in directory.

    A file is used, if its name contains filename_tag (case-insensitive), does
    not contain "synchronized" and its suffix is listed in filetypes.

    Parameters
    ----------
    directory: Path
        Directory, that is searched for files.
    recording_config_dict: dict
        Passed on to VideoMetadata.
    project_config_dict: dict
        Passed on to VideoMetadata.
    videometadata_tag: str
        Passed on to VideoMetadata as tag.
    output_directory: Path
        Passed on to VideoInterface as output_dir.
    filetypes: list of str
        Allowed file suffixes (compared case-sensitively, including the dot).
    filename_tag: str, default ""
        Tag, that has to be part of the filename.
    recreate_undistorted_plots: bool, default True
        Passed on to VideoInterface.

    Returns
    -------
    video_interfaces: dict
        VideoInterface objects with the cam_id of their metadata as key.
    metadata_from_videos: dict
        VideoMetadata objects with their cam_id as key.
    """

    def _is_requested_file(file: Path) -> bool:
        if filename_tag.lower() not in file.name.lower():
            return False
        if "synchronized" in file.name:
            return False
        return file.suffix in filetypes

    # The directory listing is collected completely, before any object is created.
    videofiles = [file for file in directory.iterdir() if _is_requested_file(file)]

    video_interfaces, metadata_from_videos = {}, {}
    for videofile in videofiles:
        metadata = VideoMetadata(
            video_filepath=videofile,
            recording_config_dict=recording_config_dict,
            project_config_dict=project_config_dict,
            tag=videometadata_tag,
        )
        interface = VideoInterface(
            video_metadata=metadata,
            output_dir=output_directory,
            recreate_undistorted_plots=recreate_undistorted_plots,
        )
        cam_id = metadata.cam_id
        video_interfaces[cam_id] = interface
        metadata_from_videos[cam_id] = metadata
    return video_interfaces, metadata_from_videos


def _initialize_camera_group(camera_objects: List) -> ap_lib.cameras.CameraGroup:
    """
    Bundle the given camera objects into one ap_lib CameraGroup.

    Parameters
    ----------
    camera_objects: list
        Camera objects, that are handed to ap_lib.cameras.CameraGroup.

    Returns
    -------
    camera_group: ap_lib.cameras.CameraGroup
    """
    camera_group = ap_lib.cameras.CameraGroup(camera_objects)
    return camera_group


def _add_missing_marker_ids_to_prediction(
        missing_marker_ids: List[str], df: pd.DataFrame, framenum: int = 1
) -> pd.DataFrame:
    """
    Add zero-filled x, y and likelihood columns for missing markers.

    Parameters
    ----------
    missing_marker_ids: list of str
        Markers, for which columns have to be added.
    df: pd.DataFrame
        Predictions with (scorer, marker_id, key) column labels. The dataframe
        is modified in place.
    framenum: int, default 1
        Number of rows (row labels 0 ... framenum - 1), that are set to 0.

    Returns
    -------
    df: pd.DataFrame
        The same dataframe object including the added columns.
    """
    try:
        # reuse the scorer of the existing columns
        scorer = list(df.columns)[0][0]
    except IndexError:
        # no columns available to read a scorer from
        scorer = "zero_likelihood_markers"

    rows = list(range(framenum))
    for marker_id in missing_marker_ids:
        for key in ["x", "y", "likelihood"]:
            df.loc[rows, (scorer, marker_id, key)] = 0
    return df


def _find_non_matching_list_elements(list1: List[str], list2: List[str]) -> List[str]:
    """
    Return the elements of list1, that are not contained in list2.

    The order of list1 is kept and repeated elements are returned repeatedly.
    """
    unmatched = []
    for element in list1:
        if element in list2:
            continue
        unmatched.append(element)
    return unmatched


def _get_duplicate_elems_in_list(list1: List[str]) -> List[str]:
    """
    Return the elements, that occur more than once in list1.

    Parameters
    ----------
    list1: list of str
        List to search for duplicates.

    Returns
    -------
    duplicate_elems: list of str
        One entry for every repeated occurrence of an element, in the order of
        appearance. Empty, if all elements are unique.
    """
    individual_elems, duplicate_elems = [], []
    for elem in list1:
        if elem in individual_elems:
            duplicate_elems.append(elem)
        else:
            individual_elems.append(elem)
    # Without this return callers always received None and could never detect duplicates.
    return duplicate_elems


def _remove_marker_ids_not_in_ground_truth(
        marker_ids_to_remove: List[str], df: pd.DataFrame
) -> pd.DataFrame:
    """
    Drop all columns, that belong to one of the given markers.

    Parameters
    ----------
    marker_ids_to_remove: list of str
        Markers, whose columns are dropped.
    df: pd.DataFrame
        Dataframe, whose column labels carry the marker id at position 1.

    Returns
    -------
    pd.DataFrame
        New dataframe without the columns of the given markers; df itself is
        left unchanged.
    """
    columns_to_remove = [column_name for column_name in df.columns if
                         column_name[1] in marker_ids_to_remove]
    return df.drop(columns=columns_to_remove)


def _save_dataframe_as_csv(filepath: Union[str, Path], df: pd.DataFrame) -> None:
    """
    Write df to filepath as .csv without the index.

    A file, that already exists at filepath, is deleted before writing.

    Parameters
    ----------
    filepath: Path or string
        Target of the .csv file; converted via convert_to_path.
    df: pd.DataFrame
        Dataframe to save.
    """
    target = convert_to_path(filepath)
    already_present = target.exists()
    if already_present:
        target.unlink()
    df.to_csv(target, index=False)
    

def _get_best_frame_for_normalisation(config: Dict, df: pd.DataFrame) -> int:
    """
    Select the frame, that is used for normalisation of the dataframe.

    Parameters
    ----------
    config: dict
        Has to contain "CENTER", "REFERENCE_LENGTH_MARKERS" and
        "REFERENCE_ROTATION_MARKERS". Optionally contains
        "EQUAL_LENGTHS_GROUND_TRUTH", an iterable of pairs of lengths, where
        each length is given by its two markers.
    df: pd.DataFrame
        Triangulated dataframe.

    Returns
    -------
    int
        Without "EQUAL_LENGTHS_GROUND_TRUTH": the first frame, in which none of
        the columns of the normalisation markers is NaN. Otherwise the result of
        np.argmin over the summed relative differences |a - b| / |a + b| of
        all length pairs.

    Raises
    ------
    ValueError
        If no frame exists, in which all normalisation markers are available.
    """
    use_equal_lengths = "EQUAL_LENGTHS_GROUND_TRUTH" in config

    # collect every marker, that takes part in the normalisation
    markers = {config['CENTER']}
    markers.update(config["REFERENCE_LENGTH_MARKERS"])
    markers.update(config["REFERENCE_ROTATION_MARKERS"])
    if use_equal_lengths:
        for length_pair in config["EQUAL_LENGTHS_GROUND_TRUTH"]:
            for length in length_pair:
                markers.update(length)

    column_keys = set()
    for marker in markers:
        column_keys.update(get_3D_df_keys(marker))

    # frames, in which no coordinate of a normalisation marker is missing
    complete_rows = df.loc[:, list(column_keys)].dropna(axis=0)
    valid_frames_for_normalization = list(complete_rows.index)

    if not valid_frames_for_normalization:
        raise ValueError("Could not normalize the dataframe!")
    if not use_equal_lengths:
        return valid_frames_for_normalization[0]

    relative_differences = []
    for length_pair in config["EQUAL_LENGTHS_GROUND_TRUTH"]:
        first_length, second_length = length_pair[0], length_pair[1]
        a = get_xyz_distance_in_triangulation_space((first_length[0], first_length[1]), df)
        b = get_xyz_distance_in_triangulation_space((second_length[0], second_length[1]), df)
        relative_differences.append(abs((a - b)) / abs((a + b)))
    summed_differences = sum(relative_differences)
    # NOTE(review): the argmin is taken over everything returned by
    # get_xyz_distance_in_triangulation_space and is not restricted to
    # valid_frames_for_normalization; np.argmin also yields a position, not an
    # index label - confirm that this is intended.
    return np.argmin(summed_differences)


[docs]class Calibration: """ A class, in which videos are calibrated to each other. Temporal synchronization of the videos can be performed based on a pattern. Spatial calibration is performed using aniposelib (ap_lib) with additional methods to validate the calibration based on known ground_truth. Parameters __________ calibration_directory: Path or string Directory, where the calibration videos are stored. project_config_filepath: Path or string Filepath to the project_config .yaml file. recording_config_filepath: Path or string Filepath to the recording_config .yaml file. output_directory: Path or string, optional Directory, in which the files created during the analysis are saved. Per default it will be set the same as the calibration_directory. recreate_undistorted_plots: bool, default True If True (default), then preexisting undistorted plots will be overwritten. Attributes __________ project_config_filepath: Path Filepath to the project_config .yaml file. output_directory: Path Directory, in which the files created during the analysis are saved. camera_group: ap_lib.cameras.CameraGroup Group of ap_lib.cameras.Camera objects. camera_objects: List of ap_lib.cameras.Camera objects. synchronized_charuco_videofiles: {str: Path} Dict of synchronized calibration video per camera. video_interfaces: {str: VideoInterface} Dict of VideoInterface objects for all calibration videos. metadata_from_videos: {str: VideoMetadata} Dict of VideoMetadata objects for all calibration videos. valid_videos: list of str Videos, that were found in the calibration_directory and not excluded due to synchronization issues. cams_to_exclude: list of str Videos, that were excluded due to synchronization issues. recording_date: str Date at which the calibration was done. target_fps: int Fps rate, to which the videos should be synchronized. led_pattern: dict Blinking pattern to use for temporal synchronisation. calibration_index: int Index of a calibration. 
Together with recording_date, it creates a unique calibration key. calibration_tag: str Filename tag to search for in the files in calibration_directory. reprojerr: float Reprojection error (px) returned by ap_lib calibration. report_filepath: Path Filepath to the report .csv for calibration optimisation. allowed_num_diverging_frames: int Difference of framenumber to the framenumber median of all synchronised videos, that is allowed before a video has to be excluded. synchro_metadata: dict Dictionary used as input for synchronizer objects. synchronization_individuals: list of AlignmentPlotIndividual Container for the AlignmentPlotIndividual objects of each synchronized video. led_detection_individuals: list of LEDMarkerPlot Container for the LEDMarkerPlot objects of each synchronized video. Methods _______ run_synchronization(overwrite_synchronisations, verbose) Perform synchronization of all videos to the led_pattern and downsampling to target_fps. run_calibration(use_own_intrinsic_calibration, charuco_calibration_board, iteration, verbose, overwrite_calibrations) Call ap_lib calibrate function. calibrate_optimal(calibration_validation, max_iters, p_threshold, angle_threshold, verbose, overwrite_calibrations) Call run_calibration repeatedly and validate the quality of the resulting calibration on calibration_validation images and ground_truth. References __________ [1] Karashchuk, P., Rupp, K. L., Dickinson, E. S., et al. (2021). Anipose: A toolkit for robust markerless 3D pose estimation. Cell reports, 36(13), 109730. https://doi.org/10.1016/j.celrep.2021.109730 See Also ________ TriangulationRecordings: A class, in which videos are triangulated based on a calibration file. CalibrationValidation: A class, in which images are triangulated based on a calibration file and the triangulated coordinates are validated based on a ground_truth. 
core.checker_objects.CheckCalibration: A class, that checks the metadata and filenames of videos in a given folder and allows for filename changing via user input. core.meta.MetaInterface.create_calibrations: Create Calibration objects for all calibration_directories added to MetaInterface. core.meta.MetaInterface.synchronize_calibrations: Run the function run_synchronization for all calibration objects added to MetaInterface. core.meta.MetaInterface.calibrate: Run the function run_calibration or calibrate_optimal for all calibration objects added to MetaInterface. Examples ________ >>> from pathlib import Path >>> from core.triangulation_calibration_module import Calibration >>> rec_config = Path( ... "test_data/Server_structure/Calibrations/220922/recording_config_220922.yaml" ... ) >>> calibration_object = Calibration( ... calibration_directory=rec_config.parent, ... recording_config_filepath=rec_config, ... project_config_filepath="test_data/project_config.yaml", ... output_directory=rec_config.parent, ... ) >>> calibration_object.run_synchronization() >>> calibration_object.run_calibration(verbose=2) """ def __init__( self, calibration_directory: Union[Path, str], project_config_filepath: Union[Path, str], recording_config_filepath: Union[Path, str], output_directory: Optional[Union[Path, str]] = None, recreate_undistorted_plots: bool = True, ) -> None: """ Construct all necessary attributes for the Calibration class. Read the metadata from project-/recording config and from video filenames. Create representations of the videos inside the given calibration_directory. Parameters ---------- calibration_directory: Path or string Directory, where the calibration videos are stored. project_config_filepath: Path or string Filepath to the project_config .yaml file. recording_config_filepath: Path or string Filepath to the recording_config .yaml file. output_directory: Path or string, optional Directory, in which the files created during the analysis are saved. 
Per default it will be set the same as the calibration_directory. recreate_undistorted_plots: bool, default True If True (default), then preexisting undistorted plots will be overwritten. """ for attribute in STANDARD_ATTRIBUTES_CALIBRATION: setattr(self, attribute, None) self.calibration_directory = convert_to_path(calibration_directory) project_config_filepath = convert_to_path(project_config_filepath) recording_config_filepath = convert_to_path(recording_config_filepath) output_directory = convert_to_path(output_directory) self.output_directory = _check_output_directory(output_directory=output_directory, project_config_filepath=project_config_filepath) recording_config_dict, project_config_dict = _get_metadata_from_configs( recording_config_filepath=recording_config_filepath, project_config_filepath=project_config_filepath, ) self.synchro_metadata = {key: project_config_dict[key] for key in SYNCHRO_METADATA_KEYS} for attribute in ["calibration_tag", "allowed_num_diverging_frames"]: setattr(self, attribute, project_config_dict[attribute]) for attribute in ["recording_date", "led_pattern", "calibration_index", "target_fps"]: setattr(self, attribute, recording_config_dict[attribute]) self.recording_date = str(self.recording_date) self.calibration_index = str(self.calibration_index) self.video_interfaces, self.metadata_from_videos = _create_video_objects( directory=self.calibration_directory, recording_config_dict=recording_config_dict, project_config_dict=project_config_dict, videometadata_tag="calibration", output_directory=self.output_directory, filename_tag=self.calibration_tag, filetypes=[".AVI", ".avi", ".mov", ".mp4"], recreate_undistorted_plots=recreate_undistorted_plots, ) self.recording_date, *_ = _validate_metadata(metadata_from_videos=self.metadata_from_videos, attributes_to_check=['recording_date']) self.target_fps = min( [video_metadata.fps for video_metadata in self.metadata_from_videos.values()]) # limits target_fps to fps of the slowest video for 
video_metadata in self.metadata_from_videos.values(): video_metadata.target_fps = self.target_fps
[docs] def run_synchronization(self, overwrite_synchronisations: bool = False, verbose: bool = True) -> None: """ Perform synchronization of all videos to the led_pattern and downsampling to target_fps. Call the synchronizer via VideoInterface and save the synchronized_video_filepaths. Create a plot for crossvalidation of the synchronised LED timeseries. Exclude videos, if there are any duplicates in camera names or diverging framenumbers after synchronization. Parameters ---------- overwrite_synchronisations: bool, default False If True (default False), then pre-existing synchronisations will be overwritten during analysis. verbose: bool, default True If True (default), then Crossvalidation plot and synchronised number of frames for each camera are printed. """ self.synchronized_charuco_videofiles = {} camera_objects_unexcluded = [] for video_interface in self.video_interfaces.values(): video_interface.run_synchronizer( synchronizer=CharucoVideoSynchronizer, output_directory=self.output_directory, synchronize_only=True, overwrite_DLC_analysis_and_synchro=overwrite_synchronisations, synchro_metadata=self.synchro_metadata, verbose=verbose ) self.synchronized_charuco_videofiles[ video_interface.video_metadata.cam_id ] = str(video_interface.synchronized_video_filepath) camera_objects_unexcluded.append(video_interface.export_for_aniposelib()) camera_objects_unexcluded.sort(key=lambda x: x.name, reverse=False) self._plot_synchro_crossvalidation(verbose=verbose) cameras = [camera_object.name for camera_object in camera_objects_unexcluded] duplicate_cams = _get_duplicate_elems_in_list(cameras) if duplicate_cams: raise ValueError( f"You added multiple cameras with the cam_id {duplicate_cams}, " "however, all cam_ids must be unique! Please check for duplicates " "in the calibration directory and rename them!" 
) self.cams_to_exclude = _exclude_by_framenum(metadata_from_videos=self.metadata_from_videos, allowed_num_diverging_frames=self.allowed_num_diverging_frames) self.valid_videos = [cam.name for cam in camera_objects_unexcluded if cam.name not in self.cams_to_exclude] self.camera_objects = [cam for cam in camera_objects_unexcluded if cam.name not in self.cams_to_exclude] for cam in self.cams_to_exclude: self.synchronized_charuco_videofiles.pop(cam) self.camera_group = _initialize_camera_group(camera_objects=self.camera_objects)
[docs] def run_calibration( self, use_own_intrinsic_calibration: bool = True, verbose: int = 0, charuco_calibration_board: Optional[ap_lib.boards.CharucoBoard] = None, overwrite_calibrations: bool = True, iteration: Optional[int] = None, ) -> Path: """ Call ap_lib calibrate function. Create a filename for the calibration file. Pass videos to ap_lib.cameras.camera_group.calibrate_videos function. Parameters ---------- use_own_intrinsic_calibration: bool, default True If True (default), then the externally created intrinsic calibrations are passed to the calibrate function. Otherwise, the ap_lib built-in intrinsic calibration is used. verbose: int, default 0 Show ap_lib output if > 1 or no output if <= 1. charuco_calibration_board: ap_lib.boards.CharucoBoard, optional Specify the board, that was used in the calibration videos. overwrite_calibrations: bool, default True If True (default), then pre-existing calibrations will be overwritten. iteration: int, optional Variable to be included into the filename to make the filepath of calibration files unique for repeated calibrations. 
Returns ------- calibration_filepath: Path """ calibration_key = create_calibration_key(videos=self.valid_videos, recording_date=self.recording_date, calibration_index=self.calibration_index, iteration=iteration) filename = f"{calibration_key}.toml" calibration_filepath = self.output_directory.joinpath(filename) if (overwrite_calibrations) or (not overwrite_calibrations and not calibration_filepath.exists() ): if charuco_calibration_board is None: aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250) charuco_calibration_board = ap_lib.boards.CharucoBoard( 7, 5, square_length=1, marker_length=0.8, marker_bits=6, aruco_dict=aruco_dict, ) sorted_videos = OrderedDict(sorted(self.synchronized_charuco_videofiles.items())) videos = [[video] for video in sorted_videos.values()] self.reprojerr, _ = self.camera_group.calibrate_videos( videos=videos, board=charuco_calibration_board, init_intrinsics=not use_own_intrinsic_calibration, init_extrinsics=True, verbose=verbose > 1, ) self._save_calibration(calibration_filepath=calibration_filepath, camera_group=self.camera_group) else: self.reprojerr = 0 return calibration_filepath
def _save_calibration(self, calibration_filepath: Path, camera_group: ap_lib.cameras.CameraGroup) -> None: if calibration_filepath.exists(): calibration_filepath.unlink() camera_group.dump(calibration_filepath)
[docs] def calibrate_optimal( self, calibration_validation: "CalibrationValidation", max_iters: int = 5, p_threshold: float = 0.1, angle_threshold: float = 5., verbose: int = 1, overwrite_calibrations: bool = True, ): """ Call run_calibration repeatedly and validates the quality of the resulting calibration on calibration_validation images and ground_truth. Repeat calibration until a good calibration is reached or max_iters is superceded. Check whether the triangulated data in calibration_validation matches the ground_truth data p_threshold and angle_threshold. Create a report file in which the metadata to the calibration of each iteration is specified. Parameters ---------- calibration_validation: CalibrationValidation Object, containing images, triangulated data and ground_truth information for calibration validation. max_iters: int, default 5 Number of iterations allowed to find a good calibration. p_threshold: float, default 0.1 Threshold for errors in the triangulated distances compared to ground truth (mean distances in percent). angle_threshold: float, default 5 Threshold for errors in the triangulated angles compared to ground truth (mean angles in degrees). verbose: int, default 1 Show ap_lib output if > 1, calibration_validation output if > 0 or no output if < 1. overwrite_calibrations: bool, default True If True (default), then pre-existing calibrations will be overwritten. Returns ------- calibration_filepath: Path The filepath to the optimal calibration of if no good calibration was reached during iteration, the filepath of the last calibration. 
""" report = pd.DataFrame() calibration_found = False calibration_key = create_calibration_key(videos=self.valid_videos, recording_date=self.recording_date, calibration_index=self.calibration_index) good_calibration_filepath = self.output_directory.joinpath(f"{calibration_key}.toml") calibration_filepath = None for cal in range(max_iters): if good_calibration_filepath.exists() and not overwrite_calibrations: calibration_filepath = good_calibration_filepath self.reprojerr = 0 else: calibration_filepath = self.run_calibration(verbose=verbose, overwrite_calibrations=overwrite_calibrations, iteration=cal) calibration_validation.run_triangulation(calibration_toml_filepath=calibration_filepath) mean_dist_err_percentage, mean_angle_err, reprojerr_nonan_mean = \ calibration_validation.evaluate_triangulation_of_calibration_validation_markers(verbose=bool(verbose > 2), show_3D_plot=bool(verbose)) if verbose: print( f"Calibration {cal}\n mean percentage error: {mean_dist_err_percentage}\n " f"mean angle error: {mean_angle_err}\n " f"ap_lib reprojection error: {self.reprojerr}\n " f'calvin mean reprojection error: {calibration_validation.anipose_io["reproj_nonan"].mean()}') report.loc[cal, "key"] = str(calibration_filepath) report.loc[cal, "num_cams"] = len(self.camera_objects) report.loc[cal, "cams_to_exclude"] = str(self.cams_to_exclude) report.loc[cal, "mean_distance_error_percentage"] = mean_dist_err_percentage report.loc[cal, "mean_angle_error"] = mean_angle_err report.loc[cal, "mean_reprojerror_calvin"] = reprojerr_nonan_mean report.loc[cal, "ap_lib_reprojerr"] = self.reprojerr if (mean_dist_err_percentage < p_threshold and mean_angle_err < angle_threshold): calibration_found = True calibration_filepath.rename(good_calibration_filepath) calibration_filepath = good_calibration_filepath if verbose: print( f"Good Calibration reached at iteration {cal}!\n" f"Named it {good_calibration_filepath}.") break self.report_filepath = self.output_directory.joinpath( 
f"{self.recording_date}_calibration_report.csv") if overwrite_calibrations or not self.report_filepath.exists(): report.to_csv(self.report_filepath, index=False) if not calibration_found: if verbose > 0: print( "No optimal calibration found with given thresholds! Returned last executed calibration!") return calibration_filepath
def _plot_synchro_crossvalidation(self, verbose: bool = True) -> None:
    """
    Plot the LED timeseries of all videos against the blinking template.

    The template is taken from the synchronizer of the first video interface
    and adjusted to target_fps. Only videos whose synchronizer provides a
    timeseries for cross-video validation are included. If no video provides
    one, no plot is created.

    Parameters
    ----------
    verbose: bool, default True
        Passed on as ``plot`` to ``create_plot``; the plot is always saved.
    """
    first_interface = list(self.video_interfaces.values())[0]
    blinking_motif = first_interface.synchronizer_object.template_blinking_motif
    template = blinking_motif.adjust_template_timeseries_to_fps(fps=self.target_fps)[0][0]

    timeseries_per_cam = {
        interface.video_metadata.cam_id:
            interface.synchronizer_object.led_timeseries_for_cross_video_validation
        for interface in self.video_interfaces.values()
        if interface.synchronizer_object.led_timeseries_for_cross_video_validation is not None
    }
    if not timeseries_per_cam:
        return

    crossvalidation_plot = AlignmentPlotCrossvalidation(
        template=template,
        led_timeseries=timeseries_per_cam,
        output_directory=self.output_directory,
        filename=f'{self.recording_date}_charuco_synchronization_crossvalidation_{self.target_fps}fps',
    )
    crossvalidation_plot.create_plot(save=True, plot=verbose)
class Triangulation(ABC):
    """
    Parent class for the triangulation of videos or images.

    Triangulation is performed using aniposelib (ap_lib).

    Parameters
    ----------
    project_config_filepath: Path or string
        Filepath to the project_config .yaml file.
    directory: Path or string
        Directory, where the videos or images are stored.
    recording_config_filepath: Path or string
        Filepath to the recording_config .yaml file.
    recreate_undistorted_plots: bool, default True
        If True (default), then preexisting undistorted plots will be overwritten.
    output_directory: Path or string, optional
        Directory, in which the files created during the analysis are saved.
        Per default it will be set the same as the directory.

    Attributes
    ----------
    project_config_filepath: Path
        Filepath to the project_config .yaml file.
    output_directory: Path
        Directory, in which the files created during the analysis are saved.
    video_interfaces: {str: VideoInterface}
        Dict of VideoInterface objects for all videos or images.
    metadata_from_videos: {str: VideoMetadata}
        Dict of VideoMetadata objects for all videos or images.
    triangulation_dlc_cams_filepaths: {str: Path}
        Containing the filepath to the predictions for each camera.
    csv_output_filepath: Path
        Filepath, where the triangulated dataframe should be saved.
    recording_date: str
        Date at which the calibration was done based on recording_config and as
        read from the filenames.
    calibration_index: str
        Index of a calibration (cast to str in the constructor).
        Together with recording_date, it creates a unique calibration key.
    target_fps: int
        Fps rate, to which the videos should be synchronized.
    led_pattern: dict
        Blinking pattern to use for temporal synchronisation.
    mouse_id: str
        Only defined in TriangulationRecordings.
        The mouse_id as read from the filenames.
    paradigm: str
        Only defined in TriangulationRecordings.
        The paradigm as read from the filenames.
    cams_to_exclude: list of str
        Videos, that were excluded due to synchronization issues.
    all_cameras: list of str
        All camera names, that are stored in the camera_group.
    camera_group: ap_lib.cameras.CameraGroup
        Group of cameras loaded from calibration_toml_filepath.
    anipose_io: dict
        Containing information for ap_lib functions, such as points_flat, p3ds,
        n_joints, reprojerr, as well as distances and angles to validate
        calibration in comparison with ground truth.
    markers: list of str
        All markers that will be triangulated.
    normalised_dataframe: bool, default False
        If True (default False), then the dataframe was normalised based on
        input from normalisation config.
    markers_excluded_manually: bool, default False
        If True (default False), then markers were excluded from predictions
        based on marker exclusion config.
    rotated_filepath: Path
        Filepath, where the rotated triangulated dataframe is saved.
    video_plotting_config: dict
        Containing information from video_plotting config to create 3D videos.
    synchronization_individuals: list of AlignmentPlotIndividual
        Only defined in TriangulationRecordings.
        Container for the AlignmentPlotIndividual objects of each synchronized video.
    led_detection_individuals: list of LEDMarkerPlot
        Only defined in TriangulationRecordings.
        Container for the LEDMarkerPlot objects of each synchronized video.
    synchro_metadata: dict
        Only used in TriangulationRecordings.
        Dictionary used as input for synchronizer objects.
    allowed_num_diverging_frames: int
        Only used in TriangulationRecordings.
        Difference of framenumber to the framenumber median of all synchronised
        videos, that is allowed before a video has to be excluded.
    ground_truth_config: dict
        Only defined in CalibrationValidation to compare the triangulated data
        to ground truth data.
    calibration_validation_tag: str
        Only used in CalibrationValidation.
        Filename tag to search for in the filenames in directory.
    _allowed_filetypes: list of str
        Abstract property, specify what file endings to look for in directory.
    triangulation_visualization: TriangulationVisualization
        Object to create plots for triangulation video.
    video_start_s: int
        The second, in which the triangulation video starts.

    Methods
    -------
    run_triangulation(calibration_toml_filepath, triangulate_full_recording, use_preexisting_csvs):
        Load and validate the calibration, triangulate and create 3D df.
    exclude_markers(all_markers_to_exclude_config_path, verbose):
        Exclude markers in prediction based on markers_to_exclude config.
    _get_dataframe_of_triangulated_points(anipose_io):
        Combine the triangulated data from anipose_io to a 3D df.

    References
    ----------
    [1] Karashchuk, P., Rupp, K. L., Dickinson, E. S., et al. (2021).
    Anipose: A toolkit for robust markerless 3D pose estimation.
    Cell reports, 36(13), 109730. https://doi.org/10.1016/j.celrep.2021.109730

    See Also
    --------
    TriangulationRecordings:
        Subclass of Triangulation, in which videos are triangulated based on a
        calibration file.
    CalibrationValidation:
        Subclass of Triangulation, in which images are triangulated based on a
        calibration file and the triangulated coordinates are validated based
        on a ground_truth.
    Calibration:
        A class, in which videos are calibrated to each other.
    core.meta.MetaInterface.triangulate_recordings:
        Run the function run_triangulation for all TriangulationRecording
        objects added to MetaInterface.
    Calibration.triangulate_optim:
        Run the function run_triangulation for the CalibrationValidation object
        passed to triangulate_optim.
    """

    @abstractmethod
    def _create_csv_filepath(self) -> Path:
        """Return the filepath, where the triangulated dataframe is saved."""

    @property
    @abstractmethod
    def _metadata_keys(self) -> List[str]:
        """Names of the metadata attributes that are validated across all videos."""

    @property
    @abstractmethod
    def _allowed_filetypes(self) -> List[str]:
        """File endings to look for in directory."""

    @property
    @abstractmethod
    def _videometadata_tag(self) -> str:
        """Tag handed to the creation of the video objects (e.g. "calvin")."""

    def __init__(self,
                 project_config_filepath: Union[Path, str],
                 directory: Union[Path, str],
                 recording_config_filepath: Union[Path, str],
                 recreate_undistorted_plots: bool = True,
                 output_directory: Optional[Union[Path, str]] = None):
        """
        Construct all necessary attributes for the Triangulation Class.

        Read the metadata from project-/recording config and from video
        filenames. Create representations of the videos inside the given
        directory.

        Parameters
        ----------
        project_config_filepath: Path or string
            Filepath to the project_config .yaml file.
        directory: Path or string
            Directory, where the videos or images are stored.
        recording_config_filepath: Path or string
            Filepath to the recording_config .yaml file.
        recreate_undistorted_plots: bool, default True
            If True (default), then preexisting undistorted plots will be overwritten.
        output_directory: Path or string, optional
            Directory, in which the files created during the analysis are saved.
            Per default it will be set the same as the directory.
        """
        # Start with every standard attribute defined, so later code can rely on
        # their existence even before the corresponding step was run.
        for attribute_name in STANDARD_ATTRIBUTES_TRIANGULATION:
            setattr(self, attribute_name, None)

        self.directory = convert_to_path(directory)
        project_config_filepath = convert_to_path(project_config_filepath)
        recording_config_filepath = convert_to_path(recording_config_filepath)
        self.output_directory = _check_output_directory(
            output_directory=convert_to_path(output_directory),
            project_config_filepath=project_config_filepath,
        )

        recording_config_dict, project_config_dict = _get_metadata_from_configs(
            recording_config_filepath=recording_config_filepath,
            project_config_filepath=project_config_filepath,
        )
        self.synchro_metadata = {
            synchro_key: project_config_dict[synchro_key] for synchro_key in SYNCHRO_METADATA_KEYS
        }

        # Copy the config entries onto the instance: project config first,
        # recording config second.
        attributes_per_config = (
            (project_config_dict,
             ("use_gpu", "calibration_validation_tag", "score_threshold",
              "triangulation_type", "allowed_num_diverging_frames")),
            (recording_config_dict,
             ("recording_date", "led_pattern", "target_fps", "calibration_index")),
        )
        for config_dict, attribute_names in attributes_per_config:
            for attribute_name in attribute_names:
                setattr(self, attribute_name, config_dict[attribute_name])
        self.recording_date = str(self.recording_date)
        self.calibration_index = str(self.calibration_index)

        # The filename tag is only relevant for calibration validation images.
        is_calvin = self._videometadata_tag == "calvin"
        self.video_interfaces, self.metadata_from_videos = _create_video_objects(
            directory=self.directory,
            recording_config_dict=recording_config_dict,
            project_config_dict=project_config_dict,
            videometadata_tag=self._videometadata_tag,
            output_directory=self.output_directory,
            filename_tag=self.calibration_validation_tag if is_calvin else "",
            recreate_undistorted_plots=recreate_undistorted_plots,
            filetypes=self._allowed_filetypes,
        )

        validated_metadata = _validate_metadata(
            metadata_from_videos=self.metadata_from_videos,
            attributes_to_check=self._metadata_keys,
        )
        for attribute_name, attribute_value in zip(self._metadata_keys, validated_metadata):
            setattr(self, attribute_name, attribute_value)
def run_triangulation(
        self,
        calibration_toml_filepath: Union[Path, str],
        triangulate_full_recording: bool = True,
        use_preexisting_csvs: bool = False
) -> None:
    """
    Load and validate the calibration, triangulate and create 3D df.

    Validate, that the camera names in camera_group match
    triangulation_dlc_cams_filepaths and drop or add empty files if they don't.
    Triangulate using different options as defined in the project_config via
    ap_lib functions.

    Parameters
    ----------
    calibration_toml_filepath: Path or str
        Filepath to the calibration, that should be used for triangulation.
    triangulate_full_recording: bool, default True
        If False (default True), then only the first 2 frames of the recording
        will be triangulated and the 3D dataframe will only be saved if no file
        exists at csv_output_filepath yet.
    use_preexisting_csvs: bool, default False
        If True (default False), then an already existing file at
        csv_output_filepath will be read in and no triangulation will be
        performed. The file is not written again in that case.

    Raises
    ------
    IndexError
        If fewer than two calibrated cameras have a prediction file.
    ValueError
        If triangulation_type from the project config is not supported.
    """
    self.csv_output_filepath = self._create_csv_filepath()
    calibration_toml_filepath = convert_to_path(calibration_toml_filepath)
    self.camera_group = self._load_calibration(filepath=calibration_toml_filepath)

    filepath_keys = sorted(self.triangulation_dlc_cams_filepaths)
    self.all_cameras = sorted(camera.name for camera in self.camera_group.cameras)
    # NOTE(review): presumably the helper returns the elements of its first
    # argument that are not in the second one - confirm against its definition.
    missing_cams_in_all_cameras = _find_non_matching_list_elements(filepath_keys, self.all_cameras)
    missing_cams_in_filepath_keys = _find_non_matching_list_elements(self.all_cameras, filepath_keys)

    if len(self.all_cameras) - len(missing_cams_in_filepath_keys) < 2:
        print("All videos had to be excluded!")
        raise IndexError("The exclusion state for all cameras is 'exclude'. "
                         "At least two cameras are necessary to perform the triangulation!")

    try:
        if missing_cams_in_filepath_keys:
            # Calibrated cameras without predictions get an empty placeholder file.
            min_framenum = min(pd.read_hdf(path).shape[0]
                               for path in self.triangulation_dlc_cams_filepaths.values())
            self._create_empty_files(cams_to_create_empty_files=missing_cams_in_filepath_keys,
                                     framenum=min_framenum,
                                     markers=self.markers)

        if missing_cams_in_all_cameras and self.cams_to_exclude is None:
            # The constructor initialises the standard attributes with None;
            # make sure there is a list to append to.
            self.cams_to_exclude = []
        for cam in missing_cams_in_all_cameras:
            self.triangulation_dlc_cams_filepaths.pop(cam)
            self.cams_to_exclude.append(cam)

        if use_preexisting_csvs and self.csv_output_filepath.exists():
            self.df = pd.read_csv(self.csv_output_filepath)
            self.anipose_io = {"reprojerr": np.array([0]),
                               "reproj_nonan": np.array([0]),
                               "reprojerr_flat": np.array([0])}
            print(f"Found a file at {self.csv_output_filepath}!\n"
                  "No triangulation will be performed.")
            # The dataframe was just loaded from csv_output_filepath, so it is
            # deliberately not written back.
        else:
            self.anipose_io = self._preprocess_dlc_predictions_for_anipose(
                triangulate_full_recording=triangulate_full_recording)
            n_flat_points = self.anipose_io["n_points"] * self.anipose_io["n_joints"]

            if self.triangulation_type == "triangulate":
                p3ds_flat = self.camera_group.triangulate(self.anipose_io["points_flat"],
                                                          progress=True)
            elif self.triangulation_type in ("triangulate_optim_ransac_False",
                                             "triangulate_optim_ransac_True"):
                init_ransac = self.triangulation_type == "triangulate_optim_ransac_True"
                p3ds_flat = self.camera_group.triangulate_optim(
                    self.anipose_io["points"],
                    init_ransac=init_ransac,
                    init_progress=True,
                ).reshape(n_flat_points, 3)
            else:
                raise ValueError(
                    "Supported methods for triangulation are triangulate, "
                    "triangulate_optim_ransac_True, triangulate_optim_ransac_False!")

            self.anipose_io["p3ds"] = p3ds_flat.reshape(self.anipose_io["n_points"],
                                                        self.anipose_io["n_joints"], 3)
            (self.anipose_io["reprojerr"],
             self.anipose_io["reproj_nonan"],
             self.anipose_io["reprojerr_flat"]) = self._get_reprojection_errors(p3ds_flat=p3ds_flat)
            self.df = self._get_dataframe_of_triangulated_points(anipose_io=self.anipose_io)

            if triangulate_full_recording or not self.csv_output_filepath.exists():
                _save_dataframe_as_csv(filepath=self.csv_output_filepath, df=self.df)
    finally:
        # Remove the placeholder files even if the triangulation failed.
        self._delete_temp_files()
def _delete_temp_files(self) -> None:
    """
    Delete temporary prediction files and forget their cameras.

    Every file in triangulation_dlc_cams_filepaths whose filename contains
    "_temp" is removed from disk. Afterwards the corresponding cameras are
    removed from triangulation_dlc_cams_filepaths.
    """
    filepaths = self.triangulation_dlc_cams_filepaths
    temp_cams = [cam for cam, path in filepaths.items() if "_temp" in path.name]
    # Delete all files first; the dict is only touched once every unlink succeeded.
    for cam in temp_cams:
        filepaths[cam].unlink()
    for cam in temp_cams:
        filepaths.pop(cam)
def exclude_markers(self,
                    all_markers_to_exclude_config_path: Union[Path, str],
                    verbose: bool = True) -> None:
    """
    Exclude markers in prediction based on markers_to_exclude config.

    For every camera, the likelihood of the markers listed in the config is
    set to 0 and the prediction file is overwritten. Cameras, that are not
    listed in the config, are reported (if verbose) and left untouched.

    Parameters
    ----------
    all_markers_to_exclude_config_path: Path or str
        Filepath to the config used for exclusion of markers.
    verbose: bool, default True
        If True (default), print if exclusion of markers worked without any
        abnormalities.

    Notes
    -----
    The all_markers_to_exclude_config_path has to be a path to a yaml file
    representing a dictionary with the camera names as keys and a list of the
    markers, that should be excluded as value.
    """
    all_markers_to_exclude = read_config(all_markers_to_exclude_config_path)
    missing_cams = check_keys(all_markers_to_exclude, list(self.triangulation_dlc_cams_filepaths))
    if missing_cams and verbose:
        print(
            f"Found no markers to exclude for {missing_cams} in {str(all_markers_to_exclude_config_path)}!")

    for cam_id, h5_file in self.triangulation_dlc_cams_filepaths.items():
        # Cameras missing in the config (or listed without markers) have
        # nothing to exclude; indexing the config directly would raise a KeyError.
        markers_to_exclude_per_cam = all_markers_to_exclude.get(cam_id) or []
        if not markers_to_exclude_per_cam:
            continue

        df = pd.read_hdf(h5_file)
        # Columns are 3-tuples; the second entry is the marker name.
        markers = {marker for _, marker, _ in df.columns}
        existing_markers_to_exclude = markers & set(markers_to_exclude_per_cam)
        not_existing_markers = [marker for marker in markers_to_exclude_per_cam
                                if marker not in markers]
        if not_existing_markers and verbose:
            print(
                f"The following markers were not found in the dataframe, "
                f"but were given as markers to exclude for {cam_id}: {not_existing_markers}!")

        if existing_markers_to_exclude:
            for column_idx, (_, marker, key) in enumerate(df.columns):
                if key == "likelihood" and marker in existing_markers_to_exclude:
                    df.iloc[:, column_idx] = 0
            df.to_hdf(h5_file, key="key", mode="w")

    # NOTE(review): set once the exclusion ran, independent of whether a marker
    # was actually found - confirm this matches the intended flag semantics.
    self.markers_excluded_manually = True
def _create_empty_files(
        self, cams_to_create_empty_files: List[str], framenum: int, markers: List[str]) -> None:
    """
    Create placeholder prediction files for cameras without predictions.

    For each camera an all-zero .h5 file named "empty_temp_{cam}.h5" is written
    to output_directory, validated against the given markers and registered in
    triangulation_dlc_cams_filepaths.

    Parameters
    ----------
    cams_to_create_empty_files: list of str
        Cameras, for which a placeholder file has to be created.
    framenum: int
        Number of rows (frames) of the placeholder dataframe.
    markers: list of str
        Markers, from which the columns of the dataframe are derived.
    """
    cols = get_multi_index(markers)
    for cam in cams_to_create_empty_files:
        print(f"Creating empty .h5 file for {cam}!")
        h5_output_filepath = self.output_directory.joinpath(
            f"empty_temp_{cam}.h5"
        )
        df = pd.DataFrame(data=np.zeros((framenum, len(cols))), columns=cols, dtype=int)
        # key is passed by keyword: positional use is deprecated in pandas.
        df.to_hdf(str(h5_output_filepath), key="empty")
        self._validate_triangulation_marker_ids(
            triangulation_markers_df_filepath=h5_output_filepath,
            framenum=framenum,
            defined_marker_ids=markers)
        self.triangulation_dlc_cams_filepaths[cam] = h5_output_filepath


def _validate_triangulation_marker_ids(self,
                                       triangulation_markers_df_filepath: Path,
                                       framenum: int,
                                       defined_marker_ids: List[str],
                                       add_missing_marker_ids_with_0_likelihood: bool = True) -> None:
    """
    Align the markers of a prediction file with the defined markers.

    Markers, that are defined but missing in the file, are added (if
    add_missing_marker_ids_with_0_likelihood is True). Markers, that are in the
    file but not defined, are removed. The file is overwritten afterwards.

    Parameters
    ----------
    triangulation_markers_df_filepath: Path
        Filepath to the .h5 prediction file, that is read and overwritten.
    framenum: int
        Number of frames, passed on when adding missing markers.
    defined_marker_ids: list of str
        Markers, that the file should contain.
    add_missing_marker_ids_with_0_likelihood: bool, default True
        If True (default), then missing markers are added to the dataframe.
    """
    triangulation_markers_df = pd.read_hdf(triangulation_markers_df_filepath)
    # Columns are 3-tuples; the second entry is the marker id.
    prediction_marker_ids = list(
        {marker_id for _, marker_id, _ in triangulation_markers_df.columns})
    marker_ids_not_in_ground_truth = _find_non_matching_list_elements(prediction_marker_ids,
                                                                      defined_marker_ids)
    marker_ids_not_in_prediction = _find_non_matching_list_elements(defined_marker_ids,
                                                                    prediction_marker_ids)

    if add_missing_marker_ids_with_0_likelihood and marker_ids_not_in_prediction:
        triangulation_markers_df = _add_missing_marker_ids_to_prediction(
            missing_marker_ids=marker_ids_not_in_prediction,
            df=triangulation_markers_df,
            framenum=framenum,
        )
        print(
            "The following marker_ids were missing and added to the dataframe with a "
            f"likelihood of 0: {marker_ids_not_in_prediction}."
        )
    if marker_ids_not_in_ground_truth:
        triangulation_markers_df = _remove_marker_ids_not_in_ground_truth(
            marker_ids_to_remove=marker_ids_not_in_ground_truth,
            df=triangulation_markers_df,
        )
        print(
            "The following marker_ids were deleted from the dataframe, since they were "
            f"not present in the ground truth: {marker_ids_not_in_ground_truth}."
        )
    triangulation_markers_df.to_hdf(
        triangulation_markers_df_filepath, key="empty", mode="w"
    )


def _load_calibration(self, filepath: Path) -> "ap_lib.cameras.CameraGroup":
    """
    Load the camera group from a calibration .toml file.

    Parameters
    ----------
    filepath: Path
        Filepath to the calibration file.

    Returns
    -------
    ap_lib.cameras.CameraGroup
        The camera group loaded from filepath.

    Raises
    ------
    FileNotFoundError
        If filepath does not end with .toml or does not exist.
    """
    if filepath.name.endswith(".toml") and filepath.exists():
        return ap_lib.cameras.CameraGroup.load(filepath)
    raise FileNotFoundError(
        f"The path, given as calibration_toml_filepath\n"
        f"{filepath}\n"
        "does not end with .toml or does not exist!\n"
        "Make sure, that you enter the correct path!"
    )


def _preprocess_dlc_predictions_for_anipose(self, triangulate_full_recording: bool = True) -> Dict:
    """
    Load the 2D predictions of all cameras and prepare them for ap_lib.

    Parameters
    ----------
    triangulate_full_recording: bool, default True
        If False (default True), then only the first 2 frames are kept.

    Returns
    -------
    dict
        The anipose_io dictionary with points, scores and their flat versions.
    """
    anipose_io = ap_lib.utils.load_pose2d_fnames(
        fname_dict=self.triangulation_dlc_cams_filepaths
    )
    return self._add_additional_information_and_continue_preprocessing(
        anipose_io=anipose_io,
        triangulate_full_recording=triangulate_full_recording)


def _add_additional_information_and_continue_preprocessing(
        self, anipose_io: Dict, triangulate_full_recording: bool = True
) -> Dict:
    """
    Add shape information and flat arrays to anipose_io.

    Points with a score below score_threshold are set to NaN (in place).

    Parameters
    ----------
    anipose_io: dict
        Has to contain "points" (4D array, unpacked as cams x frames x joints x
        coordinates) and "scores" (indexed as cams x frames x joints).
    triangulate_full_recording: bool, default True
        If False (default True), then only the first 2 frames are kept.

    Returns
    -------
    dict
        anipose_io, extended by n_points, n_joints, points_flat and scores_flat.
    """
    n_cams, anipose_io["n_points"], anipose_io["n_joints"], _ = anipose_io["points"].shape
    if not triangulate_full_recording:
        start_idx, end_idx = 0, 2
        anipose_io["points"] = anipose_io["points"][:, start_idx:end_idx, :, :]
        anipose_io["scores"] = anipose_io["scores"][:, start_idx:end_idx, :]
        # Recordings shorter than the slice keep their (smaller) frame count.
        anipose_io["n_points"] = anipose_io["points"].shape[1]
    anipose_io["points"][anipose_io["scores"] < self.score_threshold] = np.nan
    anipose_io["points_flat"] = anipose_io["points"].reshape(n_cams, -1, 2)
    anipose_io["scores_flat"] = anipose_io["scores"].reshape(n_cams, -1)
    return anipose_io


def _get_reprojection_errors(
        self, p3ds_flat: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Compute the reprojection errors of the triangulated points.

    Parameters
    ----------
    p3ds_flat: np.ndarray
        The triangulated points, passed on to camera_group.reprojection_error
        together with anipose_io["points_flat"].

    Returns
    -------
    reprojerr: np.ndarray
        Reprojection errors reshaped to (n_points, n_joints).
    reprojerr_nonan: np.ndarray
        All reprojection errors, that are not NaN.
    reprojerr_flat: np.ndarray
        Reprojection errors as returned by ap_lib.
    """
    reprojerr_flat = self.camera_group.reprojection_error(
        p3ds_flat, self.anipose_io["points_flat"], mean=True)
    reprojerr = reprojerr_flat.reshape(self.anipose_io["n_points"], self.anipose_io["n_joints"])
    reprojerr_nonan = reprojerr[~np.isnan(reprojerr)]
    return reprojerr, reprojerr_nonan, reprojerr_flat


def _get_dataframe_of_triangulated_points(self, anipose_io: Dict) -> pd.DataFrame:
    """
    Combine the triangulated data from anipose_io to a 3D df.

    For every bodypart the columns {bodypart}_x, _y, _z, _error and _score are
    created, followed by a final "fnum" column with the frame number. Error
    and score are NaN for points, that were seen by fewer than 2 cameras.

    Note, that anipose_io["scores"] and anipose_io["reprojerr_flat"] are
    modified in place, as in the code this function was taken from.

    The following function was taken from
    https://github.com/lambdaloop/anipose/blob/d20091550dc8b901f460f914544ecfc66c116329/anipose/triangulate.py.
    Changes were made to match our needs here.

    BSD 2-Clause License

    Copyright (c) 2019, Pierre Karashchuk
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    1. Redistributions of source code must retain the above copyright notice, this
       list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright notice,
       this list of conditions and the following disclaimer in the documentation
       and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    """
    all_points_raw = anipose_io["points"]
    all_scores = anipose_io["scores"]
    _cams, n_frames, n_joints, _ = all_points_raw.shape

    good_points = ~np.isnan(all_points_raw[:, :, :, 0])
    too_few_cams = np.sum(good_points, axis=0) < 2

    all_points_3d = anipose_io['p3ds'].reshape(n_frames, n_joints, 3)
    all_errors = anipose_io['reprojerr_flat'].reshape(n_frames, n_joints)

    # 2 is larger than any valid score, so undetected points never win the minimum.
    all_scores[~good_points] = 2
    scores_3d = np.min(all_scores, axis=0)

    scores_3d[too_few_cams] = np.nan
    all_errors[too_few_cams] = np.nan

    # Collect all columns first and build the dataframe in one step.
    columns = {}
    for bp_num, bp in enumerate(anipose_io["bodyparts"]):
        for ax_num, axis in enumerate(["x", "y", "z"]):
            columns[bp + "_" + axis] = all_points_3d[:, bp_num, ax_num]
        # Use the masked errors directly instead of relying on
        # anipose_io['reprojerr'] sharing memory with 'reprojerr_flat'.
        columns[bp + "_error"] = all_errors[:, bp_num]
        columns[bp + "_score"] = scores_3d[:, bp_num]
    columns["fnum"] = np.arange(n_frames)

    return pd.DataFrame(columns)
class TriangulationRecordings(Triangulation):
    """
    Subclass of Triangulation, in which videos are triangulated based on a calibration file.

    Temporal synchronization of the videos can be performed based on a pattern.
    The triangulated dataframe can be normalised (rotated and translated).
    For visualization, a triangulated video can be created.

    Methods
    -------
    run_synchronization(overwrite_DLC_analysis_and_synchro, verbose):
        Perform analysis of all videos using DLC or other methods,
        synchronization to the led_pattern and downsampling to target_fps.
    create_triangulated_video(filename, config_path):
        Create video of the triangulated data.
    normalize(normalization_config_path, save_dataframe):
        Rotate and translate the triangulated dataframe.

    See Also
    --------
    Triangulation:
        Parent class, for triangulation of videos or images.
    core.meta.MetaInterface.create_recordings:
        Create TriangulationRecording objects for all recording_directories
        added to MetaInterface.
    core.meta.MetaInterface.synchronize_recordings:
        Run the function run_synchronization for all TriangulationRecording
        objects added to MetaInterface.
    core.meta.MetaInterface.triangulate_recordings:
        Run the function run_triangulation for all TriangulationRecording
        objects added to MetaInterface.
    core.checker_objects.CheckRecording:
        A class, that checks the metadata and filenames of videos in a given
        folder and allows for filename changing via user input.

    Examples
    --------
    >>> from core.triangulation_calibration_module import TriangulationRecordings
    >>> rec_config = "test_data/Server_structure/Calibrations/220922/recording_config_220922.yaml"
    >>> directory = "test_data/Server_structure/VGlut2-flp/September2022/206_F2-63/220922_OTE/"
    >>> triangulation_object = TriangulationRecordings(
    ...     directory=directory,
    ...     recording_config_filepath=rec_config,
    ...     project_config_filepath="test_data/project_config.yaml",
    ...     recreate_undistorted_plots = True,
    ...     output_directory=directory
    ... )
    >>> triangulation_object.run_synchronization()
    >>> triangulation_object.exclude_markers(
    ...     all_markers_to_exclude_config_path="test_data/markers_to_exclude_config.yaml",
    ...     verbose=False,
    ... )
    >>> triangulation_object.run_triangulation(
    ...     calibration_toml_filepath="test_data/Server_structure/Calibrations/220922/220922_0_Bottom_Ground1_Ground2_Side1_Side2_Side3.toml"
    ... )
    >>> normalised_path, normalisation_error = triangulation_object.normalize(
    ...     normalization_config_path="test_data/normalization_config.yaml"
    ... )
    """

    @property
    def _metadata_keys(self) -> List[str]:
        """Metadata attributes, that are read from the video filenames."""
        keys = ["recording_date", "paradigm", "mouse_id"]
        return keys

    @property
    def _videometadata_tag(self) -> str:
        """Tag, that marks the videos as recordings."""
        tag = "recording"
        return tag

    @property
    def _allowed_filetypes(self) -> List[str]:
        """Video file endings to look for in directory."""
        filetypes = [".AVI", ".avi", ".mov", ".mp4"]
        return filetypes
def run_synchronization(
        self, overwrite_DLC_analysis_and_synchro: bool = False, verbose: bool = True
) -> None:
    """
    Perform analysis of all videos using DLC or other methods,
    synchronization to the led_pattern and downsampling to target_fps.

    Call the synchronizer via VideoInterface and save the prediction file in
    triangulation_dlc_cams_filepaths. Create a plot for crossvalidation of the
    synchronised LED timeseries. Define self.markers as unique markers found
    in all prediction files (sorted, to make the order reproducible).
    Exclude videos, if there are diverging framenumbers after synchronization.

    Parameters
    ----------
    overwrite_DLC_analysis_and_synchro: bool, default False
        If True (default False), then pre-existing DLC files and
        synchronisations will be overwritten during analysis.
    verbose: bool, default True
        If True (default), then Crossvalidation plot and synchronised number
        of frames for each camera are printed.
    """
    cams_not_to_analyse = []
    for video_interface in self.video_interfaces.values():
        video_metadata = video_interface.video_metadata
        if video_metadata.processing_type == "exclude":
            cams_not_to_analyse.append(video_metadata.cam_id)
            continue

        # Videos at or above target_fps are synchronised down, slower videos up.
        # Both branches have to assign the class itself: a trailing comma here
        # would hand a 1-tuple to run_synchronizer.
        if video_metadata.fps >= video_metadata.target_fps:
            synchronizer = RecordingVideoDownSynchronizer
        else:
            synchronizer = RecordingVideoUpSynchronizer
        video_interface.run_synchronizer(
            synchronizer=synchronizer,
            output_directory=self.output_directory,
            synchronize_only=False,
            overwrite_DLC_analysis_and_synchro=overwrite_DLC_analysis_and_synchro,
            synchro_metadata=self.synchro_metadata,
            verbose=verbose
        )

    for cam in cams_not_to_analyse:
        self.video_interfaces.pop(cam)
        self.metadata_from_videos.pop(cam)

    self._plot_synchro_crossvalidation(verbose=verbose)

    self.triangulation_dlc_cams_filepaths = {
        cam: video_interface.export_for_aniposelib()
        for cam, video_interface in self.video_interfaces.items()
    }

    all_markers = set()
    for file in self.triangulation_dlc_cams_filepaths.values():
        # The second column level of the prediction files holds the marker names.
        all_markers.update(pd.read_hdf(file).columns.levels[1])
    self.markers = sorted(all_markers)

    self.cams_to_exclude = _exclude_by_framenum(
        metadata_from_videos=self.metadata_from_videos,
        allowed_num_diverging_frames=self.allowed_num_diverging_frames)
    for cam in self.metadata_from_videos:
        if cam in self.cams_to_exclude:
            self.triangulation_dlc_cams_filepaths.pop(cam)
def _plot_synchro_crossvalidation(self, verbose: bool) -> None: template = list(self.video_interfaces.values())[ 0].synchronizer_object.template_blinking_motif.adjust_template_timeseries_to_fps( fps=self.target_fps)[0][0] led_timeseries_crossvalidation = {} for video_interface in self.video_interfaces.values(): if video_interface.synchronizer_object.led_timeseries_for_cross_video_validation is not None: led_timeseries_crossvalidation[ video_interface.video_metadata.cam_id ] = video_interface.synchronizer_object.led_timeseries_for_cross_video_validation if list(led_timeseries_crossvalidation.keys()): filename = f'{self.mouse_id}_{self.recording_date}_{self.paradigm}_synchronization_crossvalidation_{self.target_fps}fps' synchronization_crossvalidation = AlignmentPlotCrossvalidation( template=template, led_timeseries=led_timeseries_crossvalidation, filename=filename, output_directory=self.output_directory, ) synchronization_crossvalidation.create_plot(save=True, plot=verbose) def _create_csv_filepath(self) -> Path: filepath_out = self.output_directory.joinpath( f"{self.mouse_id}_{self.recording_date}_{self.paradigm}_{self.target_fps}fps" f"_{self.score_threshold}p_excludedmarkers{self.markers_excluded_manually}_" f"filtered{self.synchro_metadata['use_2D_filter']}_" f"normalised{self.normalised_dataframe}_{self.triangulation_type}.csv" ) return filepath_out
def normalize(
        self,
        normalization_config_path: Union[Path, str],
        save_dataframe: bool = True,
        verbose: bool = False
) -> Tuple[Path, float]:
    """
    Rotate and translate the triangulated dataframe.

    Find a frame, in which all markers given in the config are defined.
    Translate all points, so that the CENTER marker of that frame lies at
    (0, 0, 0). Scale the marker coordinates, so that the distance between
    the REFERENCE_LENGTH_MARKERS equals REFERENCE_LENGTH_CM.
    Rotate the dataframe using scipy's Rotation.align_vectors to align
    triangulated vectors and ground truth vectors.
    Create a plot for visualization of the rotation.

    Parameters
    ----------
    normalization_config_path: Path or str
        The path to the config used for normalisation.
    save_dataframe: bool, default True
        If True (default), then the rotated dataframe is saved and
        overwrites a pre-existing one. If False, it is only saved if no
        file exists at the target filepath.
    verbose: bool, default False
        If True (default False), then the rotation visualization plot is
        shown. The plot is saved in both cases.

    Returns
    -------
    rotated_filepath: Path
        Filepath, where the rotated triangulated dataframe is saved.
        If no frame for normalisation could be found, an empty Path is
        returned and self.rotated_filepath is set to None.
    rotation_error: float
        Error returned by Rotation.align_vectors, representing whether
        the alignment worked well. If no frame for normalisation could be
        found, 1000 is returned.

    Notes
    -----
    The normalization_config_path is a path to a yaml file, representing
    a dictionary with the following key-value pairs:
        CENTER: str
            The marker, at which to set (0, 0, 0).
        REFERENCE_LENGTH_CM: int
            The reference length in cm.
        REFERENCE_LENGTH_MARKERS: list
            The two markers for defining the reference length.
            The distance between those markers in px will be set to
            REFERENCE_LENGTH_CM.
        REFERENCE_ROTATION_COORDS: list of list of int
            List of reference rotation markers (at least 3), to align the
            real world space and the triangulated space. Each element is
            a list of int, defining their x, y and z coordinate.
        REFERENCE_ROTATION_MARKERS: list of str
            List of triangulated markers, that should be aligned with
            REFERENCE_ROTATION_COORDS. Their lengths have to be equal!
        FLIP_AXIS_TO_ADJUST_CHIRALITY: str, optional
            Suffix of the marker coordinate columns, whose sign is
            inverted before the rotation is estimated (presumably "x",
            "y" or "z"; verify against the project configs).
            If missing or None, no axis is flipped.
        INVISIBLE_MARKERS: {str: list of int}
            Keys are 'x', 'y' and 'z', values are lists of ints.
            All lists have to match in length. The length is equal to the
            number of points to be plotted.
            Markers to plot in rotation visualization plot invisibly.
            Can be used to set the axis aspect equal, since this feature
            is not established for 3D axes in matplotlib.
    """
    normalization_config_path = convert_to_path(normalization_config_path)
    config = read_config(normalization_config_path)
    try:
        best_frame = _get_best_frame_for_normalisation(config=config, df=self.df)
    except ValueError:
        print(f"could not normalize the dataframe {self.csv_output_filepath}!")
        self.rotated_filepath = None
        return Path(""), 1000

    # Translation: shift self.df in place, so that CENTER is the origin.
    x, y, z = get_3D_array(self.df, config['CENTER'], best_frame)
    offsets = {'_x': x, '_y': y, '_z': z}
    for key in self.df.keys():
        for suffix, offset in offsets.items():
            if key.endswith(suffix):
                self.df[key] = self.df[key] - offset

    # Scaling: convert triangulation units into cm.
    reference_length_px = get_xyz_distance_in_triangulation_space(
        marker_ids=tuple(config['REFERENCE_LENGTH_MARKERS']),
        df_xyz=self.df.iloc[best_frame, :],
    )
    conversionfactor = config['REFERENCE_LENGTH_CM'] / reference_length_px

    # Only marker coordinate columns are scaled and rotated; error, score,
    # "M_", center and framenumber columns are left untouched.
    excluded_tags = ('error', 'score', "M_", 'center', 'fn')
    bp_keys_unflat = set(
        get_3D_df_keys(key[:-2])
        for key in self.df.keys()
        if not any(tag in key for tag in excluded_tags)
    )
    bp_keys = list(it.chain(*bp_keys_unflat))

    normalised = self.df.copy()
    normalised[bp_keys] *= conversionfactor

    # The chirality flip is optional: a missing key is handled like None.
    flip_axis = config.get('FLIP_AXIS_TO_ADJUST_CHIRALITY')
    if flip_axis is not None:
        keys_to_flip = [key for key in bp_keys if key.endswith(flip_axis)]
        normalised[keys_to_flip] *= -1

    # Rotation: estimate the rotation, that maps the triangulated
    # reference markers onto the ground truth coordinates.
    reference_rotation_markers = [
        get_3D_array(normalised, marker, best_frame)
        for marker in config['REFERENCE_ROTATION_MARKERS']
    ]
    r, rotation_error = Rotation.align_vectors(
        config["REFERENCE_ROTATION_COORDS"], reference_rotation_markers
    )

    rotated = normalised.copy()
    for key in bp_keys_unflat:
        rot_points = r.apply(normalised.loc[:, [key[0], key[1], key[2]]])
        for axis in range(3):
            rotated.loc[:, key[axis]] = rot_points[:, axis]

    rotated_markers = [
        get_3D_array(rotated, marker, best_frame)
        for marker in config['REFERENCE_ROTATION_MARKERS']
    ]

    # normalised_dataframe has to be set before the filepath is created,
    # since it is part of the filename.
    self.normalised_dataframe = True
    self.rotated_filepath = self._create_csv_filepath()
    if save_dataframe or not self.rotated_filepath.exists():
        _save_dataframe_as_csv(filepath=str(self.rotated_filepath), df=rotated)

    rotation_plot_filename = self.output_directory.joinpath(
        f"{self.mouse_id}_{self.recording_date}_{self.paradigm}_rotation_visualization"
    )
    visualization = RotationVisualization(
        rotated_markers=rotated_markers,
        config=config,
        output_filepath=rotation_plot_filename,
        rotation_error=rotation_error,
    )
    visualization.create_plot(plot=verbose, save=True)
    return self.rotated_filepath, rotation_error
def create_triangulated_video(
        self,
        filename: str,
        config_path: Union[Path, str],
        start_s: int,
        end_s: int
) -> None:
    """
    Create a video of the triangulated data.

    The plotting config is read from config_path, a
    TriangulationVisualization is created for the dataframe at
    self.rotated_filepath and a clip of (end_s - start_s) seconds is
    written to "<filename>.mp4" with target_fps.

    Parameters
    ----------
    filename: str
        The filename, where the video should be saved (without the
        ".mp4" suffix, which is appended).
    config_path: Path or str
        The path to the config used to create triangulated videos.
    start_s: int
        The second in the recording to start video creation.
    end_s: int
        The second in the recording to end video creation.

    Notes
    -----
    The yaml file at config_path has to have the following keys:
        body_marker_size, body_label_size: int, int
        body_marker_color, body_label_color: str, str
            matplotlib color
        body_marker_alpha, body_label_alpha: float 0 < 1, float 0 < 1
        markers_to_exclude: list of str
            Markers that should not be plotted.
            Recommended is, giving at least "M_", "center_", "fnum" and
            "Unnamed" to it, since those labels are created from
            aniposelib and can not be plotted.
        markers_to_connect: list of list of str, optional
            Each element consists of a list of markers, that will be
            connected in the video.
        markers_to_fill: list of dict, optional
            Each element consists of a dict, that will be filled in the
            video. The keys of the dict have to be:
                markers: list of str
                    The markers, to create a polygon in between.
                color: str
                    matplotlib color, to fill the polygon.
                alpha: float 0 < 1
        additional_markers_to_plot: list of dict, optional
            Markers to plot in addition to the triangulated points.
            The elements of the lists are dictionaries containing the
            following key-value pairs:
                name: str
                x, y, z: int, int, int
                alpha: float 0 < 1
                size: int
                color: str
                    matplotlib color
            Can be used to set the axis aspect equal, since this feature
            is not established for 3D axes in matplotlib.
            All markers can be used to be connected or filled.
    """
    self.video_plotting_config = read_config(convert_to_path(config_path))
    self.triangulation_visualization = TriangulationVisualization(
        df_3D_filepath=self.rotated_filepath,
        output_directory=self.output_directory,
        config=self.video_plotting_config,
    )
    # Stored on the instance, since the frame callback needs the offset.
    self.triangulated_video_start_s = start_s

    clip_duration_s = end_s - start_s
    triangulated_video = VideoClip(self._get_triangulated_plots, duration=clip_duration_s)
    output_filename = f"{filename}.mp4"
    triangulated_video.write_videofile(output_filename, fps=self.target_fps, logger=None)
def _get_triangulated_plots(self, idx: int) -> np.ndarray: idx = int( (self.triangulated_video_start_s + idx) * self.target_fps) return self.triangulation_visualization.return_fig(idx=idx)
class CalibrationValidation(Triangulation):
    """
    Subclass of Triangulation, in which images are triangulated based on a
    calibration file and the triangulated coordinates are validated based
    on a ground_truth.

    Methods
    -------
    add_ground_truth_config(ground_truth_config_filepath)
        Read the metadata from ground_truth_config_filepath and create
        list of markers.
    get_marker_predictions(overwrite_analysed_markers)
        Run marker detection for all images in metadata_from_videos.
    evaluate_triangulation_of_calibration_validation_markers(show_3D_plot, verbose)
        Evaluate the triangulated data and return mean errors.

    See Also
    --------
    Triangulation:
        Parent class, for triangulation of videos or images.
    core.meta.MetaInterface.create_calibrations:
        Create CalibrationValidation objects and run
        add_ground_truth_config for all calibration_directories added to
        MetaInterface.
    core.meta.MetaInterface.synchronize_calibrations:
        Run get_marker_predictions for all calibration_validation objects
        added to MetaInterface.
    core.checker_objects.CheckCalibrationValidation:
        A class, that checks the metadata and filenames of videos in a
        given folder and allows for filename changing via user input.
    Calibration.triangulate_optim:
        Run the function run_triangulation for the CalibrationValidation
        object passed to triangulate_optim.

    Examples
    --------
    >>> from core.triangulation_calibration_module import CalibrationValidation
    >>> from pathlib import Path
    >>> rec_config = Path("test_data/Server_structure/Calibrations/220922/recording_config_220922.yaml")
    >>> calibration_validation_object = CalibrationValidation(
    ... project_config_filepath="test_data/project_config.yaml",
    ... directory=rec_config.parent, recording_config_filepath=rec_config,
    ... recreate_undistorted_plots = True, output_directory=rec_config.parent)
    >>> calibration_validation_object.add_ground_truth_config("test_data/ground_truth_config.yaml")
    >>> calibration_validation_object.get_marker_predictions()
    >>> calibration_validation_object.run_triangulation(
    ... calibration_toml_filepath="test_data/Server_structure/Calibrations/220922/220922_0_Bottom_Ground1_Ground2_Side1_Side2_Side3.toml",
    ... triangulate_full_recording = True)
    >>> mean_dist_err_percentage, mean_angle_err, reprojerr_nonan_mean = calibration_validation_object.evaluate_triangulation_of_calibration_validation_markers()
    """

    @property
    def _metadata_keys(self) -> List[str]:
        # Metadata keys used for this class.
        metadata_keys = ["recording_date"]
        return metadata_keys

    @property
    def _videometadata_tag(self) -> str:
        # Tag used for the video metadata of this class.
        tag = "calvin"
        return tag

    @property
    def _allowed_filetypes(self) -> List[str]:
        # File suffixes accepted by this class: images first, then videos.
        image_suffixes = [".bmp", ".tiff", ".png", ".jpg"]
        video_suffixes = [".AVI", ".avi", ".mp4"]
        return image_suffixes + video_suffixes
def add_ground_truth_config(self, ground_truth_config_filepath: Union[Path, str]) -> None:
    """
    Read the ground truth config and set the list of markers.

    The config is stored as self.ground_truth_config and its "unique_ids"
    entry as self.markers.

    Parameters
    ----------
    ground_truth_config_filepath: str or Path
        The path to the ground_truth config file.

    Notes
    -----
    The ground_truth yaml file at ground_truth_config_filepath has to
    have the following structure:
        distances: {str: {str: float}}
            Dictionary with first markers as key and dictionaries as
            values, that have second markers as key and the known
            distances between first and second markers as values.
            Distances are floats in cm.
        angles:
            Dictionary with vertex markers as keys and dictionaries as
            values, that have the following keys:
                value: float
                    The value of the calculated angle in degrees.
                marker: list of str
                    The markers between that draw the triangle
                    (if 3 markers given: 0: vertex, 1/2: ray) or a plane
                    and a line (if 5 markers given: 1/2/3: plane,
                    4/5: line).
        unique_ids: list of str
            A list with all marker_ids to take into account for
            ground_truth validation and plotting.
        marker_ids_to_connect_in_3D_plot: list of list of str, optional
            Each element consists of a list of markers, that will be
            connected in the 3D calibration validation plot.
    """
    config_path = convert_to_path(ground_truth_config_filepath)
    ground_truth_config = read_config(config_path)
    self.ground_truth_config = ground_truth_config
    self.markers = ground_truth_config["unique_ids"]
def get_marker_predictions(self, overwrite_analysed_markers: bool = False) -> None:
    """
    Run marker detection for all images in metadata_from_videos.

    Save the prediction filepaths in triangulation_dlc_cams_filepaths,
    validate the predicted marker ids against self.markers and save a
    predictions plot for each camera.
    markers_excluded_manually is set to False and cams_to_exclude to an
    empty list.

    Parameters
    ----------
    overwrite_analysed_markers: bool, default False
        If True (default False), then pre-existing prediction files will
        be overwritten during the analysis. If False, existing files are
        reused.
    """
    self.markers_excluded_manually = False
    self.triangulation_dlc_cams_filepaths = {}

    for cam in self.metadata_from_videos.values():
        predictions_filepath = self._run_marker_detection(
            cam=cam, overwrite_analysed_markers=overwrite_analysed_markers
        )
        self.triangulation_dlc_cams_filepaths[cam.cam_id] = predictions_filepath
        self._validate_triangulation_marker_ids(
            triangulation_markers_df_filepath=predictions_filepath,
            framenum=1,
            defined_marker_ids=self.markers,
        )
        # The plot is only written to disk, never shown.
        PredictionsPlot(
            image=cam.filepath,
            predictions=predictions_filepath,
            output_directory=self.output_directory,
            cam_id=cam.cam_id,
            likelihood_threshold=self.score_threshold,
        ).create_plot(plot=False, save=True)

    self.cams_to_exclude = []
def evaluate_triangulation_of_calibration_validation_markers(
        self,
        show_3D_plot: bool = True,
        verbose: bool = True,
) -> Tuple[np.float64, np.float64, np.float64]:
    """
    Evaluate the triangulated data and return mean errors.

    Calculate the distances and angles for all references in ground truth
    and get the differences between triangulated and ground truth data.
    Print these differences and show the plot of the triangulated data.
    Calculate the mean of distance and angle error and reprojection error.

    Parameters
    ----------
    show_3D_plot: bool, default True
        If True (default), then a plot of the triangulated
        calibration_validation data is created, shown and saved.
    verbose: bool, default True
        If True (default), then all angles and distances compared to
        their ground truth will be printed.

    Returns
    -------
    mean_dist_err_percentage: np.float64
        The mean percentage error of all triangulated distances compared
        to their ground truth, taken over the individual errors of all
        reference distances (NaN values are ignored).
    mean_angle_err: np.float64
        The mean error of all triangulated angles compared to their
        ground truth (NaN values are ignored).
    reprojerr_nonan_mean: np.float64
        The mean reprojection error of all triangulated points.

    Notes
    -----
    The path directing to the ground_truth yaml file, that is saved as
    self.ground_truth_config, has to have the following structure:
        distances: {str: {str: float}}
            Dictionary with first markers as key and dictionaries as
            values, that have second markers as key and the known
            distances between first and second markers as values.
            Distances are floats in cm.
        angles:
            Dictionary with vertex markers as keys and dictionaries as
            values, that have the following keys:
                value: float
                    The value of the calculated angle in degrees.
                marker: list of str
                    The markers between that draw the triangle
                    (if 3 markers given: 0: vertex, 1/2: ray) or a plane
                    and a line (if 5 markers given: 1/2/3: plane,
                    4/5: line).
        unique_ids: list of str
            A list with all marker_ids to take into account for
            ground_truth validation and plotting.
        marker_ids_to_connect_in_3D_plot: list of list of str
            Each element consists of a list of markers, that will be
            connected in the 3D calibration validation plot.
            The key is read whenever show_3D_plot is True.
    """
    self.anipose_io = add_reprojection_errors_of_all_calibration_validation_markers(
        anipose_io=self.anipose_io, df_xyz=self.df
    )
    self.anipose_io = set_distances_and_angles_for_evaluation(
        parameters=self.ground_truth_config,
        anipose_io=self.anipose_io,
        df_xyz=self.df,
    )
    gt_distances = load_distances_from_ground_truth(self.ground_truth_config["distances"])
    self.anipose_io = add_errors_between_computed_and_ground_truth_distances_for_different_references(
        anipose_io=self.anipose_io, ground_truth_distances=gt_distances
    )
    self.anipose_io = add_errors_between_computed_and_ground_truth_angles(
        self.ground_truth_config["angles"], self.anipose_io
    )

    distance_errors_in_cm = self.anipose_io["distance_errors_in_cm"]
    angle_errors = self.anipose_io["angles_error_ground_truth_vs_triangulated"]

    if verbose:
        for reference_distance_id, distance_errors in distance_errors_in_cm.items():
            print(
                f'Using {reference_distance_id} as reference distance, '
                f'the mean distance error is: {distance_errors["mean_error"]} cm.'
            )
        for angle, angle_error in angle_errors.items():
            print(f"Considering {angle}, the angle error is: {angle_error}")

    if show_3D_plot:
        calibration_validation_plot = CalibrationValidationPlot(
            p3d=self.anipose_io["p3ds"][0],
            bodyparts=self.anipose_io["bodyparts"],
            output_directory=self.output_directory,
            marker_ids_to_connect=self.ground_truth_config["marker_ids_to_connect_in_3D_plot"],
            filename_tag=f"{self.csv_output_filepath.stem}"
        )
        calibration_validation_plot.create_plot(plot=True, save=True)

    # Collect the percentage errors of ALL reference distances. Assigning
    # a new list per reference would keep only the last reference.
    # The percentage error is the last element of each individual error.
    all_percentage_errors = [
        percentage_error
        for distance_errors in distance_errors_in_cm.values()
        for *_, percentage_error in distance_errors["individual_errors"]
    ]
    all_angle_errors = list(angle_errors.values())

    mean_dist_err_percentage = np.nanmean(np.asarray(all_percentage_errors))
    mean_angle_err = np.nanmean(np.asarray(all_angle_errors))
    reprojerr_nonan_mean = self.anipose_io["reproj_nonan"].mean()

    return mean_dist_err_percentage, mean_angle_err, reprojerr_nonan_mean
def _create_csv_filepath(self) -> Path:
    """
    Build the filepath of the triangulated csv in output_directory.

    The filename encodes recording_date, score_threshold,
    markers_excluded_manually and triangulation_type.
    """
    filename = (
        f"Calvin_{self.recording_date}_{self.score_threshold}p_excludedmarkers"
        f"{self.markers_excluded_manually}_filteredFalse_{self.triangulation_type}.csv"
    )
    return self.output_directory.joinpath(filename)


def _run_marker_detection(self, cam: VideoMetadata, overwrite_analysed_markers: bool = False) -> Path:
    """
    Run the marker detection for one camera and return the h5 filepath.

    Parameters
    ----------
    cam: VideoMetadata
        The metadata of the camera to analyse. Its
        calibration_evaluation_type has to be "manual" or "DLC".
    overwrite_analysed_markers: bool, default False
        If False (default) and the h5 file already exists, then the
        analysis is skipped and the path of the existing file is returned.

    Returns
    -------
    h5_output_filepath: Path
        The filepath returned by the marker detection interface, or the
        pre-existing file, if the analysis was skipped.

    Raises
    ------
    ValueError
        If an analysis is required and calibration_evaluation_type is
        neither "manual" nor "DLC".
    """
    h5_output_filepath = self.output_directory.joinpath(
        f"Calvin_{self.recording_date}_{cam.cam_id}.h5"
    )
    # Reuse existing predictions, unless overwriting was requested.
    if not overwrite_analysed_markers and h5_output_filepath.exists():
        return h5_output_filepath

    if cam.calibration_evaluation_type == "manual":
        manual_interface = ManualAnnotation(
            object_to_analyse=cam.filepath,
            output_directory=self.output_directory,
            marker_detection_directory=cam.calibration_evaluation_filepath,
        )
        return manual_interface.analyze_objects(filepath=h5_output_filepath, only_first_frame=True)

    if cam.calibration_evaluation_type == "DLC":
        dlc_interface = DeeplabcutInterface(
            object_to_analyse=cam.filepath,
            output_directory=self.output_directory,
            marker_detection_directory=cam.calibration_evaluation_filepath,
        )
        # filtering is not supported and not necessary for single frame predictions!
        return dlc_interface.analyze_objects(filepath=h5_output_filepath, filtering=False, use_gpu="low")

    raise ValueError(
        "For calibration_evaluation only manual and DLC are supported!"
    )