Coverage for oc_meta/plugins/analyser.py: 88%

237 statements  

coverage.py v6.5.0, created at 2025-07-14 14:06 +0000

#!python
# Copyright 2022, Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import os
import re
from datetime import datetime
from functools import cmp_to_key
from typing import Dict, List

from dateutil.parser import parse
from tqdm import tqdm

from oc_meta.lib.file_manager import get_csv_data, write_csv
from oc_meta.lib.master_of_regex import name_and_ids


class OCMetaAnalyser:
    def __init__(self, csv_dump_path: str):
        self.csv_dump_path = csv_dump_path

    def merge_rows_by_id(self, output_dir: str) -> None:
        # Index every row of every dump file by its OMID
        ids_by_csv = dict()
        for filename in os.listdir(self.csv_dump_path):
            csv_data = get_csv_data(os.path.join(self.csv_dump_path, filename))
            for i, row in enumerate(csv_data):
                metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                ids_by_csv.setdefault(metaid, dict())
                ids_by_csv[metaid].setdefault(filename, set())
                ids_by_csv[metaid][filename].add(i)
        # For OMIDs found in more than one file, mark every occurrence except the
        # one in the most recent file for removal
        storer = dict()
        for metaid, filenames in ids_by_csv.items():
            if len(filenames) > 1:
                sorted_filenames = sorted([name for name in filenames], key=cmp_to_key(self.sort_csv_filenames))
                to_be_overwritten = sorted_filenames[:-1]
                latest_file = sorted_filenames[-1]
                for filename in to_be_overwritten:
                    storer.setdefault(filename, set())
                    storer[filename].update(ids_by_csv[metaid][filename])
                storer.setdefault(latest_file, set())
            else:
                storer.setdefault(list(filenames.keys())[0], set())
        # Rewrite each file into output_dir, dropping the superseded rows
        for filename, rows in storer.items():
            old_data = get_csv_data(os.path.join(self.csv_dump_path, filename))
            new_data = [row for i, row in enumerate(old_data) if i not in rows]
            write_csv(
                path=os.path.join(output_dir, filename),
                datalist=new_data,
                fieldnames=['id', 'title', 'pub_date', 'page', 'type', 'author', 'editor', 'publisher', 'volume', 'venue', 'issue'],
                method='w')
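    # A minimal usage sketch (the paths below are hypothetical, not part of this
    # module):
    #
    #     analyser = OCMetaAnalyser(csv_dump_path='csv_dump')
    #     analyser.merge_rows_by_id(output_dir='csv_dump_deduplicated')
    #
    # Every file involved in a duplication is rewritten into output_dir; only the
    # occurrence in the most recent file (per sort_csv_filenames) survives.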

    @staticmethod
    def sort_csv_filenames(file_1, file_2) -> int:
        # Filenames follow the pattern '<counter>_<YYYY-MM-DDTHH-MM-SS>.csv':
        # compare by timestamp first, then by the leading counter on ties
        file_1_date = datetime.strptime(file_1.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
        file_2_date = datetime.strptime(file_2.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
        if file_1_date > file_2_date:
            return 1
        elif file_1_date < file_2_date:
            return -1
        elif int(file_1.split('_')[0]) > int(file_2.split('_')[0]):
            return 1
        else:
            return -1
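    # For example (made-up filenames), sorting with cmp_to_key:
    #
    #     >>> sorted(['2_2023-01-05T10-00-00.csv', '1_2023-01-05T10-00-00.csv',
    #     ...         '1_2022-12-01T09-30-00.csv'],
    #     ...        key=cmp_to_key(OCMetaAnalyser.sort_csv_filenames))
    #     ['1_2022-12-01T09-30-00.csv', '1_2023-01-05T10-00-00.csv', '2_2023-01-05T10-00-00.csv']
    #
    # The timestamp decides first; the leading counter breaks ties.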

    def explore_csv_dump(self, analyser: callable) -> None | str | dict:
        # Run `analyser` on every dump file and merge the per-file results.
        # Integers are summed, sets are unioned, dicts are merged key by key;
        # int and set totals are returned as strings, dicts as-is.
        global_output = None
        filenames = sorted(os.listdir(self.csv_dump_path))
        pbar = tqdm(total=len(filenames))
        for i, filename in enumerate(filenames):
            csv_data = get_csv_data(os.path.join(self.csv_dump_path, filename))
            local_output = analyser(csv_data)
            if i == 0:
                # Initialise the accumulator with the same type as the first result
                if isinstance(local_output, int):
                    global_output = 0
                elif isinstance(local_output, dict):
                    global_output = dict()
                elif isinstance(local_output, set):
                    global_output = set()
            if isinstance(local_output, int):
                global_output += local_output
            elif isinstance(local_output, dict):
                for k, v in local_output.items():
                    if k in global_output:
                        for i_k, _ in v.items():
                            if i_k in global_output[k]:
                                if isinstance(global_output[k][i_k], set):
                                    global_output[k][i_k].update(local_output[k][i_k])
                            else:
                                global_output[k][i_k] = local_output[k][i_k]
                    else:
                        global_output[k] = local_output[k]
            elif isinstance(local_output, set):
                global_output.update(local_output)
            pbar.update()
        pbar.close()
        if isinstance(global_output, int):
            return str(global_output)
        elif isinstance(global_output, dict):
            return global_output
        elif isinstance(global_output, set):
            return str(len(global_output))
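    # Sketch of the dict-merge semantics, with invented per-file results from an
    # analyser such as count_venues_by_publication:
    #
    #     file 1 -> {'omid:br/1': {'name': 'A Journal', 'publication': {'omid:br/2'}}}
    #     file 2 -> {'omid:br/1': {'name': 'A Journal', 'publication': {'omid:br/3'}}}
    #     merged -> {'omid:br/1': {'name': 'A Journal', 'publication': {'omid:br/2', 'omid:br/3'}}}
    #
    # Inner sets are unioned; non-set values (like 'name') keep their first-seen value.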

class OCMetaCounter(OCMetaAnalyser):
    def __init__(self, csv_dump_path: str):
        super().__init__(csv_dump_path)

    def get_top(self, what: str, by_what: str, number: int | None = None) -> list:
        # Dispatch to the relevant counter, e.g. count_publishers_by_venue
        counter_func = getattr(self, f'count_{what}_by_{by_what}')
        all_data = self.explore_csv_dump(counter_func)
        all_data_sorted: list = sorted(all_data, key=lambda k: len(all_data[k][by_what]), reverse=True)
        top_n = all_data_sorted[:number] if number is not None else all_data_sorted
        all_top_n = [(k, v) for k, v in all_data.items() if k in top_n]
        for tuple_k_v in all_top_n:
            tuple_k_v[1]['total'] = len(tuple_k_v[1][by_what])
        # Drop the raw sets, keeping only names and totals in the output
        all_top_n = [(meta, {k: v for k, v in data.items() if not isinstance(v, set)}) for meta, data in all_top_n]
        return sorted(all_top_n, key=lambda x: top_n.index(x[0]))
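    # Sketch of a call and its output shape (all values invented for illustration):
    #
    #     counter = OCMetaCounter('csv_dump')
    #     counter.get_top('publishers', 'publication', number=2)
    #     # -> [('elsevier bv', {'name': 'Elsevier BV', 'total': 1200}),
    #     #     ('springer', {'name': 'Springer', 'total': 900})]
    #
    # Keys are lowercased names; the raw 'publication' sets are replaced by their
    # size under 'total'.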

    def count(self, what: str) -> str:
        # The count_* analysers return ints or sets, which explore_csv_dump
        # turns into a stringified total
        counter_func = getattr(self, f'count_{what}')
        return self.explore_csv_dump(counter_func)

    def count_authors(self, csv_data: List[dict]) -> int:
        # Authors are '; '-separated within a single cell
        count = 0
        for row in csv_data:
            count += len(list(filter(None, row['author'].split('; '))))
        return count

    def count_editors(self, csv_data: List[dict]) -> int:
        count = 0
        for row in csv_data:
            count += len(list(filter(None, row['editor'].split('; '))))
        return count

    def count_publishers(self, csv_data: List[dict]) -> set:
        publishers = set()
        for row in csv_data:
            if row['publisher']:
                pub_name_and_ids = re.search(name_and_ids, row['publisher'])
                if pub_name_and_ids:
                    pub_name = pub_name_and_ids.group(1).lower()
                    publishers.add(pub_name)
        return publishers

    def count_venues(self, csv_data: List[dict]) -> set:
        venues = set()
        for row in csv_data:
            if row['venue']:
                ven_name_and_ids = re.search(name_and_ids, row['venue'])
                if ven_name_and_ids:
                    venue_name = ven_name_and_ids.group(1).lower()
                    venue_ids = set(ven_name_and_ids.group(2).split())
                    venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                    # Venues carrying only an OMID are counted by name,
                    # otherwise by their OMID
                    if not venue_ids.difference({venue_metaid}):
                        venues.add(venue_name)
                    else:
                        venues.add(venue_metaid)
        return venues
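    # For instance (made-up values): a venue cell like
    # 'A Journal [omid:br/0601 issn:1234-5678]' is counted by its OMID, while
    # 'A Journal [omid:br/0602]' with no external identifier is counted by its
    # lowercased name, so same-named venues without external ids collapse into one.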

    def count_publishers_by_venue(self, csv_data: List[dict]) -> Dict[str, Dict[str, set | str]]:
        publishers_by_venue = dict()
        for row in csv_data:
            publisher_name_and_ids = re.search(name_and_ids, row['publisher'])
            venue_name_and_ids = re.search(name_and_ids, row['venue'])
            if publisher_name_and_ids and venue_name_and_ids:
                publisher_name = publisher_name_and_ids.group(1).lower()
                venue_name: str = venue_name_and_ids.group(1).lower()
                venue_ids = set(venue_name_and_ids.group(2).split())
                venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                publishers_by_venue.setdefault(publisher_name, {'name': publisher_name, 'venue': set()})
                venue_key = venue_name if not venue_ids.difference({venue_metaid}) else venue_metaid
                publishers_by_venue[publisher_name]['venue'].add(venue_key)
        return publishers_by_venue

    def count_publishers_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set | str]]:
        publishers_by_publication = dict()
        for row in csv_data:
            publishers_name_and_ids = re.search(name_and_ids, row['publisher'])
            if publishers_name_and_ids:
                publishers_name = publishers_name_and_ids.group(1)
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                publishers_by_publication.setdefault(publishers_name.lower(), {'name': publishers_name, 'publication': set()})
                publishers_by_publication[publishers_name.lower()]['publication'].add(row_metaid)
        return publishers_by_publication

    def count_venues_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set | str]]:
        venues_by_publication = dict()
        for row in csv_data:
            venue_name_and_ids = re.search(name_and_ids, row['venue'])
            if venue_name_and_ids:
                venue_name = venue_name_and_ids.group(1)
                venue_ids = set(venue_name_and_ids.group(2).split())
                venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                venue_key = venue_name.lower() if not venue_ids.difference({venue_metaid}) else venue_metaid
                venues_by_publication.setdefault(venue_key, {'name': venue_name, 'publication': set()})
                venues_by_publication[venue_key]['publication'].add(row_metaid)
        return venues_by_publication

    def count_years_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set]]:
        years_by_publication = dict()
        for row in csv_data:
            pub_date = row['pub_date']
            if pub_date:
                year = datetime.strftime(parse(pub_date), '%Y')
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                years_by_publication.setdefault(year, {'publication': set()})
                years_by_publication[year]['publication'].add(row_metaid)
        return years_by_publication

    def count_types_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set | str]]:
        types_by_publication = dict()
        for row in csv_data:
            br_type = row['type']
            if br_type:
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                types_by_publication.setdefault(br_type, {'publication': set()})
                types_by_publication[br_type]['publication'].add(row_metaid)
            venue_name_and_ids = re.search(name_and_ids, row['venue'])
            if venue_name_and_ids:
                venue_name = venue_name_and_ids.group(1)
                venue_ids = set(venue_name_and_ids.group(2).split())
                venue_type = self.get_venue_type(br_type, venue_ids)
                venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                if venue_type:
                    if not venue_ids.difference({venue_metaid}):
                        venue_key = venue_name
                    else:
                        venue_key = venue_metaid
                    # Venues are tallied under their inferred type as well,
                    # keyed by name when only an OMID is available
                    types_by_publication.setdefault(venue_type, {'publication': set()})
                    types_by_publication[venue_type]['publication'].add(venue_key)
        return types_by_publication

    @classmethod
    def get_venue_type(cls, br_type: str, venue_ids: set) -> str:
        schemas = {venue_id.split(':', maxsplit=1)[0] for venue_id in venue_ids}
        if br_type in {'journal article', 'journal volume', 'journal issue'}:
            venue_type = 'journal'
        elif br_type in {'book chapter', 'book part', 'book section', 'book track'}:
            venue_type = 'book'
        elif br_type in {'book', 'edited book', 'monograph', 'reference book'}:
            venue_type = 'book series'
        elif br_type == 'proceedings article':
            venue_type = 'proceedings'
        elif br_type in {'proceedings', 'report', 'standard', 'series'}:
            venue_type = 'series'
        elif br_type == 'reference entry':
            venue_type = 'reference book'
        elif br_type == 'report series':
            venue_type = 'report series'
        elif not br_type or br_type in {'dataset', 'data file', 'journal'}:
            venue_type = ''
        # Check the type based on the identifier scheme
        if any(identifier for identifier in venue_ids if not identifier.startswith('omid:')):
            try:
                if venue_type in {'journal', 'book series', 'series', 'report series'}:
                    if 'isbn' in schemas or 'issn' not in schemas:
                        # It is undecidable
                        venue_type = ''
                elif venue_type in {'book', 'proceedings'}:
                    if 'issn' in schemas or 'isbn' not in schemas:
                        venue_type = ''
                elif venue_type == 'reference book':
                    if 'isbn' in schemas and 'issn' not in schemas:
                        venue_type = 'reference book'
                    elif 'issn' in schemas and 'isbn' not in schemas:
                        venue_type = 'journal'
                    elif 'issn' in schemas and 'isbn' in schemas:
                        venue_type = ''
            except UnboundLocalError:
                # venue_type was never assigned: br_type is an unexpected value
                print(br_type, venue_ids)
                raise
        return venue_type
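

# A minimal, hypothetical driver, not part of the original module: it assumes a
# directory 'csv_dump' of OC Meta CSV dump files named like
# '1_2023-01-05T10-00-00.csv' and using the column layout expected above.
if __name__ == '__main__':
    counter = OCMetaCounter(csv_dump_path='csv_dump')
    print('Total authors:', counter.count('authors'))
    print('Distinct publishers:', counter.count('publishers'))
    for venue_key, data in counter.get_top('venues', 'publication', number=10):
        print(venue_key, data['total'])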