Coverage for oc_meta/plugins/get_ids_from_citations.py: 99%

78 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

1import csv 

2import io 

3import os 

4from zipfile import ZipFile 

5 

6from tqdm import tqdm 

7 

8from oc_meta.lib.file_manager import pathoo, write_csv 

9 

10 

def get_ids_from_citations(citations_dir:str, output_dir:str, threshold:int=10000, verbose:bool=False) -> None:
    '''
    This script extracts the identifiers of the citing and cited documents from citation data organized in the CSV format accepted by OpenCitations.
    The identifiers are written to one-column ('id') CSV files in output_dir, one file per batch of ``threshold`` processed input files.

    :params citations_dir: the directory containing the citations files, either in CSV or ZIP format
    :type citations_dir: str
    :params output_dir: directory of the output CSV files
    :type output_dir: str
    :params threshold: number of input files to process before flushing a batch to disk; None means the default of 10000
    :type threshold: int
    :params verbose: show a loading bar, elapsed time and estimated time
    :type verbose: bool
    :returns: None
    :raises RuntimeError: if citations_dir contains no CSV or ZIP files
    '''
    threshold = 10000 if threshold is None else int(threshold)
    if not any(file.endswith('.csv') or file.endswith('.zip') for _, _, files in os.walk(citations_dir) for file in files):
        raise RuntimeError('I did not find CSV or ZIP files in the given directory')
    ids_found = set()
    if os.path.isdir(output_dir):
        # Resume support: reload the IDs stored by a previous run so they are
        # not emitted twice.
        if verbose:
            print(f'[INFO: get_ids_from_citations_data] Looking for previously stored IDs')
        for filename in os.listdir(output_dir):
            with open(os.path.join(output_dir, filename), 'r', encoding='utf8', newline='') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    ids_found.add(row['id'])
        if verbose:
            print(f'[INFO: get_ids_from_citations] {len(ids_found)} IDs found')
    else:
        pathoo(output_dir)
    if verbose:
        pbar = tqdm(total=get_files_count(citations_dir))
    file_counter = 1
    output_csv = list()
    len_ids_found = len(ids_found)
    # Safe fallback for the final flush below; overwritten as soon as the
    # first input file is processed.
    cur_file = len_ids_found
    for fold, _, files in os.walk(citations_dir):
        for file in files:
            cur_file = file_counter + len_ids_found
            if file.endswith('.csv'):
                with open(os.path.join(fold, file), 'r', encoding='utf8') as file_obj:
                    data = list(csv.DictReader(file_obj))
                    process_data(data, ids_found, output_csv)
                    if file_counter % threshold == 0:
                        # Flush a complete batch; the filename records the range of processed inputs
                        write_csv(path=os.path.join(output_dir, f'{cur_file-threshold+1}-{cur_file}.csv'), datalist=output_csv)
                        output_csv = list()
                    if verbose:
                        pbar.update()
                    file_counter += 1
            elif file.endswith('.zip'):
                # BUGFIX: join with the walked folder 'fold' (as the CSV branch
                # does), not the root 'citations_dir', so that ZIP archives
                # located in subdirectories are opened correctly.
                with ZipFile(os.path.join(fold, file), 'r') as archive:
                    # Each member of the archive counts as one input file
                    for name in archive.namelist():
                        cur_file = file_counter + len_ids_found
                        with archive.open(name) as infile:
                            data = csv.DictReader(io.TextIOWrapper(infile, 'utf-8'))
                            process_data(data, ids_found, output_csv)
                            if file_counter % threshold == 0:
                                write_csv(path=os.path.join(output_dir, f'{cur_file-threshold+1}-{cur_file}.csv'), datalist=output_csv)
                                output_csv = list()
                            if verbose:
                                pbar.update()
                            file_counter += 1
    if output_csv:
        # Flush the final, partially filled batch
        write_csv(path=os.path.join(output_dir, f'{cur_file + 1 - (cur_file % threshold)}-{cur_file}.csv'), datalist=output_csv)
    if verbose:
        pbar.close()
        print(f'[INFO: get_ids_from_citations] {len(ids_found)} IDs stored')

75 

def process_data(data:csv.DictReader, ids_found:set, output_csv:list) -> None:
    '''
    Collect the not-yet-seen citing and cited identifiers of each citation
    row into ``output_csv`` (as ``{'id': ...}`` dicts), registering every
    emitted identifier in ``ids_found`` so it is never appended twice.
    '''
    for record in data:
        # For each citation, the citing id is considered before the cited one
        for identifier in (record['citing'], record['cited']):
            if identifier not in ids_found:
                output_csv.append({'id': identifier})
                ids_found.add(identifier)

86 

def get_files_count(citations_dir:str) -> int:
    '''
    Count the input files under ``citations_dir``: every CSV file counts as
    one, and every ZIP archive contributes the number of its members (each
    member is processed as a separate input). Other files are ignored.

    :params citations_dir: the directory containing the citations files
    :type citations_dir: str
    :returns: int -- the number of inputs, 0 when none are found
    '''
    # Single walk instead of the previous multiple any()/sum() passes.
    # BUGFIXES vs the original: file_count is initialized (no UnboundLocalError
    # when the directory holds neither CSVs nor ZIPs); ZIP paths are joined
    # with the walked folder 'fold' so archives in subdirectories are found;
    # only .zip files are opened as archives (non-zips no longer raise
    # BadZipFile), and mixed CSV+ZIP directories are counted correctly.
    file_count = 0
    for fold, _, files in os.walk(citations_dir):
        for file in files:
            if file.endswith('.csv'):
                file_count += 1
            elif file.endswith('.zip'):
                with ZipFile(os.path.join(fold, file), 'r') as archive:
                    file_count += len(archive.namelist())
    return file_count