Coverage for oc_meta/plugins/analyser.py: 88%
237 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
1#!python
2# Copyright 2022, Arcangelo Massari <arcangelo.massari@unibo.it>
3#
4# Permission to use, copy, modify, and/or distribute this software for any purpose
5# with or without fee is hereby granted, provided that the above copyright notice
6# and this permission notice appear in all copies.
7#
8# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
9# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
10# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
11# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
12# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
13# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
14# SOFTWARE.
16from __future__ import annotations
18import os
19import re
20from datetime import datetime
21from functools import cmp_to_key
22from typing import Dict, List
24from dateutil.parser import parse
25from tqdm import tqdm
27from oc_meta.lib.file_manager import get_csv_data, write_csv
28from oc_meta.lib.master_of_regex import name_and_ids
class OCMetaAnalyser:
    """Explore and post-process a directory of OpenCitations Meta CSV dump files."""

    def __init__(self, csv_dump_path: str):
        """
        :param csv_dump_path: directory containing the CSV dump files.
        """
        self.csv_dump_path = csv_dump_path

    def merge_rows_by_id(self, output_dir: str) -> None:
        """
        Deduplicate rows across dump files by OMID.

        When the same OMID appears in more than one file, only the row in the
        most recent file (per :meth:`sort_csv_filenames`) is kept; the
        duplicates are dropped from the older files. Every file is rewritten
        to ``output_dir``, including untouched ones.

        :param output_dir: directory where the cleaned CSV files are written.
        """
        ids_by_csv = dict()
        for filename in os.listdir(self.csv_dump_path):
            csv_data = get_csv_data(os.path.join(self.csv_dump_path, filename))
            for i, row in enumerate(csv_data):
                # The OMID is the row's canonical identifier inside the space-separated 'id' field
                metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                ids_by_csv.setdefault(metaid, dict())
                ids_by_csv[metaid].setdefault(filename, set())
                ids_by_csv[metaid][filename].add(i)
        storer = dict()  # filename -> set of row indices to drop from that file
        for metaid, filenames in ids_by_csv.items():
            if len(filenames) > 1:
                sorted_filenames = sorted([name for name in filenames], key=cmp_to_key(self.sort_csv_filenames))
                to_be_overwritten = sorted_filenames[:-1]
                latest_file = sorted_filenames[-1]
                for filename in to_be_overwritten:
                    storer.setdefault(filename, set())
                    storer[filename].update(ids_by_csv[metaid][filename])
                # The latest file keeps its row: register it with nothing to drop
                storer.setdefault(latest_file, set())
            else:
                storer.setdefault(next(iter(filenames)), set())
        for filename, rows in storer.items():
            old_data = get_csv_data(os.path.join(self.csv_dump_path, filename))
            new_data = [row for i, row in enumerate(old_data) if i not in rows]
            write_csv(
                path=os.path.join(output_dir, filename),
                datalist=new_data,
                fieldnames=['id', 'title', 'pub_date', 'page', 'type', 'author', 'editor', 'publisher', 'volume', 'venue', 'issue'],
                method='w')

    @staticmethod
    def sort_csv_filenames(file_1, file_2) -> int:
        """
        Comparator for dump filenames shaped ``<number>_<%Y-%m-%dT%H-%M-%S>.csv``:
        order by timestamp first, then by the numeric prefix on ties.

        Returns 1 or -1 following the ``cmp_to_key`` convention. The original
        ``-> str`` annotation was wrong for a comparator and has been fixed.
        """
        file_1_date = datetime.strptime(file_1.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
        file_2_date = datetime.strptime(file_2.split('_')[1].replace('.csv', ''), '%Y-%m-%dT%H-%M-%S')
        if file_1_date > file_2_date:
            return 1
        elif file_1_date < file_2_date:
            return -1
        # Same timestamp: fall back to the numeric file prefix
        if int(file_1.split('_')[0]) > int(file_2.split('_')[0]):
            return 1
        return -1

    def explore_csv_dump(self, analyser: callable) -> None|str|dict:
        """
        Run ``analyser`` on each CSV file of the dump and merge the per-file
        results. The merge strategy depends on the analyser's return type:
        int results are summed, set results are unioned, dict results are
        merged key by key (set-valued leaves are unioned, other leaves are
        overwritten by the latest file).

        Returns the accumulated dict as-is; int and set accumulators are
        returned as strings (the total, or the set's cardinality). The
        original ``-> None|int|dict`` annotation did not match: a bare int is
        never returned.

        :param analyser: a callable taking the list of CSV rows of one file.
        """
        global_output = None
        filenames = sorted(os.listdir(self.csv_dump_path))
        pbar = tqdm(total=len(filenames))
        for i, filename in enumerate(filenames):
            csv_data = get_csv_data(os.path.join(self.csv_dump_path, filename))
            local_output = analyser(csv_data)
            if i == 0:
                # Initialise the accumulator from the first result's type
                if isinstance(local_output, int):
                    global_output = 0
                elif isinstance(local_output, dict):
                    global_output = dict()
                elif isinstance(local_output, set):
                    global_output = set()
            if isinstance(local_output, int):
                global_output += local_output
            elif isinstance(local_output, dict):
                for k, v in local_output.items():
                    if k in global_output:
                        # NOTE(review): sub-keys of v absent from global_output[k] are
                        # silently dropped; the counters below always emit the same
                        # sub-keys per entity, so this branch is presumably never hit — verify
                        for i_k, _ in v.items():
                            if i_k in global_output[k]:
                                if isinstance(global_output[k][i_k], set):
                                    global_output[k][i_k].update(local_output[k][i_k])
                                else:
                                    global_output[k][i_k] = local_output[k][i_k]
                    else:
                        global_output[k] = local_output[k]
            elif isinstance(local_output, set):
                global_output.update(local_output)
            pbar.update()
        pbar.close()
        if isinstance(global_output, int):
            return str(global_output)
        elif isinstance(global_output, dict):
            return global_output
        elif isinstance(global_output, set):
            return str(len(global_output))
class OCMetaCounter(OCMetaAnalyser):
    """Compute global counts and top-N rankings over an OpenCitations Meta CSV dump."""

    def __init__(self, csv_dump_path: str):
        """
        :param csv_dump_path: directory containing the CSV dump files.
        """
        # Modernised from the legacy super(OCMetaCounter, self).__init__(...) form
        super().__init__(csv_dump_path)

    def get_top(self, what: str, by_what: str, number: int|None = None) -> list:
        """
        Rank entities of kind ``what`` (e.g. 'publishers') by the size of their
        ``by_what`` set (e.g. 'publication') and return the top ``number`` of
        them (all of them when ``number`` is None) as a list of (key, data)
        tuples in descending order. Each data dict gains a 'total' entry with
        the set's size; the raw sets themselves are stripped from the output.

        The original ``-> dict`` annotation was wrong: a sorted list of tuples
        is returned.
        """
        counter_func = getattr(self, f'count_{what}_by_{by_what}')
        all_data = self.explore_csv_dump(counter_func)
        all_data_sorted: list = sorted(all_data, key=lambda k: len(all_data[k][by_what]), reverse=True)
        top_n = all_data_sorted[:number] if number is not None else all_data_sorted
        all_top_n = [(k, v) for k, v in all_data.items() if k in top_n]
        for tuple_k_v in all_top_n:
            tuple_k_v[1]['total'] = len(tuple_k_v[1][by_what])
        # Keep only scalar values (name, total): drop the raw, unserialisable sets
        all_top_n = [(meta, {k: v for k, v in data.items() if not isinstance(v, set)}) for meta, data in all_top_n]
        return sorted(all_top_n, key=lambda x: top_n.index(x[0]))

    def count(self, what: str) -> str:
        """
        Dispatch to ``count_<what>`` and fold it over the whole dump.

        Returns the total as a string, because :meth:`explore_csv_dump`
        stringifies int and set accumulators; the original ``-> int``
        annotation did not match.
        """
        counter_func = getattr(self, f'count_{what}')
        return self.explore_csv_dump(counter_func)

    def count_authors(self, csv_data: List[dict]) -> int:
        """Count non-empty author entries ('; '-separated) across all rows."""
        count = 0
        for row in csv_data:
            count += len(list(filter(None, row['author'].split('; '))))
        return count

    def count_editors(self, csv_data: List[dict]) -> int:
        """Count non-empty editor entries ('; '-separated) across all rows."""
        count = 0
        for row in csv_data:
            count += len(list(filter(None, row['editor'].split('; '))))
        return count

    def count_publishers(self, csv_data: List[dict]) -> set:
        """Collect the distinct publisher names (lower-cased) found in the rows."""
        publishers = set()
        for row in csv_data:
            if row['publisher']:
                pub_name_and_ids = re.search(name_and_ids, row['publisher'])
                if pub_name_and_ids:
                    pub_name = pub_name_and_ids.group(1).lower()
                    publishers.add(pub_name)
        return publishers

    def count_venues(self, csv_data: List[dict]) -> set:
        """
        Collect distinct venues. A venue identified only by its OMID is keyed
        by its lower-cased name; one that also has external ids is keyed by
        its OMID.
        """
        venues = set()
        for row in csv_data:
            if row['venue']:
                ven_name_and_ids = re.search(name_and_ids, row['venue'])
                # Guard added: the original raised AttributeError on a venue
                # string not matching the pattern, unlike the sibling counters
                if ven_name_and_ids:
                    venue_name = ven_name_and_ids.group(1).lower()
                    venue_ids = set(ven_name_and_ids.group(2).split())
                    venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                    if not venue_ids.difference({venue_metaid}):
                        venues.add(venue_name)
                    else:
                        venues.add(venue_metaid)
        return venues

    def count_publishers_by_venue(self, csv_data: List[dict]) -> Dict[str, Dict[str, set|str]]:
        """Map each publisher (lower-cased name) to the set of venues it publishes in."""
        publishers_by_venue = dict()
        for row in csv_data:
            publisher_name_and_ids = re.search(name_and_ids, row['publisher'])
            venue_name_and_ids = re.search(name_and_ids, row['venue'])
            if publisher_name_and_ids and venue_name_and_ids:
                publisher_name = publisher_name_and_ids.group(1).lower()
                venue_name: str = venue_name_and_ids.group(1).lower()
                venue_ids = set(venue_name_and_ids.group(2).split())
                venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                publishers_by_venue.setdefault(publisher_name, {'name': publisher_name, 'venue': set()})
                # Venues with external ids are keyed by OMID, the rest by name
                venue_key = venue_name if not venue_ids.difference({venue_metaid}) else venue_metaid
                publishers_by_venue[publisher_name]['venue'].add(venue_key)
        return publishers_by_venue

    def count_publishers_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set|str]]:
        """Map each publisher (keyed by lower-cased name) to the set of OMIDs it published."""
        publishers_by_publication = dict()
        for row in csv_data:
            publishers_name_and_ids = re.search(name_and_ids, row['publisher'])
            if publishers_name_and_ids:
                publishers_name = publishers_name_and_ids.group(1)
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                # The key is lower-cased for grouping; 'name' keeps the original casing
                publishers_by_publication.setdefault(publishers_name.lower(), {'name': publishers_name, 'publication': set()})
                publishers_by_publication[publishers_name.lower()]['publication'].add(row_metaid)
        return publishers_by_publication

    def count_venues_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set|str]]:
        """Map each venue (name or OMID, see count_venues) to the set of OMIDs published in it."""
        venues_by_publication = dict()
        for row in csv_data:
            venue_name_and_ids = re.search(name_and_ids, row['venue'])
            if venue_name_and_ids:
                venue_name = venue_name_and_ids.group(1)
                venue_ids = set(venue_name_and_ids.group(2).split())
                venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                venue_key = venue_name.lower() if not venue_ids.difference({venue_metaid}) else venue_metaid
                venues_by_publication.setdefault(venue_key, {'name': venue_name, 'publication': set()})
                venues_by_publication[venue_key]['publication'].add(row_metaid)
        return venues_by_publication

    def count_years_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set]]:
        """Map each publication year to the set of OMIDs published in that year."""
        years_by_publication = dict()
        for row in csv_data:
            pub_date = row['pub_date']
            if pub_date:
                year = datetime.strftime(parse(pub_date), '%Y')
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                years_by_publication.setdefault(year, {'publication': set()})
                years_by_publication[year]['publication'].add(row_metaid)
        return years_by_publication

    def count_types_by_publication(self, csv_data: List[dict]) -> Dict[str, Dict[str, set|str]]:
        """
        Map each bibliographic-resource type to the set of OMIDs of that type.
        Venue types inferred via get_venue_type are counted too, keyed by the
        venue's name or OMID (same convention as count_venues).
        """
        types_by_publication = dict()
        for row in csv_data:
            br_type = row['type']
            if br_type:
                row_metaid = [identifier for identifier in row['id'].split() if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                types_by_publication.setdefault(br_type, {'publication': set()})
                types_by_publication[br_type]['publication'].add(row_metaid)
            venue_name_and_ids = re.search(name_and_ids, row['venue'])
            if venue_name_and_ids:
                venue_name = venue_name_and_ids.group(1)
                venue_ids = set(venue_name_and_ids.group(2).split())
                venue_type = self.get_venue_type(br_type, venue_ids)
                venue_metaid = [identifier for identifier in venue_ids if identifier.split(':', maxsplit=1)[0] == 'omid'][0]
                if venue_type:
                    if not venue_ids.difference({venue_metaid}):
                        venue_key = venue_name
                    else:
                        venue_key = venue_metaid
                    types_by_publication.setdefault(venue_type, {'publication': set()})
                    types_by_publication[venue_type]['publication'].add(venue_key)
        return types_by_publication

    @classmethod
    def get_venue_type(cls, br_type:str, venue_ids:list) -> str:
        """
        Infer the venue's type from the contained resource's type, then refine
        (or invalidate) the guess with the venue's identifier schemes: an ISSN
        points to serial venues, an ISBN to book-like ones; contradictory
        schemes make the type undecidable ('').

        :param br_type: the type of the bibliographic resource hosted by the venue.
        :param venue_ids: the venue's identifiers (an iterable of 'scheme:value' strings).
        :raises UnboundLocalError: when br_type is not a recognised type (the
            type list below is intentionally exhaustive; an unknown type is a
            programming error surfaced loudly).
        """
        schemas = {venue_id.split(':', maxsplit=1)[0] for venue_id in venue_ids}
        if br_type in {'journal article', 'journal volume', 'journal issue'}:
            venue_type = 'journal'
        elif br_type in {'book chapter', 'book part', 'book section', 'book track'}:
            venue_type = 'book'
        elif br_type in {'book', 'edited book', 'monograph', 'reference book'}:
            venue_type = 'book series'
        elif br_type == 'proceedings article':
            venue_type = 'proceedings'
        elif br_type in {'proceedings', 'report', 'standard', 'series'}:
            venue_type = 'series'
        elif br_type == 'reference entry':
            venue_type = 'reference book'
        elif br_type == 'report series':
            venue_type = 'report series'
        elif not br_type or br_type in {'dataset', 'data file', 'journal'}:
            venue_type = ''
        # Refine the guess using the identifier schemes (omid alone is uninformative)
        if any(identifier for identifier in venue_ids if not identifier.startswith('omid:')):
            try:
                if venue_type in {'journal', 'book series', 'series', 'report series'}:
                    if 'isbn' in schemas or 'issn' not in schemas:
                        # It is undecidable
                        venue_type = ''
                elif venue_type in {'book', 'proceedings'}:
                    if 'issn' in schemas or 'isbn' not in schemas:
                        venue_type = ''
                elif venue_type == 'reference book':
                    if 'isbn' in schemas and 'issn' not in schemas:
                        venue_type = 'reference book'
                    elif 'issn' in schemas and 'isbn' not in schemas:
                        venue_type = 'journal'
                    elif 'issn' in schemas and 'isbn' in schemas:
                        venue_type = ''
            except UnboundLocalError:
                # Unknown br_type: log the offending input, then re-raise with
                # the original traceback (the original `raise(UnboundLocalError)`
                # replaced it with a bare, context-free exception)
                print(br_type, venue_ids)
                raise
        return venue_type