Loading bids_tools/handlers/project.py +31 −17 Original line number Diff line number Diff line Loading @@ -52,13 +52,13 @@ def initialize_project_structure( f.writelines([f"# {project_title}\n\n", f"{project_description}"]) def create_project(input_data: dict): def create_project(input_data: str): """Create a new Collaborative Project on the HIP initialized with directory/file structure. Parameters ---------- input_data : dict Dictionary sent by the HIP that contains all information input_data : str Path to input data JSON file sent by the HIP that contains all information about Project and corresponding BIDS dataset in the form:: { Loading @@ -82,7 +82,10 @@ def create_project(input_data: dict): } } """ print(f"Creating new collaborative project at {input_data['path']}...") # Load input data with open(input_data, "r") as f: input_data = json.load(f) print(f'Creating new collaborative project at {input_data["path"]}...') # Extract input data project_dir = Path(input_data["path"]) project_title = input_data["title"] Loading @@ -92,18 +95,18 @@ def create_project(input_data: dict): # Create initial BIDS dataset create_empty_bids_dataset( bids_dir=(project_dir / "inputs" / "bids-dataset").absolute(), dataset_desc=input_data["createBidsDatasetDto"], dataset_desc=input_data["createBidsDatasetDto"]["DatasetDescJSON"], ) print(SUCCESS) def import_subject(input_data: dict): def import_subject(input_data: str): """Import a new subject from a BIDS dataset of the HIP Center space to the BIDS dataset of the HIP Collaborative Project. Parameters ---------- input_data : dict Dictionary sent by the HIP that contains all information input_data : str Path to input data JSON file sent by the HIP that contains all information about the subject and files to import in the form:: { Loading @@ -112,6 +115,9 @@ def import_subject(input_data: dict): "targetDatasetPath": "/path/to/target/bids/dataset/directory", } """ # Load input data with open(input_data, "r") as f: input_data = json.load(f) print( f"Importing subject {input_data['subject']} " f"from {input_data['sourceDatasetPath']} " Loading @@ -119,17 +125,18 @@ def import_subject(input_data: dict): ) # Copy subject directory from source to target shutil.copytree( Path(input_data["sourceDatasetPath"] / input_data["subject"]).absolute(), Path(input_data["targetDatasetPath"] / input_data["subject"]).absolute(), (Path(input_data["sourceDatasetPath"]) / input_data["subject"]).absolute(), (Path(input_data["targetDatasetPath"]) / input_data["subject"]).absolute(), dirs_exist_ok=True, ) # Update participants.tsv file of target dataset with subject row from source dataset transfer_subject_participants_tsv_row( participant_id=input_data["subject"], source_participant_tsv=Path( input_data["sourceDatasetPath"] / "participants.tsv" source_participant_tsv=( Path(input_data["sourceDatasetPath"]) / "participants.tsv" ).absolute(), target_participants_tsv=Path( input_data["targetDatasetPath"] / "participants.tsv" target_participants_tsv=( Path(input_data["targetDatasetPath"]) / "participants.tsv" ).absolute(), ) print(SUCCESS) Loading Loading @@ -168,13 +175,13 @@ def transfer_subject_participants_tsv_row( target_participants_df.to_csv(target_participants_tsv, sep="\t", index=False) def import_document(input_data: dict): def import_document(input_data: str): """Import a new supporting document to the `documents/` folder of the HIP Collaborative Project. Parameters ---------- input_data : dict Dictionary sent by the HIP that contains all information iinput_data : str Path to input data JSON file sent by the HIP that contains all information about the document to import in the form:: { Loading @@ -182,12 +189,19 @@ def import_document(input_data: dict): "targetDocumentPath": "/path/to/target/document/file", } """ # Load input data with open(input_data, "r") as f: input_data = json.load(f) print( f'Importing document {input_data["sourceDocumentPath"]} from HIP Center space ' f'to HIP Collaborative Project at {input_data["targetDocumentPath"]}... ' ) source_document_path = Path(input_data["sourceDocumentPath"]) target_document_path = Path(input_data["targetDocumentPath"]) # Remove target document if it already exists if target_document_path.exists(): print(f"WARNING: Overwriting target document at {target_document_path}...") os.remove(target_document_path) # Copy document from source to target shutil.copyfile(source_document_path, target_document_path) print(SUCCESS) Loading
bids_tools/handlers/project.py +31 −17 Original line number Diff line number Diff line Loading @@ -52,13 +52,13 @@ def initialize_project_structure( f.writelines([f"# {project_title}\n\n", f"{project_description}"]) def create_project(input_data: dict): def create_project(input_data: str): """Create a new Collaborative Project on the HIP initialized with directory/file structure. Parameters ---------- input_data : dict Dictionary sent by the HIP that contains all information input_data : str Path to input data JSON file sent by the HIP that contains all information about Project and corresponding BIDS dataset in the form:: { Loading @@ -82,7 +82,10 @@ def create_project(input_data: dict): } } """ print(f"Creating new collaborative project at {input_data['path']}...") # Load input data with open(input_data, "r") as f: input_data = json.load(f) print(f'Creating new collaborative project at {input_data["path"]}...') # Extract input data project_dir = Path(input_data["path"]) project_title = input_data["title"] Loading @@ -92,18 +95,18 @@ def create_project(input_data: dict): # Create initial BIDS dataset create_empty_bids_dataset( bids_dir=(project_dir / "inputs" / "bids-dataset").absolute(), dataset_desc=input_data["createBidsDatasetDto"], dataset_desc=input_data["createBidsDatasetDto"]["DatasetDescJSON"], ) print(SUCCESS) def import_subject(input_data: dict): def import_subject(input_data: str): """Import a new subject from a BIDS dataset of the HIP Center space to the BIDS dataset of the HIP Collaborative Project. Parameters ---------- input_data : dict Dictionary sent by the HIP that contains all information input_data : str Path to input data JSON file sent by the HIP that contains all information about the subject and files to import in the form:: { Loading @@ -112,6 +115,9 @@ def import_subject(input_data: dict): "targetDatasetPath": "/path/to/target/bids/dataset/directory", } """ # Load input data with open(input_data, "r") as f: input_data = json.load(f) print( f"Importing subject {input_data['subject']} " f"from {input_data['sourceDatasetPath']} " Loading @@ -119,17 +125,18 @@ def import_subject(input_data: dict): ) # Copy subject directory from source to target shutil.copytree( Path(input_data["sourceDatasetPath"] / input_data["subject"]).absolute(), Path(input_data["targetDatasetPath"] / input_data["subject"]).absolute(), (Path(input_data["sourceDatasetPath"]) / input_data["subject"]).absolute(), (Path(input_data["targetDatasetPath"]) / input_data["subject"]).absolute(), dirs_exist_ok=True, ) # Update participants.tsv file of target dataset with subject row from source dataset transfer_subject_participants_tsv_row( participant_id=input_data["subject"], source_participant_tsv=Path( input_data["sourceDatasetPath"] / "participants.tsv" source_participant_tsv=( Path(input_data["sourceDatasetPath"]) / "participants.tsv" ).absolute(), target_participants_tsv=Path( input_data["targetDatasetPath"] / "participants.tsv" target_participants_tsv=( Path(input_data["targetDatasetPath"]) / "participants.tsv" ).absolute(), ) print(SUCCESS) Loading Loading @@ -168,13 +175,13 @@ def transfer_subject_participants_tsv_row( target_participants_df.to_csv(target_participants_tsv, sep="\t", index=False) def import_document(input_data: dict): def import_document(input_data: str): """Import a new supporting document to the `documents/` folder of the HIP Collaborative Project. Parameters ---------- input_data : dict Dictionary sent by the HIP that contains all information iinput_data : str Path to input data JSON file sent by the HIP that contains all information about the document to import in the form:: { Loading @@ -182,12 +189,19 @@ def import_document(input_data: dict): "targetDocumentPath": "/path/to/target/document/file", } """ # Load input data with open(input_data, "r") as f: input_data = json.load(f) print( f'Importing document {input_data["sourceDocumentPath"]} from HIP Center space ' f'to HIP Collaborative Project at {input_data["targetDocumentPath"]}... ' ) source_document_path = Path(input_data["sourceDocumentPath"]) target_document_path = Path(input_data["targetDocumentPath"]) # Remove target document if it already exists if target_document_path.exists(): print(f"WARNING: Overwriting target document at {target_document_path}...") os.remove(target_document_path) # Copy document from source to target shutil.copyfile(source_document_path, target_document_path) print(SUCCESS)