Coverage for oc_meta/plugins/multiprocess/prepare_multiprocess.py: 69% (451 statements)


#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.


import os
import re
from typing import Dict, List

import psutil
from tqdm import tqdm

from oc_meta.constants import CONTAINER_EDITOR_TYPES
from oc_meta.core.creator import Creator
from oc_meta.lib.file_manager import (get_csv_data, pathoo, sort_files,
                                      write_csv)
from oc_meta.lib.master_of_regex import (comma_and_spaces, name_and_ids,
                                         semicolon_in_people_field)

FORBIDDEN_IDS = {'issn:0000-0000'}
VENUES = {'archival-document', 'book', 'book-part', 'book-section', 'book-series', 'book-set', 'edited-book', 'journal', 'journal-volume', 'journal-issue', 'monograph', 'proceedings-series', 'proceedings', 'reference-book', 'report-series', 'standard-series'}


def prepare_relevant_items(csv_dir:str, output_dir:str, items_per_file:int, verbose:bool) -> None:
    '''
    This function receives an input folder containing CSVs formatted for Meta.
    It outputs other CSVs containing deduplicated items only.
    You can specify how many items to insert in each output file.

    :params csv_dir: the path to the folder containing the input CSV files
    :type csv_dir: str
    :params output_dir: the location of the folder where the output files will be saved
    :type output_dir: str
    :params items_per_file: an integer to specify how many rows to insert in each output file
    :type items_per_file: int
    :params verbose: if True, show a progress bar with elapsed and estimated time
    :type verbose: bool
    :returns: None -- This function returns None and saves the output CSV files in the `output_dir` folder
    '''
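    # Illustrative invocation (a minimal sketch, not executed here; the paths and
    # the number of items per file are hypothetical):
    #
    #   prepare_relevant_items(
    #       csv_dir='csv_input',
    #       output_dir='csv_relevant_items',
    #       items_per_file=10000,
    #       verbose=True)
    #
    # The deduplicated venues, identifiers, and responsible agents are written to
    # the 'venues', 'ids', 'authors', 'editors', and 'publishers' subfolders of
    # `output_dir`.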

    files = [os.path.join(csv_dir, file) for file in sort_files(os.listdir(csv_dir)) if file.endswith('.csv')]
    pbar = tqdm(total=len(files)) if verbose else None
    pathoo(output_dir)
    ids_found = set()
    venues_found = dict()
    duplicated_ids = dict()
    venues_by_id = dict()
    duplicated_venues = dict()
    resp_agents_found = set()
    resp_agents_by_id = dict()
    duplicated_resp_agents = dict()
    editors_found = set()
    # Look for all venues, responsible agents, and publishers
    for file in files:
        data = get_csv_data(file)
        _get_duplicated_ids(data=data, ids_found=ids_found, editors_found=editors_found, items_by_id=duplicated_ids)
        _get_relevant_venues(data=data, ids_found=venues_found, items_by_id=venues_by_id, duplicated_items=duplicated_venues)
        _get_resp_agents(data=data, ids_found=resp_agents_found, items_by_id=resp_agents_by_id, duplicated_items=duplicated_resp_agents)
        pbar.update() if verbose else None
    pbar.close() if verbose else None
    if verbose:
        print('[INFO:prepare_multiprocess] Enriching the duplicated bibliographic resources found')
        pbar = tqdm(total=len(files))
    for file in files:
        data = get_csv_data(file)
        _enrich_duplicated_ids_found(data, duplicated_ids)
        pbar.update() if verbose else None
    pbar.close() if verbose else None
    ids_merged = _do_collective_merge(duplicated_ids, duplicated_ids)
    venues_merged = _do_collective_merge(venues_by_id, duplicated_venues)
    resp_agents_merged = _do_collective_merge(resp_agents_by_id, duplicated_resp_agents)
    fieldnames = ['id', 'title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type', 'publisher', 'editor']
    __save_relevant_venues(venues_merged, items_per_file, output_dir, fieldnames)
    __save_ids(ids_merged, items_per_file, output_dir, fieldnames)
    for field in ['author', 'editor', 'publisher']:
        __save_responsible_agents(resp_agents_merged, items_per_file, output_dir, fieldnames, field)


# Track the IDs of bibliographic resources seen across the input files and record
# those that recur (or whose container venue, for rows with editors, was already
# seen), keeping for each field the longest value encountered.
def _get_duplicated_ids(data: List[dict], ids_found: set, editors_found: set, items_by_id: Dict[str, dict]) -> None:
    cur_file_ids = set()
    cur_file_venue_ids = set()
    for row in data:
        ids_list = row['id'].split()
        venue_name_and_ids = re.search(name_and_ids, row['venue'])
        venue_ids = venue_name_and_ids.group(2).split() if venue_name_and_ids else []
        if any(id in ids_found and (id not in cur_file_ids or id in items_by_id) for id in ids_list) or \
            ((row['editor'] and row['author'] and row['venue'] and row['type'] in CONTAINER_EDITOR_TYPES) and \
                any(id in editors_found and (id not in cur_file_venue_ids) for id in venue_ids)):
            for id in ids_list:
                items_by_id.setdefault(id, {'others': set()})
                items_by_id[id]['others'].update(
                    {other for other in ids_list if other != id})
                for field in ['title', 'author', 'pub_date', 'venue', 'volume', 'issue', 'page', 'type', 'publisher', 'editor']:
                    if field in items_by_id[id]:
                        if len(row[field]) < len(items_by_id[id][field]):
                            continue
                    items_by_id[id][field] = row[field]
        cur_file_ids.update(ids_list)
        cur_file_venue_ids.update(venue_ids)
        editors_found.update(venue_ids)
        ids_found.update(ids_list)


def _enrich_duplicated_ids_found(data:List[dict], items_by_id:Dict[str, dict]) -> None:
    for row in data:
        br_ids = row['id'].split()
        for br_id in br_ids:
            if br_id in items_by_id:
                all_ids = __find_all_ids_by_key(items_by_id, br_id)
                for field, value in row.items():
                    if field != 'id':
                        for all_id in all_ids:
                            if len(value) > len(items_by_id[all_id][field]):
                                items_by_id[all_id][field] = value


# Index venues (both rows whose type is itself a venue and the 'venue' field of
# the other rows), together with their volumes and issues, and record as
# duplicated the venues, volumes, and issues already seen in previous files.
def _get_relevant_venues(data:List[dict], ids_found:dict, items_by_id:Dict[str, dict], duplicated_items:Dict[str, dict]) -> None:
    cur_file_ids = dict()
    for row in data:
        venue = row['venue']
        venues = list()
        if row['type'] in VENUES:
            venues.append((row['title'], row['id'], row['type'], row['publisher']))
        if venue:
            full_name_and_ids = re.search(name_and_ids, venue)
            name = full_name_and_ids.group(1) if full_name_and_ids else venue
            ids = full_name_and_ids.group(2) if full_name_and_ids else None
            if ids:
                try:
                    br_type = Creator.get_venue_type(row['type'], ids.split())
                except UnboundLocalError:
                    print(f"[INFO:prepare_multiprocess] I found the venue {row['venue']} for the resource of type {row['type']}, but I don't know how to handle it")
                    raise UnboundLocalError
                venues.append((name, ids, br_type, row['publisher']))
        for venue_tuple in venues:
            name, ids, br_type, publisher = venue_tuple
            ids_list = [identifier for identifier in ids.split() if identifier not in FORBIDDEN_IDS]
            if any(id in ids_found and (id not in cur_file_ids or id in duplicated_items) for id in ids_list):
                for id in ids_list:
                    duplicated_items.setdefault(id, {'others': set(), 'name': name, 'type': br_type, 'volume': dict(), 'issue': set(), 'publisher': publisher})
                    duplicated_items[id]['others'].update({other for other in ids_list if other != id})
            for id in ids_list:
                items_by_id.setdefault(id, {'others': set(), 'name': name, 'type': br_type, 'volume': dict(), 'issue': set(), 'publisher': publisher})
                items_by_id[id]['others'].update({other for other in ids_list if other != id})
                ids_found.setdefault(id, {'volumes': dict(), 'issues': set()})
                cur_file_ids.setdefault(id, {'volumes': dict(), 'issues': set()})
            volume = row['volume']
            issue = row['issue']
            if volume:
                if any(volume in ids_found[id]['volumes'] and volume not in cur_file_ids[id]['volumes'] for id in ids_list):
                    for id in ids_list:
                        duplicated_items[id]['volume'].setdefault(volume, set())
                for id in ids_list:
                    ids_found[id]['volumes'].setdefault(volume, set())
                    cur_file_ids[id]['volumes'].setdefault(volume, set())
                if issue:
                    if any(issue in ids_found[id]['volumes'][volume] and issue not in cur_file_ids[id]['volumes'][volume] for id in ids_list):
                        for id in ids_list:
                            duplicated_items[id]['volume'].setdefault(volume, set())
                            duplicated_items[id]['volume'][volume].add(issue)
                    for id in ids_list:
                        cur_file_ids[id]['volumes'][volume].add(issue)
                        ids_found[id]['volumes'][volume].add(issue)
            elif not volume and issue:
                if any(issue in ids_found[id]['issues'] and issue not in cur_file_ids[id]['issues'] for id in ids_list):
                    for id in ids_list:
                        duplicated_items[id]['issue'].add(issue)
                for id in ids_list:
                    ids_found[id]['issues'].add(row['issue'])
                    cur_file_ids[id]['issues'].add(row['issue'])


# Index authors, editors, and publishers by their identifiers, keeping the most
# complete form of each name, and record as duplicated the agents already seen
# in previous files.
def _get_resp_agents(data:List[dict], ids_found:set, items_by_id:Dict[str, Dict[str, set]], duplicated_items:Dict[str, dict]) -> None:
    cur_file_ids = set()
    for row in data:
        for field in {'author', 'editor', 'publisher'}:
            if row[field]:
                resp_agents = re.split(semicolon_in_people_field, row[field]) if field in {'author', 'editor'} else [row[field]]
                for resp_agent in resp_agents:
                    full_name_and_ids = re.search(name_and_ids, resp_agent)
                    name = full_name_and_ids.group(1) if full_name_and_ids else resp_agent
                    ids = full_name_and_ids.group(2) if full_name_and_ids else None
                    if ids:
                        ids_list = [identifier for identifier in ids.split() if identifier not in FORBIDDEN_IDS]
                        richest_name = _find_all_names(duplicated_items, ids_list, name)
                        if any(id in ids_found and (id not in cur_file_ids or id in duplicated_items) for id in ids_list):
                            for id in ids_list:
                                duplicated_items.setdefault(id, {'others': set(), 'type': field})
                                duplicated_items[id]['name'] = richest_name
                                duplicated_items[id]['others'].update({other for other in ids_list if other != id})
                        for id in ids_list:
                            items_by_id.setdefault(id, {'others': set(), 'type': field})
                            items_by_id[id]['name'] = richest_name
                            items_by_id[id]['others'].update({other for other in ids_list if other != id})
                        cur_file_ids.update(set(ids_list))
                        ids_found.update(set(ids_list))


def _find_all_names(items_by_id:Dict[str, Dict[str, set]], ids_list:list, cur_name:str) -> str:
    if ',' in cur_name:
        split_name = re.split(comma_and_spaces, cur_name)
        given_name = split_name[1].strip()
        if given_name and not (given_name.endswith('.') or len(given_name) == 1):
            return cur_name
    for id in ids_list:
        if id in items_by_id:
            other_name = items_by_id[id]['name']
            if ',' in other_name:
                split_other_name = re.split(comma_and_spaces, other_name)
                family_other_name = split_other_name[0].strip()
                given_other_name = split_other_name[1].strip()
                if given_other_name and not (given_other_name.endswith('.') or len(given_other_name) == 1):
                    return family_other_name + ', ' + given_other_name
    for id in ids_list:
        if id in items_by_id:
            other_name = items_by_id[id]['name']
            if ',' in other_name:
                split_other_name = re.split(comma_and_spaces, other_name)
                family_other_name = split_other_name[0].strip()
                given_other_name = split_other_name[1].strip()
                if given_other_name:
                    return family_other_name + ', ' + given_other_name
    return cur_name


# Merge all the identifiers that transitively refer to the same entity (via the
# 'others' links) into a single record per group.
def _do_collective_merge(items_by_id:dict, duplicated_items:Dict[str, dict]) -> dict:
    merged_by_key:Dict[str, Dict[str, set]] = dict()
    ids_checked = set()
    for id, data in duplicated_items.items():
        if id not in ids_checked:
            all_vi = None
            ids_found = {id}
            ids_to_be_checked = {id}
            ids_to_be_checked.update(data['others'])
            for id_to_be_checked in ids_to_be_checked:
                if id_to_be_checked not in ids_checked:
                    output_ids = __find_all_ids_by_key(items_by_id, key=id_to_be_checked)
                    ids_checked.update(output_ids)
                    ids_found.update(output_ids)
            all_other_ids = {item for item in ids_found if item != id}
            ids_checked.update(ids_found)
            if 'volume' in data and 'issue' in data:
                if isinstance(data['volume'], dict):
                    all_vi = __find_all_vi(items_by_id=duplicated_items, all_ids=ids_found)
            if data['type'] == 'author':
                richest_name = _find_all_names(items_by_id, ids_found, data['name'])
                data['name'] = richest_name
            if 'publisher' in data:
                publisher_with_id = _find_a_publisher_with_id(items_by_id, all_ids=ids_found)
                data['publisher'] = publisher_with_id if publisher_with_id else data['publisher']
            merged_by_key[id] = {k:v if not k == 'others' else all_other_ids for k,v in data.items()}
            if all_vi:
                merged_by_key[id]['volume'] = all_vi['volume']
                merged_by_key[id]['issue'] = all_vi['issue']
    del items_by_id
    del duplicated_items
    return merged_by_key

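# Illustrative sketch of the merge (hypothetical identifiers): if an author is
# recorded both as 'orcid:0000-0000-0000-0000' and 'viaf:000000', and each entry
# lists the other in its 'others' set, _do_collective_merge produces a single
# record for the group, keyed by the first identifier encountered, whose 'others'
# set holds every alternative identifier and whose 'name' is the most complete
# variant found among the grouped entries.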

def _find_a_publisher_with_id(items_by_id:dict, all_ids: list):
    for id in all_ids:
        if id in items_by_id:
            item = items_by_id[id]
            if 'publisher' in item:
                pub_name_and_ids = re.search(name_and_ids, item['publisher'])
                if pub_name_and_ids:
                    return item['publisher']


def __find_all_ids_by_key(items_by_id:dict, key:str):
    visited_items = set()
    items_to_visit = {item for item in items_by_id[key]['others']}
    while items_to_visit:
        for item in set(items_to_visit):
            if item not in visited_items:
                visited_items.add(item)
                items_to_visit.update({item for item in items_by_id[item]['others'] if item not in visited_items})
            items_to_visit.remove(item)
    return visited_items

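# Illustrative traversal (hypothetical identifiers, not part of the pipeline): given
#   index = {'doi:10.0000/x': {'others': {'pmid:000'}},
#            'pmid:000': {'others': {'doi:10.0000/x', 'isbn:0000000000'}},
#            'isbn:0000000000': {'others': {'pmid:000'}}}
# __find_all_ids_by_key(index, key='doi:10.0000/x') follows the 'others' links
# transitively and returns {'pmid:000', 'isbn:0000000000', 'doi:10.0000/x'},
# where the key itself re-enters the result through the back-links.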

def __find_all_vi(items_by_id:dict, all_ids:set) -> dict:
    all_vi = {'volume': dict(), 'issue': set()}
    for id in all_ids:
        if id in items_by_id:
            for volume, volume_issues in items_by_id[id]['volume'].items():
                all_vi['volume'].setdefault(volume, set()).update(volume_issues)
            for venue_issues in items_by_id[id]['issue']:
                all_vi['issue'].add(venue_issues)
    return all_vi


def __save_relevant_venues(items_by_id:dict, items_per_file:int, output_dir:str, fieldnames:list):
    output_dir = os.path.join(output_dir, 'venues')
    rows = list()
    counter = 0
    for item_id, data in items_by_id.items():
        item_type = data['type']
        item_publisher = data['publisher']
        row = dict()
        name, ids = __get_name_and_ids(item_id, data)
        row['id'] = ids
        row['title'] = name
        row['type'] = item_type
        row['publisher'] = item_publisher
        for volume, volume_issues in data['volume'].items():
            volume_row = dict()
            volume_row['volume'] = volume
            volume_row['venue'] = f'{name} [{ids}]'
            volume_row['publisher'] = item_publisher
            if volume_issues:
                volume_row['type'] = 'journal issue'
                for volume_issue in volume_issues:
                    volume_issue_row = dict(volume_row)
                    volume_issue_row['issue'] = volume_issue
                    rows.append(volume_issue_row)
            else:
                volume_row['type'] = 'journal volume'
                rows.append(volume_row)
        for venue_issue in data['issue']:
            issue_row = dict()
            issue_row['venue'] = f'{name} [{ids}]'
            issue_row['issue'] = venue_issue
            issue_row['type'] = 'journal issue'
            issue_row['publisher'] = item_publisher
            rows.append(issue_row)
        if not data['volume'] and not data['issue']:
            rows.append(row)
        if len(rows) >= items_per_file:
            output_path = os.path.join(output_dir, f"{counter}.csv")
            write_csv(output_path, rows, fieldnames)
            rows = list()
            counter += 1
    output_path = os.path.join(output_dir, f"{counter}.csv")
    write_csv(output_path, rows, fieldnames)


def __save_responsible_agents(items_by_id:dict, items_per_file:int, output_dir:str, fieldnames:list, field:str):
    rows = list()
    chunks = int(items_per_file)
    saved_chunks = 0
    items_to_be_processed = {k:v for k,v in items_by_id.items() if v['type'] == field}
    output_length = len(items_to_be_processed)
    for item_id, data in items_to_be_processed.items():
        name, ids = __get_name_and_ids(item_id, data)
        output_path = os.path.join(output_dir, field + 's')
        rows.append({field: f'{name} [{ids}]'})
        rows, saved_chunks = __store_data(rows, output_length, chunks, saved_chunks, output_path, fieldnames)


def __save_ids(items_by_id:dict, items_per_file:int, output_dir:str, fieldnames:list):
    output_dir = os.path.join(output_dir, 'ids')
    rows = list()
    chunks = int(items_per_file)
    saved_chunks = 0
    output_length = len(items_by_id)
    for item_id, data in items_by_id.items():
        ids_list = list(data['others'])
        ids_list.append(item_id)
        ids = ' '.join(ids_list)
        output_row = {'id': ids}
        for field in [f for f in fieldnames if f != 'id']:
            output_row[field] = data[field]
        rows.append(output_row)
        rows, saved_chunks = __store_data(rows, output_length, chunks, saved_chunks, output_dir, fieldnames)


def __store_data(rows:list, output_length:int, chunks:int, saved_chunks:int, output_dir:str, fieldnames:list) -> tuple:
    data_about_to_end = (output_length - saved_chunks) < chunks and (output_length - saved_chunks) == len(rows)
    if len(rows) == chunks or data_about_to_end:
        saved_chunks = saved_chunks + chunks if not data_about_to_end else output_length
        filename = f'{str(saved_chunks)}.csv'
        output_path = os.path.join(output_dir, filename)
        write_csv(path=output_path, datalist=rows, fieldnames=fieldnames)
        rows = list()
    return rows, saved_chunks

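# Worked example of the chunking above (hypothetical numbers): with
# output_length=25 items and chunks=10, __store_data flushes the buffer to
# '10.csv' and '20.csv' as soon as it reaches ten rows, and writes the remaining
# five rows to '25.csv' when the data is about to end, so each filename records
# the cumulative number of items saved so far.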

def __get_name_and_ids(item_id:str, data:dict):
    ids_list = list(data['others'])
    ids_list.append(item_id)
    name = data['name'] if 'name' in data else ''
    ids = ' '.join(ids_list)
    return name, ids


def split_csvs_in_chunks(csv_dir:str, output_dir:str, chunk_size:int, verbose:bool=False) -> None:
    '''
    This function splits all the CSVs in a folder into smaller CSVs with a specified number of rows.
    Moreover, it tries, where possible, to keep the bibliographic resources belonging to the same venue in a single file.
    For this reason, the final number of rows per file may be slightly higher than the specified one.

    :params csv_dir: the path to the folder containing the input CSV files
    :type csv_dir: str
    :params output_dir: the location of the folder where the output files will be saved
    :type output_dir: str
    :params chunk_size: an integer to specify how many rows to insert in each output file
    :type chunk_size: int
    :params verbose: if True, show a progress bar with elapsed and estimated time
    :type verbose: bool
    :returns: None -- This function returns None and saves the output CSV files in the `output_dir` folder
    '''
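    # Illustrative invocation (a minimal sketch, not executed here; the paths and
    # the chunk size are hypothetical):
    #
    #   split_csvs_in_chunks(
    #       csv_dir='csv_input',
    #       output_dir='csv_chunks',
    #       chunk_size=10000,
    #       verbose=True)
    #
    # Rows are first grouped into per-venue files and then re-split into numbered
    # files of roughly `chunk_size` rows each.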

    files = [os.path.join(csv_dir, file) for file in sort_files(os.listdir(csv_dir)) if file.endswith('.csv')]
    pid = psutil.Process(os.getpid())
    venues_occurrences = __index_all_venues(files, verbose)
    __split_csvs_by_venues(files, venues_occurrences, output_dir, pid, verbose)
    __split_in_chunks(output_dir, chunk_size, verbose)


def __index_all_venues(files:list, verbose:bool) -> dict:
    if verbose:
        print('[INFO:prepare_multiprocess] Scanning venues')
        pbar = tqdm(total=len(files))
    venues_occurrences = dict()
    for file in files:
        data = get_csv_data(file)
        for row in data:
            venues = list()
            if row['type'] in VENUES:
                venues.append(row['id'].split())
            venue_and_ids = re.search(name_and_ids, row['venue'])
            if venue_and_ids:
                ids = venue_and_ids.group(2).split()
                venues.append(ids)
            for venue_ids in venues:
                for venue_id in venue_ids:
                    venues_occurrences.setdefault(venue_id, {'others': set()})
                    venues_occurrences[venue_id]['others'].update({other for other in venue_ids if other != venue_id})
        pbar.update() if verbose else None
    pbar.close() if verbose else None
    return venues_occurrences


def __split_csvs_by_venues(files:list, venues_occurrences:dict, output_dir:str, pid:psutil.Process, verbose:bool):
    pathoo(output_dir)
    if verbose:
        print('[INFO:prepare_multiprocess] Splitting CSVs by venue')
        pbar = tqdm(total=len(files))
    chunk_venues = dict()
    chunk_no_venues = dict()
    existing_files = set()
    no_venues_outdata = list()
    counter = 0
    for file in files:
        data = get_csv_data(file)
        for row in data:
            venues = list()
            if row['type'] in VENUES:
                venues.append(row['id'].split())
            venue_and_ids = re.search(name_and_ids, row['venue'])
            if venue_and_ids:
                ids = venue_and_ids.group(2).split()
                venues.append(ids)
            if venues:
                output_filepath = None
                for venue_ids in venues:
                    all_ids:list = venue_ids
                    try:
                        all_ids.extend(__find_all_ids_by_key(venues_occurrences, key=all_ids[0]))
                    except IndexError:
                        print(file, row)
                        raise(IndexError)
                    for any_id in all_ids:
                        filename = any_id.replace(':', '').replace('/', '').replace('\\', '').replace('<', '').replace('>', '').replace(';', '')
                        if os.path.join(output_dir, f'{filename}.csv') in existing_files:
                            output_filepath = os.path.join(output_dir, f'{filename}.csv')
                filename = all_ids[0].replace(':', '').replace('/', '').replace('\\', '').replace('<', '').replace('>', '').replace(';', '')
                output_filepath = os.path.join(output_dir, f'{filename}.csv') if not output_filepath else output_filepath
                chunk_venues.setdefault(output_filepath, list()).append(row)
                chunk_venues = __dump_if_chunk_size(chunk_venues, existing_files, pid)
            elif not venues:
                no_venues_outdata.append(row)
                if len(no_venues_outdata) == 1000:
                    no_venues_filepath = os.path.join(output_dir, f'no_venues_{counter}.csv')
                    chunk_no_venues[no_venues_filepath] = no_venues_outdata
                    counter += 1
                    no_venues_outdata = list()
                chunk_no_venues = __dump_if_chunk_size(chunk_no_venues, existing_files, pid)
        pbar.update() if verbose else None
    pbar.close() if verbose else None
    if no_venues_outdata:
        no_venues_filepath = os.path.join(output_dir, f'no_venues_{counter}.csv')
        chunk_no_venues[no_venues_filepath] = no_venues_outdata
    for chunk in [chunk_venues, chunk_no_venues]:
        for filepath, dump in chunk.items():
            all_data = get_csv_data(filepath) if os.path.exists(filepath) else list()
            all_data.extend(dump)
            write_csv(filepath, all_data)
        del chunk
    del venues_occurrences


def __split_in_chunks(output_dir:str, chunk_size:int, verbose:bool):
    files = os.listdir(output_dir)
    if verbose:
        print('[INFO:prepare_multiprocess] Splitting CSVs in chunks')
        pbar = tqdm(total=len(files))
    even_chunk = list()
    counter = 0
    for file in files:
        filepath = os.path.join(output_dir, file)
        data = get_csv_data(filepath)
        len_data = len(data)
        if len_data > chunk_size:
            while len_data > chunk_size:
                write_csv(os.path.join(output_dir, f'{counter}.csv'), data[:chunk_size])
                counter += 1
                del data[:chunk_size]
                len_data = len(data)
            even_chunk.extend(data)
            if len(even_chunk) >= chunk_size:
                write_csv(os.path.join(output_dir, f'{counter}.csv'), even_chunk)
                counter += 1
                even_chunk = list()
        elif len_data <= chunk_size:
            even_chunk.extend(data)
            if len(even_chunk) >= chunk_size:
                write_csv(os.path.join(output_dir, f'{counter}.csv'), even_chunk)
                counter += 1
                even_chunk = list()
        os.remove(filepath)
        pbar.update() if verbose else None
    pbar.close() if verbose else None
    if even_chunk:
        write_csv(os.path.join(output_dir, f'{counter}.csv'), even_chunk)


def __dump_if_chunk_size(chunk:dict, existing_files:set, pid:psutil.Process) -> dict:
    memory_used = pid.memory_info().rss / (1024.0 ** 3)
    # Flush the buffered rows to disk when the process uses more than 10 GB of RAM
    if memory_used > 10:
        for filepath, dump in chunk.items():
            all_data = get_csv_data(filepath) if os.path.exists(filepath) else list()
            all_data.extend(dump)
            write_csv(filepath, all_data)
            existing_files.add(filepath)
        return dict()
    return chunk