Coverage for oc_ds_converter / preprocessing / datacite.py: 95%
100 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
2# SPDX-FileCopyrightText: 2023-2026 Marta Soricetti <marta.soricetti@unibo.it>
3#
4# SPDX-License-Identifier: ISC
6import glob
7import gzip
8import json
9import os
10import os.path
11import tarfile
12from os import listdir, makedirs
13from os.path import exists, join, split
15from oc_ds_converter.preprocessing.base import Preprocessing
class DatacitePreProcessing(Preprocessing):
    """Pre-process the DataCite Public Data File.

    The Data File is supplied as a TAR archive, containing JSONLines formatted
    metadata records and CSVs with some supplemental information for easier
    filtering. The folders within the Data File are used to group each record
    by the month it was last updated, following the convention
    `updated_YYYY-MM`. Inside each folder, individual files are compressed with
    GZIP, to allow for targeted and/or parallel extraction. Each
    `part_XXXX.jsonl` file contains up to 10,000 metadata records, one per
    line, with each line being a valid self-contained JSON document.

    The class splits the original newline-delimited JSON into many JSON files,
    each one containing the number of entities specified in input by the user.
    Further, the class discards those entities that are not involved in
    citations.
    """

    def __init__(self, input_tar, output_dir, interval, state_file=None, filter=None):
        """Store the configuration and make sure the output directory exists.

        Args:
            input_tar: path of the TAR archive to read.
            output_dir: directory where the JSON chunks are written; created
                if it does not exist.
            interval: number of retained entities per output file.
            state_file: path of the checkpoint file. Defaults to
                `processing_state.json` inside `output_dir`.
            filter: relation types that mark an entity as involved in a
                citation. Defaults to references / isreferencedby / cites /
                iscitedby. Matching is case-insensitive.
        """
        self._req_type = ".json"
        self._input_tar = input_tar
        self._output_dir = output_dir
        self._needed_info = ["relationType", "relatedIdentifierType", "relatedIdentifier"]
        # exist_ok avoids the check-then-create race and matches splitted_to_file
        makedirs(self._output_dir, exist_ok=True)
        self._interval = interval
        if filter:
            # Relation types are lower-cased before being compared against the
            # filter, so the filter entries must be lower-cased as well;
            # otherwise a value such as "IsCitedBy" would never match.
            relation_types = [filter] if isinstance(filter, str) else filter
            self._filter = [str(item).lower() for item in relation_types]
        else:
            self._filter = ["references", "isreferencedby", "cites", "iscitedby"]
        # Checkpoint file path
        if state_file:
            self._state_file = state_file
        else:
            self._state_file = join(self._output_dir, "processing_state.json")
        super(DatacitePreProcessing, self).__init__()

    def load_checkpoint(self):
        """Load the last processing state if available.

        Returns:
            A tuple `(processed_files, count)`: the set of archive member names
            already recorded as processed and the saved global entity count.
            `(set(), 0)` when no checkpoint file exists.
        """
        if not exists(self._state_file):
            return set(), 0
        with open(self._state_file, 'r', encoding='utf-8') as f:
            state = json.load(f)
        print(f"Resuming from count {state['count']} and {len(state['processed_files'])} files.")
        return set(state['processed_files']), state['count']

    def save_checkpoint(self, processed_files, count):
        """Save the current list of processed files and the global count.

        The state is written to a temporary sibling file and then moved over
        the checkpoint, so an interruption during the write cannot leave a
        truncated checkpoint behind.
        """
        tmp_path = f"{self._state_file}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump({
                # sorted: sets have no stable order, keep the file deterministic
                "processed_files": sorted(processed_files),
                "count": count
            }, f)
        os.replace(tmp_path, self._state_file)

    def _cites_other_doi(self, record):
        """Return True if *record* is a "dois" entity citing at least one other DOI.

        A record is kept when it has an `id`, its `type` is "dois", and one of
        its `attributes.relatedIdentifiers` has a DOI identifier type, a
        relation type listed in the filter, and an identifier different from
        the record's own id. Records with a missing or malformed structure are
        rejected instead of raising.
        """
        if not isinstance(record, dict):
            return False
        # filter for entities without dois
        if 'id' not in record or record.get('type') != "dois":
            return False
        attributes = record.get("attributes")
        if not isinstance(attributes, dict):
            return False
        rel_ids = attributes.get("relatedIdentifiers")
        if not rel_ids or not isinstance(rel_ids, list):
            return False
        for ref in rel_ids:
            if not isinstance(ref, dict):
                continue
            if not all(elem in ref for elem in self._needed_info):
                continue
            related_identifier_type = str(ref["relatedIdentifierType"]).lower()
            relation_type = str(ref["relationType"]).lower()
            related_identifier = str(ref["relatedIdentifier"])
            # ignore also entities with self citations
            # NOTE(review): this comparison is case-sensitive; confirm whether
            # both identifiers should be normalised before comparing.
            if (related_identifier_type == "doi"
                    and relation_type in self._filter
                    and related_identifier != record['id']):
                return True
        return False

    def split_input(self):
        """Read the archive and write the retained entities in JSON chunks.

        Every `.jsonl.gz` member of the TAR archive is streamed line by line;
        entities accepted by the citation filter are buffered and written to
        the output directory each time the global count reaches a multiple of
        the interval. Whatever is left in the buffer at the end is written to
        a final file.

        Checkpointing: a member is recorded as processed only when all the
        entities read from it have been written. The member being read when a
        chunk is written stays pending, so after an interruption it is read
        again from its first line and entities of that member that were
        already written can be written a second time.
        """
        # initialize state
        processed_files_set, global_count = self.load_checkpoint()

        # Files that have been read but their data is still in the 'data' buffer (not yet written)
        pending_files = []
        data = []

        with tarfile.open(self._input_tar, 'r') as tar:
            # 1. find jsonl gz archives
            jsonl_gz_files = [member for member in tar.getmembers()
                              if member.name.endswith('.jsonl.gz') and member.isfile()]

            # 2. Iterate through each member
            for member in jsonl_gz_files:
                # skip already fully processed files
                if member.name in processed_files_set:
                    continue

                print(f"Processing: {member.name}")
                pending_files.append(member.name)

                # 3. Extract the file object from the tar archive
                f_obj = tar.extractfile(member)
                if f_obj is None:
                    continue

                # 4. Read the extracted file object as a gzip stream.
                # f_obj is listed in the with statement because gzip does not
                # close a file object that was handed to it.
                with f_obj, gzip.open(f_obj, mode='rt', encoding='utf-8') as f:
                    for line in f:
                        # Only the parsing can raise JSONDecodeError: keep the
                        # try narrow so writing/checkpoint errors are not hidden.
                        try:
                            linedict = json.loads(line)
                        except json.JSONDecodeError:
                            continue

                        # filter for entities not involved in citations
                        if not self._cites_other_doi(linedict):
                            continue

                        data.append(linedict)
                        global_count += 1
                        new_data = self.splitted_to_file(global_count, self._interval,
                                                         self._output_dir, data)
                        # An empty return value means the buffer was written
                        if not new_data and data:
                            # Everything but the current member is fully on disk
                            safe_to_commit = pending_files[:-1]
                            if safe_to_commit:
                                processed_files_set.update(safe_to_commit)
                                self.save_checkpoint(processed_files_set, global_count)
                            pending_files = [member.name]
                        data = new_data

        if data:
            print(f"Flushing final {len(data)} entities (count {global_count}).")
            # interval=1 forces write; the file is therefore named after the
            # global count instead of the chunk index
            self.splitted_to_file(global_count, 1, self._output_dir, data)

        if pending_files:
            processed_files_set.update(pending_files)
            self.save_checkpoint(processed_files_set, global_count)
        print(f"Completed all files. Total entities: {global_count}")

    def splitted_to_file(self, cur_n, target_n, out_dir, data, headers=None):
        """Write *data* to a JSON file when *cur_n* is a non-zero multiple of *target_n*.

        Args:
            cur_n: number of entities retained so far.
            target_n: number of entities per output file.
            out_dir: output directory, created if missing.
            data: buffered entities, stored under the "data" key of the file.
            headers: not used by this implementation.

        Returns:
            An empty list when the file has been written, otherwise *data*
            unchanged.
        """
        makedirs(out_dir, exist_ok=True)
        # Convert once so the interval check and the file name use the same values
        cur_n = int(cur_n)
        # check if the interval is reached
        if cur_n == 0 or not data:
            return data
        target_n = int(target_n)
        if cur_n % target_n != 0:
            return data

        filename = "jSonFile_" + str(cur_n // target_n) + self._req_type
        file_path = os.path.join(out_dir, filename)

        print(f"Writing {filename}")
        with open(file_path, 'w', encoding="utf8") as json_file:
            json.dump({"data": data}, json_file, ensure_ascii=False, indent=2)

        return []