Coverage for test/duplicated_ids_from_files_test.py: 99% (111 statements)

coverage.py v6.5.0, created at 2025-12-20 08:55 +0000

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

import csv
import json
import os
import shutil
import tempfile
import unittest
import zipfile
from collections import defaultdict

from oc_meta.run.find.duplicated_ids_from_files import (
    load_and_merge_temp_csv, process_chunk, process_zip_file,
    read_and_analyze_zip_files, save_chunk_to_temp_csv, save_duplicates_to_csv)

class TestDuplicatedIdsFromFiles(unittest.TestCase):
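    """Tests for the duplicate-identifier finder in oc_meta.run.find.duplicated_ids_from_files."""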

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        self.id_dir = os.path.join(self.test_dir, 'id')
        os.makedirs(self.id_dir)
        self.temp_dir = tempfile.mkdtemp()

        self.test_rdf_with_duplicates = self._create_test_rdf_data()
        self.test_zip_paths = self._create_test_zip_files()

    def tearDown(self):
        shutil.rmtree(self.test_dir, ignore_errors=True)
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _create_test_rdf_data(self):
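        """Build JSON-LD fixtures in the OC Meta identifier layout.

        IDs 1 and 2 share a DOI, IDs 4 and 5 share an ORCID, and IDs 6 and 7
        share the same DOI with and without an explicit xsd:string datatype.
        """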

        rdf_data = [
            {
                "@graph": [
                    {
                        "@id": "https://w3id.org/oc/meta/id/1",
                        "http://purl.org/spar/datacite/usesIdentifierScheme": {
                            "@id": "http://purl.org/spar/datacite/doi"
                        },
                        "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": {
                            "@value": "10.1234/test1"
                        }
                    }
                ]
            },
            {
                "@graph": [
                    {
                        "@id": "https://w3id.org/oc/meta/id/2",
                        "http://purl.org/spar/datacite/usesIdentifierScheme": {
                            "@id": "http://purl.org/spar/datacite/doi"
                        },
                        "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": {
                            "@value": "10.1234/test1"
                        }
                    }
                ]
            },
            {
                "@graph": [
                    {
                        "@id": "https://w3id.org/oc/meta/id/3",
                        "http://purl.org/spar/datacite/usesIdentifierScheme": {
                            "@id": "http://purl.org/spar/datacite/doi"
                        },
                        "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": {
                            "@value": "10.1234/test2"
                        }
                    }
                ]
            },
            {
                "@graph": [
                    {
                        "@id": "https://w3id.org/oc/meta/id/4",
                        "http://purl.org/spar/datacite/usesIdentifierScheme": {
                            "@id": "http://purl.org/spar/datacite/orcid"
                        },
                        "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": {
                            "@value": "0000-0001-2345-6789"
                        }
                    }
                ]
            },
            {
                "@graph": [
                    {
                        "@id": "https://w3id.org/oc/meta/id/5",
                        "http://purl.org/spar/datacite/usesIdentifierScheme": {
                            "@id": "http://purl.org/spar/datacite/orcid"
                        },
                        "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": {
                            "@value": "0000-0001-2345-6789"
                        }
                    }
                ]
            },
            {
                "@graph": [
                    {
                        "@id": "https://w3id.org/oc/meta/id/6",
                        "http://purl.org/spar/datacite/usesIdentifierScheme": {
                            "@id": "http://purl.org/spar/datacite/doi"
                        },
                        "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": {
                            "@value": "10.1234/test3",
                            "@type": "http://www.w3.org/2001/XMLSchema#string"
                        }
                    }
                ]
            },
            {
                "@graph": [
                    {
                        "@id": "https://w3id.org/oc/meta/id/7",
                        "http://purl.org/spar/datacite/usesIdentifierScheme": {
                            "@id": "http://purl.org/spar/datacite/doi"
                        },
                        "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": {
                            "@value": "10.1234/test3"
                        }
                    }
                ]
            }
        ]
        return rdf_data

    def _create_test_zip_files(self):
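        """Pack the fixture records two per ZIP archive under the id/ directory."""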

        zip_paths = []
        for i in range(4):
            zip_path = os.path.join(self.id_dir, f'test_{i}.zip')
            with zipfile.ZipFile(zip_path, 'w') as zf:
                start_idx = i * 2
                end_idx = min(start_idx + 2, len(self.test_rdf_with_duplicates))
                for j in range(start_idx, end_idx):
                    rdf_content = json.dumps(self.test_rdf_with_duplicates[j], indent=2)
                    zf.writestr(f'rdf_{j}.json', rdf_content)
            zip_paths.append(zip_path)
        return zip_paths

    def test_process_zip_file(self):
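        """process_zip_file should map (scheme, literal value) pairs to sets of entity URIs."""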

        result = process_zip_file(self.test_zip_paths[0])

        self.assertIsInstance(result, dict)
        self.assertGreater(len(result), 0)

        doi_key = ("http://purl.org/spar/datacite/doi", "10.1234/test1")
        self.assertIn(doi_key, result)
        self.assertIn("https://w3id.org/oc/meta/id/1", result[doi_key])

    def test_save_and_load_chunk_csv(self):
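        """Entity info should round-trip through save_chunk_to_temp_csv and load_and_merge_temp_csv."""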

        entity_info = defaultdict(set)
        entity_info[("http://purl.org/spar/datacite/doi", "10.1234/test1")] = {
            "https://w3id.org/oc/meta/id/1",
            "https://w3id.org/oc/meta/id/2"
        }
        entity_info[("http://purl.org/spar/datacite/doi", "10.1234/test2")] = {
            "https://w3id.org/oc/meta/id/3"
        }

        temp_file = os.path.join(self.temp_dir, 'test_chunk.csv')
        save_chunk_to_temp_csv(entity_info, temp_file)

        self.assertTrue(os.path.exists(temp_file))

        loaded_info = defaultdict(set)
        load_and_merge_temp_csv(temp_file, loaded_info)

        self.assertEqual(len(loaded_info), 2)
        doi_key = ("http://purl.org/spar/datacite/doi", "10.1234/test1")
        self.assertIn(doi_key, loaded_info)
        self.assertEqual(len(loaded_info[doi_key]), 2)

    def test_process_chunk(self):
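        """process_chunk should write a non-empty temp CSV for a subset of ZIP files."""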

        chunk_files = self.test_zip_paths[:2]
        temp_file = process_chunk(chunk_files, self.temp_dir, 0)

        self.assertTrue(os.path.exists(temp_file))

        with open(temp_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            self.assertGreater(len(rows), 0)

    def test_save_duplicates_to_csv(self):
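        """Only multi-entity keys are reported: one surviving entity, the others as merged_entities."""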

        entity_info = defaultdict(set)
        entity_info[("http://purl.org/spar/datacite/doi", "10.1234/test1")] = {
            "https://w3id.org/oc/meta/id/1",
            "https://w3id.org/oc/meta/id/2",
            "https://w3id.org/oc/meta/id/3"
        }
        entity_info[("http://purl.org/spar/datacite/doi", "10.1234/test2")] = {
            "https://w3id.org/oc/meta/id/4"
        }

        output_file = os.path.join(self.temp_dir, 'duplicates.csv')
        save_duplicates_to_csv(entity_info, output_file)

        self.assertTrue(os.path.exists(output_file))

        with open(output_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        self.assertEqual(len(rows), 1)

        self.assertIn(rows[0]['surviving_entity'], {
            "https://w3id.org/oc/meta/id/1",
            "https://w3id.org/oc/meta/id/2",
            "https://w3id.org/oc/meta/id/3"
        })

        merged_entities = rows[0]['merged_entities'].split('; ')
        self.assertEqual(len(merged_entities), 2)

    def test_read_and_analyze_zip_files(self):
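        """End-to-end run over the fixture directory should produce a duplicates CSV."""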

        output_csv = os.path.join(self.temp_dir, 'output.csv')

        read_and_analyze_zip_files(self.test_dir, output_csv, chunk_size=2)

        self.assertTrue(os.path.exists(output_csv))

        with open(output_csv, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        self.assertGreater(len(rows), 0)

    def test_chunking_behavior(self):
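        """chunk_size=1 forces one temp file per ZIP; merged output keeps the expected columns."""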

        output_csv = os.path.join(self.temp_dir, 'output_chunked.csv')

        read_and_analyze_zip_files(self.test_dir, output_csv, chunk_size=1)

        self.assertTrue(os.path.exists(output_csv))

        with open(output_csv, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        for row in rows:
            self.assertIn('surviving_entity', row)
            self.assertIn('merged_entities', row)

    def test_datatype_normalization(self):
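        """Values differing only by an explicit xsd:string datatype (IDs 6 and 7) should merge."""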

        output_csv = os.path.join(self.temp_dir, 'output_datatype.csv')

        read_and_analyze_zip_files(self.test_dir, output_csv, chunk_size=2)

        self.assertTrue(os.path.exists(output_csv))

        with open(output_csv, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        id6_and_id7_merged = False
        for row in rows:
            all_entities = {row['surviving_entity']} | set(row['merged_entities'].split('; '))

            if ('https://w3id.org/oc/meta/id/6' in all_entities and
                    'https://w3id.org/oc/meta/id/7' in all_entities):
                id6_and_id7_merged = True
                break

        self.assertTrue(
            id6_and_id7_merged,
            "ID 6 (with xsd:string datatype) and ID 7 (without datatype) should be merged as duplicates"
        )

if __name__ == "__main__":
    unittest.main()