Coverage for oc_meta/plugins/get_ids_from_citations.py: 99%
78 statements
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
import csv
import io
import os
from zipfile import ZipFile

from tqdm import tqdm

from oc_meta.lib.file_manager import pathoo, write_csv
def get_ids_from_citations(citations_dir:str, output_dir:str, threshold:int=10000, verbose:bool=False) -> None:
    '''
    This script extracts the identifiers of the citing and cited documents from citation data organized in the CSV format accepted by OpenCitations.

    :param citations_dir: the directory containing the citation files, either in CSV or ZIP format
    :type citations_dir: str
    :param output_dir: the directory of the output CSV files
    :type output_dir: str
    :param threshold: the number of processed files after which an output CSV chunk is written (default: 10000)
    :type threshold: int
    :param verbose: show a loading bar, elapsed time and estimated time
    :type verbose: bool
    :returns: None
    '''
    threshold = 10000 if threshold is None else int(threshold)
    if not any(file.endswith('.csv') or file.endswith('.zip') for _, _, files in os.walk(citations_dir) for file in files):
        raise RuntimeError('I did not find CSV or ZIP files in the given directory')
    ids_found = set()
    if os.path.isdir(output_dir):
        # Resume from a previous run: reload the identifiers already written to the output directory
        if verbose:
            print('[INFO: get_ids_from_citations] Looking for previously stored IDs')
        for filename in os.listdir(output_dir):
            with open(os.path.join(output_dir, filename), 'r', encoding='utf8', newline='') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    ids_found.add(row['id'])
        if verbose:
            print(f'[INFO: get_ids_from_citations] {len(ids_found)} IDs found')
    else:
        pathoo(output_dir)
    if verbose:
        pbar = tqdm(total=get_files_count(citations_dir))
    file_counter = 1
    output_csv = list()
    len_ids_found = len(ids_found)
    for fold, _, files in os.walk(citations_dir):
        for file in files:
            cur_file = file_counter + len_ids_found
            if file.endswith('.csv'):
                with open(os.path.join(fold, file), 'r', encoding='utf8') as file_obj:
                    data = list(csv.DictReader(file_obj))
                    process_data(data, ids_found, output_csv)
                    if file_counter % threshold == 0:
                        # Flush a chunk of newly found identifiers every `threshold` processed files
                        write_csv(path=os.path.join(output_dir, f'{cur_file-threshold+1}-{cur_file}.csv'), datalist=output_csv)
                        output_csv = list()
                    if verbose:
                        pbar.update()
                    file_counter += 1
            elif file.endswith('.zip'):
                with ZipFile(os.path.join(fold, file), 'r') as archive:
                    for name in archive.namelist():
                        cur_file = file_counter + len_ids_found
                        with archive.open(name) as infile:
                            data = csv.DictReader(io.TextIOWrapper(infile, 'utf-8'))
                            process_data(data, ids_found, output_csv)
                            if file_counter % threshold == 0:
                                write_csv(path=os.path.join(output_dir, f'{cur_file-threshold+1}-{cur_file}.csv'), datalist=output_csv)
                                output_csv = list()
                            if verbose:
                                pbar.update()
                            file_counter += 1
    if output_csv:
        # Write the last, possibly partial, chunk
        write_csv(path=os.path.join(output_dir, f'{cur_file + 1 - (cur_file % threshold)}-{cur_file}.csv'), datalist=output_csv)
    if verbose:
        pbar.close()
        print(f'[INFO: get_ids_from_citations] {len(ids_found)} IDs stored')
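
# Worked example (editor's note, the figures are illustrative): with threshold=10000 and no
# previously stored IDs, processing 25,000 citation CSVs flushes '1-10000.csv' and
# '10001-20000.csv' at the two threshold hits, and the remaining identifiers end up in a
# final, smaller '20001-25000.csv' chunk.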


def process_data(data:csv.DictReader, ids_found:set, output_csv:list) -> None:
    for row in data:
        citing = row['citing']
        cited = row['cited']
        if citing not in ids_found:
            output_csv.append({'id': citing})
            ids_found.add(citing)
        if cited not in ids_found:
            output_csv.append({'id': cited})
            ids_found.add(cited)
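
# Example (editor's sketch, the identifiers are made up): with
#   data = [{'citing': 'doi:10.1000/a', 'cited': 'doi:10.1000/b'}]
# and an empty ids_found set and output_csv list, process_data leaves
#   output_csv == [{'id': 'doi:10.1000/a'}, {'id': 'doi:10.1000/b'}]
# and both identifiers in ids_found, so later rows mentioning the same documents add nothing.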


def get_files_count(citations_dir:str) -> int:
    # Count the units of work shown by the progress bar: plain CSV files are counted
    # directly, while for ZIP dumps every entry inside each archive is counted.
    file_count = 0
    if any(file.endswith('.csv') for _, _, files in os.walk(citations_dir) for file in files):
        file_count = sum(1 for _, _, files in os.walk(citations_dir) for file in files if file.endswith('.csv'))
    elif any(file.endswith('.zip') for _, _, files in os.walk(citations_dir) for file in files):
        for fold, _, files in os.walk(citations_dir):
            for file in files:
                if file.endswith('.zip'):
                    with ZipFile(os.path.join(fold, file), 'r') as archive:
                        file_count += len(archive.namelist())
    return file_count
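

# Usage sketch (editor's addition, not part of the measured module): the paths below are
# placeholders chosen for illustration only.
if __name__ == '__main__':
    get_ids_from_citations(
        citations_dir='citation_dumps',  # placeholder: folder holding the CSV or ZIP citation dumps
        output_dir='extracted_ids',      # placeholder: chunked CSVs with a single 'id' column are written here
        threshold=10000,                 # flush an output chunk every 10,000 processed files
        verbose=True)                    # show a tqdm progress bar and a final count of stored IDs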