Coverage for oc_ds_converter / preprocessing / datacite.py: 95%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-25 18:06 +0000

1# SPDX-FileCopyrightText: 2023 Arcangelo Massari <arcangelo.massari@unibo.it> 

2# SPDX-FileCopyrightText: 2023-2026 Marta Soricetti <marta.soricetti@unibo.it> 

3# 

4# SPDX-License-Identifier: ISC 

5 

6import glob 

7import gzip 

8import json 

9import os 

10import os.path 

11import tarfile 

12from os import listdir, makedirs 

13from os.path import exists, join, split 

14 

15from oc_ds_converter.preprocessing.base import Preprocessing 

16 

17 

class DatacitePreProcessing(Preprocessing):
    """This class aims at pre-processing the DataCite Public Data File.

    The Data File is supplied as a TAR archive, containing JSONLines formatted
    metadata records and CSVs with some supplemental information for easier
    filtering. The folders within the Data File are used to group each record
    by the month it was last updated, following the convention `updated_YYYY-MM`.
    Inside each folder, individual files are compressed with GZIP, to allow for
    targeted and/or parallel extraction. Each `part_XXXX.jsonl` file contains up
    to 10,000 metadata records, one per line, with each line being a valid
    self-contained JSON document.

    The class splits the original nldJSON in many JSON files, each one
    containing the number of entities specified in input by the user.
    Further, the class discards those entities that are not involved in
    citations.
    """

    def __init__(self, input_tar, output_dir, interval, state_file=None, filter=None):
        """
        :param input_tar: path of the DataCite Public Data File TAR archive.
        :param output_dir: directory where the split JSON files are written
            (created if missing).
        :param interval: number of entities stored in each output JSON file.
        :param state_file: optional checkpoint-file path; defaults to
            `<output_dir>/processing_state.json`.
        :param filter: optional list of lowercase relation types to keep.
            Defaults to the four citation-related DataCite relation types.
            NOTE: the name shadows the builtin `filter`, but it is kept for
            backward compatibility with existing callers.
        """
        self._req_type = ".json"  # extension of the output chunk files
        self._input_tar = input_tar
        self._output_dir = output_dir
        # Keys a relatedIdentifier entry must provide to be usable as a citation link.
        self._needed_info = ["relationType", "relatedIdentifierType", "relatedIdentifier"]
        # exist_ok avoids a check-then-act race and matches splitted_to_file below.
        makedirs(self._output_dir, exist_ok=True)
        self._interval = interval
        if filter:
            self._filter = filter
        else:
            self._filter = ["references", "isreferencedby", "cites", "iscitedby"]
        # Checkpoint file path (enables resuming an interrupted run).
        if state_file:
            self._state_file = state_file
        else:
            self._state_file = join(self._output_dir, "processing_state.json")
        super().__init__()

    def load_checkpoint(self):
        """Loads the last processing state if available.

        :return: a tuple ``(processed_files, count)`` where `processed_files`
            is a set of archive member names already fully processed and
            `count` is the global number of entities emitted so far.
            Returns ``(set(), 0)`` when no checkpoint file exists.
        """
        if exists(self._state_file):
            with open(self._state_file, 'r') as f:
                state = json.load(f)
            print(f"Resuming from count {state['count']} and {len(state['processed_files'])} files.")
            return set(state['processed_files']), state['count']
        return set(), 0

    def save_checkpoint(self, processed_files, count):
        """Saves the current list of processed files and global count.

        :param processed_files: iterable of archive member names fully processed.
        :param count: global number of entities emitted so far.
        """
        with open(self._state_file, 'w') as f:
            json.dump({
                "processed_files": list(processed_files),
                "count": count
            }, f)

    def split_input(self):
        """Reads the input TAR archive and splits the matching records into
        JSON files of `self._interval` entities each, in a resumable way.

        Only records of type "dois" having at least one related identifier of
        type DOI whose relation type is in `self._filter` (excluding
        self-citations) are kept. Progress is checkpointed so that a crashed
        run can resume without re-emitting already-written files.
        """
        # Initialize state from the checkpoint (if any).
        processed_files_set, global_count = self.load_checkpoint()

        # Files that have been read but whose data is still in the 'data'
        # buffer (not yet written to disk), hence not yet safe to commit.
        pending_files = []

        with tarfile.open(self._input_tar, 'r') as tar:
            data = []

            # 1. Find the jsonl gz archives inside the TAR.
            jsonl_gz_files = [member for member in tar.getmembers()
                              if member.name.endswith('.jsonl.gz') and member.isfile()]

            # 2. Iterate through each member.
            for member in jsonl_gz_files:

                # Skip already fully processed files.
                if member.name in processed_files_set:
                    continue

                print(f"Processing: {member.name}")
                pending_files.append(member.name)

                # 3. Extract the file object from the tar archive.
                f_obj = tar.extractfile(member)

                if f_obj is not None:
                    # 4. Read the extracted file object as a gzip stream.
                    with gzip.open(f_obj, mode='rt', encoding='utf-8') as f:
                        for line in f:
                            try:
                                linedict = json.loads(line)

                                # Filter out entities without DOIs.
                                if 'id' not in linedict or 'type' not in linedict:
                                    continue
                                if linedict['type'] != "dois":
                                    continue

                                # Filter out entities not involved in citations.
                                attributes = linedict["attributes"]
                                rel_ids = attributes.get("relatedIdentifiers")

                                if rel_ids:
                                    match_found = False
                                    for ref in rel_ids:
                                        if all(elem in ref for elem in self._needed_info):
                                            relatedIdentifierType = (str(ref["relatedIdentifierType"])).lower()
                                            relationType = str(ref["relationType"]).lower()
                                            relatedIdentifier = str(ref["relatedIdentifier"])
                                            # Ignore also entities with self citations.
                                            if relatedIdentifierType == "doi" and relationType in self._filter and relatedIdentifier != linedict['id']:
                                                match_found = True
                                                break

                                    if match_found:
                                        data.append(linedict)
                                        global_count += 1
                                        new_data = self.splitted_to_file(global_count, self._interval, self._output_dir,
                                                                         data)
                                        # An empty return with a non-empty buffer means a
                                        # chunk was just flushed to disk: every pending
                                        # file except the current one is now durable.
                                        if len(new_data) == 0 and len(data) > 0:
                                            safe_to_commit = pending_files[:-1]
                                            if safe_to_commit:
                                                processed_files_set.update(safe_to_commit)
                                                self.save_checkpoint(processed_files_set, global_count)
                                            pending_files = [member.name]
                                        data = new_data
                            except json.JSONDecodeError:
                                # Skip malformed lines rather than aborting the whole run.
                                continue
            if data:
                print(f"Flushing final {len(data)} entities (count {global_count}).")
                self.splitted_to_file(global_count, 1, self._output_dir, data)  # interval=1 forces write

            if pending_files:
                processed_files_set.update(pending_files)
                self.save_checkpoint(processed_files_set, global_count)
            print(f"Completed all files. Total entities: {global_count}")

    def splitted_to_file(self, cur_n, target_n, out_dir, data, headers=None):
        """Writes the buffered entities to a numbered JSON file when the
        interval is reached, and returns the buffer the caller must keep.

        :param cur_n: global number of entities processed so far.
        :param target_n: interval size; a file is written whenever `cur_n` is
            a positive multiple of it (and the buffer is non-empty).
        :param out_dir: output directory (created if missing).
        :param data: buffered list of entity dicts.
        :param headers: unused here; kept for signature compatibility.
        :return: an empty list if a file was written, otherwise `data` unchanged.
        """
        makedirs(out_dir, exist_ok=True)
        dict_to_json = dict()
        # Check if the interval is reached.
        if int(cur_n) != 0 and int(cur_n) % int(target_n) == 0 and data:

            filename = "jSonFile_" + str(cur_n // target_n) + self._req_type
            file_path = os.path.join(out_dir, filename)

            # Fix: interpolate the actual output path (the f-string previously
            # contained no placeholder).
            print(f"Writing {file_path}")
            with open(file_path, 'w', encoding="utf8") as json_file:
                dict_to_json["data"] = data
                json.dump(dict_to_json, json_file, ensure_ascii=False, indent=2)

            return []
        else:
            return data

165 

166