Coverage for oc_meta/plugins/csv_generator_lite/csv_generator_lite.py: 34% (400 statements)
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import concurrent.futures
import csv
import json
import multiprocessing
import os
import re
import sys
from typing import Dict, List, Optional
from zipfile import ZipFile

import redis
from pebble import ProcessPool
from tqdm import tqdm

# Increase CSV field size limit to handle large fields
csv.field_size_limit(2**31 - 1)  # Set to the maximum signed 32-bit integer value

# Constants
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]

URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}

# Global cache for JSON files
_json_cache = {}


def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Initialize Redis connection with given parameters"""
    client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    # Test the connection by sending a PING command
    client.ping()  # This will raise redis.ConnectionError if connection fails
    return client
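
# A minimal usage sketch (illustrative, not part of the module itself): it
# assumes a Redis server is reachable on localhost:6379 and that database 2
# mirrors the default used throughout this module.
#
#     client = init_redis_connection(host="localhost", port=6379, db=2)
#     client.ping()  # raises redis.exceptions.ConnectionError if unreachable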


def increase_csv_field_limit() -> int:
    """Incrementally increase the CSV field size limit until it works.
    Returns the successful limit value."""
    current_limit = csv.field_size_limit()
    while True:
        try:
            csv.field_size_limit(current_limit)
            return current_limit
        except OverflowError:
            # If we get an overflow, try a smaller value
            if current_limit >= sys.maxsize:
                current_limit = int(sys.maxsize / 2)
            else:
                break
        except Exception:
            break

        # Try next value
        current_limit *= 2

    # If we get here, we need to try increasing from the default
    current_limit = csv.field_size_limit()
    while True:
        try:
            next_limit = current_limit * 2
            csv.field_size_limit(next_limit)
            current_limit = next_limit
            return current_limit
        except Exception:
            # If we can't increase further, return the last working value
            csv.field_size_limit(current_limit)
            return current_limit


def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Load all previously processed OMIDs from existing CSV files into Redis.
    This is done only once at startup to create the initial cache."""
    if not os.path.exists(output_dir):
        return 0

    # Clear any existing data in Redis
    redis_client.delete("processed_omids")

    count = 0
    BATCH_SIZE = 1000  # Process commands in batches of 1000

    # Get list of CSV files first
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]

    # Read all CSV files and collect OMIDs with progress bar
    for filename in tqdm(csv_files, desc="Loading existing identifiers"):
        filepath = os.path.join(output_dir, filename)
        with open(filepath, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            batch_pipe = redis_client.pipeline()
            batch_count = 0

            for row in reader:
                # Extract all OMIDs from the id field
                omids = [
                    id_part.strip()
                    for id_part in row["id"].split()
                    if id_part.startswith("omid:br/")
                ]
                # Add to Redis pipeline
                for omid in omids:
                    batch_pipe.sadd("processed_omids", omid)
                    batch_count += 1
                    count += 1

                # Execute batch if we've reached the batch size
                if batch_count >= BATCH_SIZE:
                    batch_pipe.execute()
                    batch_pipe = redis_client.pipeline()
                    batch_count = 0

            # Execute any remaining commands in the pipeline
            if batch_count > 0:
                batch_pipe.execute()

    return count
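
# Illustrative sketch of the cache this builds (the identifiers below are made
# up): a CSV row whose "id" cell reads "omid:br/0612345 doi:10.1000/xyz"
# contributes only the OMID to the Redis set, so a later run can skip that
# bibliographic resource.
#
#     redis_client.sismember("processed_omids", "omid:br/0612345")  # True
#     redis_client.sismember("processed_omids", "doi:10.1000/xyz")  # False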


def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Check if an OMID has been processed by looking it up in Redis"""
    return redis_client.sismember("processed_omids", omid)


def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Find the file path for a given URI based on the directory structure"""
    entity_regex: str = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    entity_match = re.match(entity_regex, uri)
    if entity_match:
        cur_number = int(entity_match.group(4))
        cur_file_split = (
            (cur_number - 1) // items_per_file
        ) * items_per_file + items_per_file
        cur_split = (
            (cur_number - 1) // dir_split_number
        ) * dir_split_number + dir_split_number

        short_name = entity_match.group(2)
        sub_folder = entity_match.group(3) or ""
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + ".zip"

        return cur_file_path if os.path.exists(cur_file_path) else None
    return None
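
# Worked example (illustrative values; the split sizes are assumptions, not
# constants defined in this module): with dir_split_number=10000 and
# items_per_file=1000, the URI https://w3id.org/oc/meta/br/06012345 yields
# short_name "br", sub_folder "060" and number 12345, so the function checks
#
#     <rdf_dir>/br/060/20000/13000.zip
#
# and returns that path only if the ZIP file exists on disk, otherwise None.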


def load_json_from_file(filepath: str) -> dict:
    """Load JSON data from a ZIP file, using cache if available"""
    if filepath in _json_cache:
        return _json_cache[filepath]

    try:
        with ZipFile(filepath, "r") as zip_file:
            json_filename = zip_file.namelist()[0]
            with zip_file.open(json_filename) as json_file:
                json_content = json_file.read().decode("utf-8")
                json_data = json.loads(json_content)
                _json_cache[filepath] = json_data
                return json_data
    except Exception as e:
        print(f"Error loading file {filepath}: {e}")
        return {}


def process_identifier(id_data: dict) -> Optional[str]:
    """Process identifier data to extract schema and value"""
    try:
        id_schema = id_data["http://purl.org/spar/datacite/usesIdentifierScheme"][0][
            "@id"
        ].split("/datacite/")[1]
        literal_value = id_data[
            "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
        ][0]["@value"]
        return f"{id_schema}:{literal_value}"
    except (KeyError, IndexError):
        return None
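
# Illustrative input/output sketch (the DOI value is made up): an identifier
# entity shaped like
#
#     {
#         "http://purl.org/spar/datacite/usesIdentifierScheme": [
#             {"@id": "http://purl.org/spar/datacite/doi"}
#         ],
#         "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": [
#             {"@value": "10.1000/example"}
#         ],
#     }
#
# is rendered as the string "doi:10.1000/example".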


def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Process responsible agent data to extract name and identifiers"""
    try:
        # Process name
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        # Determine name format
        if family_name or given_name:
            if family_name and given_name:
                name = f"{family_name}, {given_name}"
            elif family_name:
                name = f"{family_name},"
            else:
                name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        # Get OMID
        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        # Process other identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        if identifiers:
            return f"{name} [{' '.join(identifiers)}]"
        return name
    except (KeyError, IndexError):
        return None
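
# Illustrative output sketch (names and identifiers are made up): an agent with
# familyName "Doe", givenName "Jane", OMID omid:ra/0601 and an attached ORCID
# identifier is rendered as
#
#     "Doe, Jane [omid:ra/0601 orcid:0000-0000-0000-0000]"
#
# Since the omid:ra/... identifier is always present, the final branch that
# returns the bare name is effectively never taken.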


def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Process venue title and its identifiers"""
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    # Get OMID
    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    # Process other identifiers
    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                id_data = load_json_from_file(id_file)
                for graph in id_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    return f"{venue_title} [{' '.join(identifiers)}]" if identifiers else venue_title


def process_hierarchical_venue(
    entity: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Dict[str, str]:
    """Process hierarchical venue structure recursively"""
    result = {"volume": "", "issue": "", "venue": ""}
    entity_types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        # It's a main venue (journal, book series, etc.)
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    # Process parent if exists
    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            parent_data = load_json_from_file(parent_file)
            for graph in parent_data:
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] == parent_uri:
                        parent_info = process_hierarchical_venue(
                            parent_entity, rdf_dir, dir_split_number, items_per_file
                        )
                        # Merge parent info, keeping our current values
                        for key, value in parent_info.items():
                            if not result[key]:  # Only update if we don't have a value
                                result[key] = value

    return result
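
# Illustrative recursion sketch (sequence identifiers and titles are made up):
# starting from a JournalIssue with sequence identifier "3" whose partOf chain
# leads to a JournalVolume "42" and then to a Journal titled "Example Journal",
# the merged result would look like
#
#     {"issue": "3", "volume": "42", "venue": "Example Journal [omid:br/0601]"}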


def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Find the first AR for a specific role type that isn't referenced by any other AR of the same role"""
    # Get all ARs of this role type
    role_ars = {
        ar_uri: ar_data
        for ar_uri, ar_data in agent_roles.items()
        if role_type
        in ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get("@id", "")
    }

    # Get all "next" relations between ARs of this role type
    role_next_relations = {
        ar_uri: next_ar
        for ar_uri, next_ar in next_relations.items()
        if ar_uri in role_ars and next_ar in role_ars
    }

    # Find the AR that isn't referenced as next by any other AR of this role
    referenced_ars = set(role_next_relations.values())
    for ar_uri in role_ars:
        if ar_uri not in referenced_ars:
            return ar_uri

    # If no first AR found, take the first one from the role ARs
    return next(iter(role_ars)) if role_ars else None
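
# Worked example (AR URIs are made up): for three author roles chained as
# ar/1 -> ar/2 -> ar/3 via hasNext, next_relations contains
# {ar/1: ar/2, ar/2: ar/3}; ar/2 and ar/3 are therefore referenced as "next",
# and ar/1 is returned as the head of the author chain.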


def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Process bibliographic resource data and its related entities"""
    # Skip if the entity is a JournalVolume or JournalIssue
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        # Extract OMID and basic BR information
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # Extract type
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        # Process identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        # Process authors, editors and publishers through agent roles
        authors = []
        editors = []
        publishers = []

        # Create dictionaries to store agent roles and their next relations
        agent_roles = {}
        next_relations = {}

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            # First pass: collect all agent roles and their next relations
            for ar_data in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_data["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                # Store the agent role data
                                agent_roles[ar_uri] = entity
                                # Store the next relation if it exists
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

            # Process each role type separately
            for role_type, role_list in [
                ("author", authors),
                ("editor", editors),
                ("publisher", publishers),
            ]:
                first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
                if not first_ar:
                    continue

                # Process agent roles in order for this role type
                current_ar = first_ar
                processed_ars = set()
                max_iterations = len(agent_roles)
                iterations = 0

                while current_ar and current_ar in agent_roles:
                    if current_ar in processed_ars or iterations >= max_iterations:
                        print(
                            f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                        )
                        break

                    processed_ars.add(current_ar)
                    iterations += 1

                    # Process the current AR
                    entity = agent_roles[current_ar]
                    role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                        "@id", ""
                    )

                    # Only process if it matches our current role type
                    if role_type in role:
                        if "http://purl.org/spar/pro/isHeldBy" in entity:
                            ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                                "@id"
                            ]
                            ra_file = find_file(
                                rdf_dir, dir_split_number, items_per_file, ra_uri
                            )
                            if ra_file:
                                ra_data = load_json_from_file(ra_file)
                                for ra_graph in ra_data:
                                    for ra_entity in ra_graph.get("@graph", []):
                                        if ra_entity["@id"] == ra_uri:
                                            agent_name = process_responsible_agent(
                                                ra_entity,
                                                ra_uri,
                                                rdf_dir,
                                                dir_split_number,
                                                items_per_file,
                                            )
                                            if agent_name:
                                                role_list.append(agent_name)

                    # Move to next agent role
                    current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # Process venue information
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # Process page information
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except (KeyError, IndexError) as e:
        print(f"Error processing bibliographic resource: {e}")

    return output
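
# Illustrative sketch of a returned row (all values are made up); the dict uses
# exactly the FIELDNAMES keys and can be written as-is by write_csv():
#
#     {
#         "id": "omid:br/0612345 doi:10.1000/example",
#         "title": "An Example Article",
#         "author": "Doe, Jane [omid:ra/0601]",
#         "issue": "3",
#         "volume": "42",
#         "venue": "Example Journal [omid:br/0699]",
#         "page": "1-10",
#         "pub_date": "2020",
#         "type": "journal article",
#         "publisher": "Example Press [omid:ra/0602]",
#         "editor": "",
#     }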


def process_single_file(args):
    """Process a single file and return the results"""
    filepath, input_dir, dir_split_number, items_per_file, redis_params = args
    results = []

    # Initialize Redis client once
    redis_client = redis.Redis(
        host=redis_params["host"],
        port=redis_params["port"],
        db=redis_params["db"],
        decode_responses=True,
    )

    data = load_json_from_file(filepath)
    for graph in data:
        for entity in graph.get("@graph", []):
            # Skip if this is a JournalVolume or JournalIssue
            entity_types = entity.get("@type", [])
            if (
                "http://purl.org/spar/fabio/JournalVolume" in entity_types
                or "http://purl.org/spar/fabio/JournalIssue" in entity_types
            ):
                continue

            # Extract OMID from entity ID
            entity_id = entity.get("@id", "")
            if entity_id:
                omid = f"omid:br/{entity_id.split('/')[-1]}"
                # Skip if already processed
                if is_omid_processed(omid, redis_client):
                    continue

                br_data = process_bibliographic_resource(
                    entity, input_dir, dir_split_number, items_per_file
                )
                if br_data:
                    results.append(br_data)

    return results


class ResultBuffer:
    """Buffer to collect results and write them when reaching the row limit"""

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1
        self.pbar = None

    def _get_last_file_number(self) -> int:
        """Find the highest file number from existing output files"""
        if not os.path.exists(self.output_dir):
            return -1

        max_number = -1
        for filename in os.listdir(self.output_dir):
            if filename.startswith("output_") and filename.endswith(".csv"):
                try:
                    number = int(filename[7:-4])  # Extract number from 'output_X.csv'
                    max_number = max(max_number, number)
                except ValueError:
                    continue
        return max_number

    def set_progress_bar(self, total: int) -> None:
        """Initialize progress bar with total number of files"""
        self.pbar = tqdm(total=total, desc="Processing files")

    def update_progress(self) -> None:
        """Update progress bar"""
        if self.pbar:
            self.pbar.update(1)

    def close_progress_bar(self) -> None:
        """Close progress bar"""
        if self.pbar:
            self.pbar.close()

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Add results to buffer and write to file if max_rows is reached"""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        """Write max_rows records to a new file"""
        chunk = self.buffer[: self.max_rows]
        output_file = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(output_file, chunk)
        self.buffer = self.buffer[self.max_rows :]
        self.file_counter += 1

    def flush(self) -> None:
        """Write any remaining results in buffer"""
        if self.buffer:
            output_file = os.path.join(
                self.output_dir, f"output_{self.file_counter}.csv"
            )
            write_csv(output_file, self.buffer)
            self.buffer = []
            self.file_counter += 1
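
# Illustrative behaviour sketch (file names follow the output_<n>.csv pattern
# used above): with the default max_rows of 3000, adding 7000 rows writes
# output_0.csv and output_1.csv (3000 rows each) immediately, keeps the
# remaining 1000 rows buffered, and a final flush() writes them to output_2.csv.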


def task_done(future: concurrent.futures.Future, result_buffer: ResultBuffer):
    """Callback function for completed tasks"""
    try:
        results = future.result()
        if results:
            result_buffer.add_results(results)
        result_buffer.update_progress()  # Update progress after task completion
    except Exception as e:
        print(f"Task failed: {e}")
        result_buffer.update_progress()  # Update progress even if task fails


def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    zip_output_rdf: bool,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
) -> None:
    """Generate CSV files from RDF data using Pebble for process management"""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize Redis connection and load initial cache
    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    processed_count = load_processed_omids_to_redis(output_dir, redis_client)

    # Process only files in the 'br' directory
    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: bibliographic resources directory not found at {br_dir}")
        return

    # Collect all ZIP files
    all_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        all_files.extend(os.path.join(root, f) for f in files if f.endswith(".zip"))

    if not all_files:
        print("No files found to process")
        return

    print(f"Processing {len(all_files)} files...")

    # Redis connection parameters to pass to worker processes
    redis_params = {"host": redis_host, "port": redis_port, "db": redis_db}

    # Create result buffer and initialize progress bar
    result_buffer = ResultBuffer(output_dir)
    result_buffer.set_progress_bar(len(all_files))

    # Process files one at a time using Pebble
    with ProcessPool(
        max_workers=os.cpu_count(),
        max_tasks=1,
        context=multiprocessing.get_context("spawn"),
    ) as executor:
        futures: List[concurrent.futures.Future] = []
        for filepath in all_files:
            future = executor.schedule(
                function=process_single_file,
                args=(
                    (
                        filepath,
                        input_dir,
                        dir_split_number,
                        items_per_file,
                        redis_params,
                    ),
                ),
            )
            future.add_done_callback(lambda f: task_done(f, result_buffer))
            futures.append(future)

        # Wait for all futures to complete
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Error processing file: {e}")
                return  # Exit without clearing Redis if there's an error

    # Flush any remaining results and close progress bar
    result_buffer.flush()
    result_buffer.close_progress_bar()

    # Clear Redis cache only if we've successfully completed everything
    redis_client.delete("processed_omids")
    print("Processing complete. Redis cache cleared.")


def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write data to CSV file"""
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)
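

# A minimal, hedged usage sketch: the paths and split sizes below are
# placeholder assumptions, not values defined by this module, and the Redis
# parameters simply mirror the defaults of generate_csv().
if __name__ == "__main__":
    generate_csv(
        input_dir="rdf",  # directory expected to contain the 'br' subfolder of zipped JSON-LD files
        output_dir="csv_output",  # where output_<n>.csv files will be written
        dir_split_number=10000,  # assumed directory split size
        items_per_file=1000,  # assumed number of items per file
        zip_output_rdf=True,
        redis_host="localhost",
        redis_port=6379,
        redis_db=2,
    )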