Coverage for oc_meta/plugins/csv_generator_lite/csv_generator_lite.py: 35%
401 statements
coverage.py v6.5.0, created at 2025-07-14 14:06 +0000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Arcangelo Massari <arcangelo.massari@unibo.it>
#
# Permission to use, copy, modify, and/or distribute this software for any purpose
# with or without fee is hereby granted, provided that the above copyright notice
# and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE,
# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from __future__ import annotations

import concurrent.futures
import csv
import json
import os
import re
import sys
from typing import Dict, List, Optional
from zipfile import ZipFile

import redis
from pebble import ProcessPool
from tqdm import tqdm

# Increase CSV field size limit to handle large fields
csv.field_size_limit(2**31 - 1)  # Set to the maximum value for 32-bit systems

# Constants
FIELDNAMES = [
    "id",
    "title",
    "author",
    "issue",
    "volume",
    "venue",
    "page",
    "pub_date",
    "type",
    "publisher",
    "editor",
]

URI_TYPE_DICT = {
    "http://purl.org/spar/doco/Abstract": "abstract",
    "http://purl.org/spar/fabio/ArchivalDocument": "archival document",
    "http://purl.org/spar/fabio/AudioDocument": "audio document",
    "http://purl.org/spar/fabio/Book": "book",
    "http://purl.org/spar/fabio/BookChapter": "book chapter",
    "http://purl.org/spar/fabio/ExpressionCollection": "book section",
    "http://purl.org/spar/fabio/BookSeries": "book series",
    "http://purl.org/spar/fabio/BookSet": "book set",
    "http://purl.org/spar/fabio/ComputerProgram": "computer program",
    "http://purl.org/spar/doco/Part": "book part",
    "http://purl.org/spar/fabio/Expression": "",
    "http://purl.org/spar/fabio/DataFile": "dataset",
    "http://purl.org/spar/fabio/DataManagementPlan": "data management plan",
    "http://purl.org/spar/fabio/Thesis": "dissertation",
    "http://purl.org/spar/fabio/Editorial": "editorial",
    "http://purl.org/spar/fabio/Journal": "journal",
    "http://purl.org/spar/fabio/JournalArticle": "journal article",
    "http://purl.org/spar/fabio/JournalEditorial": "journal editorial",
    "http://purl.org/spar/fabio/JournalIssue": "journal issue",
    "http://purl.org/spar/fabio/JournalVolume": "journal volume",
    "http://purl.org/spar/fabio/Newspaper": "newspaper",
    "http://purl.org/spar/fabio/NewspaperArticle": "newspaper article",
    "http://purl.org/spar/fabio/NewspaperIssue": "newspaper issue",
    "http://purl.org/spar/fr/ReviewVersion": "peer review",
    "http://purl.org/spar/fabio/AcademicProceedings": "proceedings",
    "http://purl.org/spar/fabio/Preprint": "preprint",
    "http://purl.org/spar/fabio/Presentation": "presentation",
    "http://purl.org/spar/fabio/ProceedingsPaper": "proceedings article",
    "http://purl.org/spar/fabio/ReferenceBook": "reference book",
    "http://purl.org/spar/fabio/ReferenceEntry": "reference entry",
    "http://purl.org/spar/fabio/ReportDocument": "report",
    "http://purl.org/spar/fabio/RetractionNotice": "retraction notice",
    "http://purl.org/spar/fabio/Series": "series",
    "http://purl.org/spar/fabio/SpecificationDocument": "standard",
    "http://purl.org/spar/fabio/WebContent": "web content",
}

# Global cache for JSON files
_json_cache = {}


def init_redis_connection(
    host: str = "localhost", port: int = 6379, db: int = 2
) -> redis.Redis:
    """Initialize a Redis connection with the given parameters."""
    client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    # Test the connection by sending a PING command
    client.ping()  # This will raise redis.ConnectionError if the connection fails
    return client


def increase_csv_field_limit() -> int:
    """Incrementally increase the CSV field size limit and return the
    largest value the platform accepts."""
    current_limit = csv.field_size_limit()
    while current_limit < sys.maxsize:
        next_limit = min(current_limit * 2, sys.maxsize)
        try:
            csv.field_size_limit(next_limit)
            current_limit = next_limit
        except OverflowError:
            # The platform rejected the larger value: restore the last
            # working limit and stop trying.
            csv.field_size_limit(current_limit)
            break
    return current_limit


def load_processed_omids_to_redis(output_dir: str, redis_client: redis.Redis) -> int:
    """Load all previously processed OMIDs from the existing CSV files into Redis.
    This is done only once at startup to create the initial cache."""
    if not os.path.exists(output_dir):
        return 0

    # Clear any existing data in Redis
    redis_client.delete("processed_omids")

    count = 0
    BATCH_SIZE = 1000  # Execute Redis commands in batches of 1000

    # Get the list of CSV files first
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]
    print(f"Loading {len(csv_files)} CSV files into Redis cache...")

    # Read all CSV files and collect OMIDs, with a progress bar
    for filename in tqdm(csv_files, desc="Loading existing identifiers"):
        filepath = os.path.join(output_dir, filename)
        with open(filepath, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            batch_pipe = redis_client.pipeline()
            batch_count = 0

            for row in reader:
                # Extract all OMIDs from the id field
                omids = [
                    id_part.strip()
                    for id_part in row["id"].split()
                    if id_part.startswith("omid:br/")
                ]
                # Add them to the Redis pipeline
                for omid in omids:
                    batch_pipe.sadd("processed_omids", omid)
                    batch_count += 1
                    count += 1

                # Execute the batch once it reaches the batch size
                if batch_count >= BATCH_SIZE:
                    batch_pipe.execute()
                    batch_pipe = redis_client.pipeline()
                    batch_count = 0

            # Execute any remaining commands in the pipeline
            if batch_count > 0:
                batch_pipe.execute()

    print(f"Loaded {count} identifiers into Redis cache")
    return count
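
# Note (illustration, not part of the original comments): "processed_omids" is a
# single Redis set whose members are strings such as "omid:br/06012345", so the
# membership check in is_omid_processed() stays O(1) on average no matter how
# many CSV files have already been written.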


def is_omid_processed(omid: str, redis_client: redis.Redis) -> bool:
    """Check whether an OMID has already been processed by looking it up in Redis."""
    return redis_client.sismember("processed_omids", omid)


def find_file(
    rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
) -> Optional[str]:
    """Find the file path for a given URI based on the directory structure."""
    entity_regex: str = (
        r"^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$"
    )
    entity_match = re.match(entity_regex, uri)
    if entity_match:
        cur_number = int(entity_match.group(4))
        cur_file_split = (
            (cur_number - 1) // items_per_file
        ) * items_per_file + items_per_file
        cur_split = (
            (cur_number - 1) // dir_split_number
        ) * dir_split_number + dir_split_number

        short_name = entity_match.group(2)
        sub_folder = entity_match.group(3) or ""
        cur_dir_path = os.path.join(rdf_dir, short_name, sub_folder, str(cur_split))
        cur_file_path = os.path.join(cur_dir_path, str(cur_file_split)) + ".zip"

        return cur_file_path if os.path.exists(cur_file_path) else None
    return None
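
# Worked example (illustration only, with hypothetical split sizes): with
# dir_split_number=10000 and items_per_file=1000, the URI
# "https://w3id.org/oc/meta/br/06012345" has short_name "br", supplier prefix
# "060" and sequential number 12345, so find_file() looks for
# "<rdf_dir>/br/060/20000/13000.zip":
#   cur_file_split = ((12345 - 1) // 1000) * 1000 + 1000     -> 13000
#   cur_split      = ((12345 - 1) // 10000) * 10000 + 10000  -> 20000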


def load_json_from_file(filepath: str) -> list:
    """Load JSON-LD data from a ZIP file, using the in-memory cache if available."""
    if filepath in _json_cache:
        return _json_cache[filepath]

    try:
        with ZipFile(filepath, "r") as zip_file:
            json_filename = zip_file.namelist()[0]
            with zip_file.open(json_filename) as json_file:
                json_content = json_file.read().decode("utf-8")
                json_data = json.loads(json_content)
                _json_cache[filepath] = json_data
                return json_data
    except Exception as e:
        print(f"Error loading file {filepath}: {e}")
        return []


def process_identifier(id_data: dict) -> Optional[str]:
    """Process identifier data to extract its scheme and literal value."""
    try:
        id_schema = id_data["http://purl.org/spar/datacite/usesIdentifierScheme"][0][
            "@id"
        ].split("/datacite/")[1]
        literal_value = id_data[
            "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"
        ][0]["@value"]
        return f"{id_schema}:{literal_value}"
    except (KeyError, IndexError):
        return None
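
# For illustration (hypothetical input): an identifier entity shaped like
#   {"http://purl.org/spar/datacite/usesIdentifierScheme":
#        [{"@id": "http://purl.org/spar/datacite/doi"}],
#    "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue":
#        [{"@value": "10.1000/182"}]}
# is rendered by process_identifier() as "doi:10.1000/182".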


def process_responsible_agent(
    ra_data: dict, ra_uri: str, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[str]:
    """Process responsible agent data to extract its name and identifiers."""
    try:
        # Process the name
        family_name = ra_data.get("http://xmlns.com/foaf/0.1/familyName", [{}])[0].get(
            "@value", ""
        )
        given_name = ra_data.get("http://xmlns.com/foaf/0.1/givenName", [{}])[0].get(
            "@value", ""
        )
        foaf_name = ra_data.get("http://xmlns.com/foaf/0.1/name", [{}])[0].get(
            "@value", ""
        )

        # Determine the name format
        if family_name or given_name:
            if family_name and given_name:
                name = f"{family_name}, {given_name}"
            elif family_name:
                name = f"{family_name},"
            else:
                name = f", {given_name}"
        elif foaf_name:
            name = foaf_name
        else:
            return None

        # Get the OMID
        omid = ra_uri.split("/")[-1]
        identifiers = [f"omid:ra/{omid}"]

        # Process the other identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in ra_data:
            for identifier in ra_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)

        if identifiers:
            return f"{name} [{' '.join(identifiers)}]"
        return name
    except (KeyError, IndexError):
        return None
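
# For illustration (hypothetical values): an agent with familyName "Doe",
# givenName "Jane" and OMID omid:ra/061234 is rendered as
# "Doe, Jane [omid:ra/061234]"; any further identifiers (e.g. an ORCID) are
# appended space-separated inside the brackets.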


def process_venue_title(
    venue_data: dict,
    venue_uri: str,
    rdf_dir: str,
    dir_split_number: int,
    items_per_file: int,
) -> str:
    """Process a venue title and its identifiers."""
    venue_title = venue_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
        "@value", ""
    )
    if not venue_title:
        return ""

    # Get the OMID
    omid = venue_uri.split("/")[-1]
    identifiers = [f"omid:br/{omid}"]

    # Process the other identifiers
    if "http://purl.org/spar/datacite/hasIdentifier" in venue_data:
        for identifier in venue_data["http://purl.org/spar/datacite/hasIdentifier"]:
            id_uri = identifier["@id"]
            id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
            if id_file:
                id_data = load_json_from_file(id_file)
                for graph in id_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == id_uri:
                            id_value = process_identifier(entity)
                            if id_value:
                                identifiers.append(id_value)

    return f"{venue_title} [{' '.join(identifiers)}]" if identifiers else venue_title


def process_hierarchical_venue(
    entity: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Dict[str, str]:
    """Process a hierarchical venue structure recursively."""
    result = {"volume": "", "issue": "", "venue": ""}
    entity_types = entity.get("@type", [])

    if "http://purl.org/spar/fabio/JournalIssue" in entity_types:
        result["issue"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    elif "http://purl.org/spar/fabio/JournalVolume" in entity_types:
        result["volume"] = entity.get(
            "http://purl.org/spar/fabio/hasSequenceIdentifier", [{}]
        )[0].get("@value", "")
    else:
        # It's a main venue (journal, book series, etc.): no need to climb further
        result["venue"] = process_venue_title(
            entity, entity["@id"], rdf_dir, dir_split_number, items_per_file
        )
        return result

    # Process the parent, if one exists
    if "http://purl.org/vocab/frbr/core#partOf" in entity:
        parent_uri = entity["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
        parent_file = find_file(rdf_dir, dir_split_number, items_per_file, parent_uri)
        if parent_file:
            parent_data = load_json_from_file(parent_file)
            for graph in parent_data:
                for parent_entity in graph.get("@graph", []):
                    if parent_entity["@id"] == parent_uri:
                        parent_info = process_hierarchical_venue(
                            parent_entity, rdf_dir, dir_split_number, items_per_file
                        )
                        # Merge the parent info, keeping our current values
                        for key, value in parent_info.items():
                            if not result[key]:  # Only update missing values
                                result[key] = value

    return result


def find_first_ar_by_role(
    agent_roles: Dict, next_relations: Dict, role_type: str
) -> Optional[str]:
    """Find the first AR for a specific role type, i.e. the one that isn't
    referenced as "next" by any other AR of the same role."""
    # Get all ARs of this role type
    role_ars = {
        ar_uri: ar_data
        for ar_uri, ar_data in agent_roles.items()
        if role_type
        in ar_data.get("http://purl.org/spar/pro/withRole", [{}])[0].get("@id", "")
    }

    # Get all "next" relations between ARs of this role type
    role_next_relations = {
        ar_uri: next_ar
        for ar_uri, next_ar in next_relations.items()
        if ar_uri in role_ars and next_ar in role_ars
    }

    # Find the AR that isn't referenced as "next" by any other AR of this role
    referenced_ars = set(role_next_relations.values())
    for ar_uri in role_ars:
        if ar_uri not in referenced_ars:
            return ar_uri

    # If no first AR was found, fall back to the first one among the role ARs
    return next(iter(role_ars)) if role_ars else None
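
# For illustration (hypothetical URIs): given author ARs ar/1 -> ar/2 -> ar/3
# linked by hasNext, ar/1 never appears as a "next" value, so it is returned as
# the head of the chain and the authors are then emitted in that order.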


def process_bibliographic_resource(
    br_data: dict, rdf_dir: str, dir_split_number: int, items_per_file: int
) -> Optional[Dict[str, str]]:
    """Process a bibliographic resource and its related entities."""
    # Skip the entity if it is a JournalVolume or JournalIssue
    br_types = br_data.get("@type", [])
    if (
        "http://purl.org/spar/fabio/JournalVolume" in br_types
        or "http://purl.org/spar/fabio/JournalIssue" in br_types
    ):
        return None

    output = {field: "" for field in FIELDNAMES}

    try:
        # Extract the OMID and basic BR information
        entity_id = br_data.get("@id", "")
        identifiers = [f'omid:br/{entity_id.split("/")[-1]}'] if entity_id else []

        output["title"] = br_data.get("http://purl.org/dc/terms/title", [{}])[0].get(
            "@value", ""
        )
        output["pub_date"] = br_data.get(
            "http://prismstandard.org/namespaces/basic/2.0/publicationDate", [{}]
        )[0].get("@value", "")

        # Extract the type
        br_types = [
            t
            for t in br_data.get("@type", [])
            if t != "http://purl.org/spar/fabio/Expression"
        ]
        output["type"] = URI_TYPE_DICT.get(br_types[0], "") if br_types else ""

        # Process identifiers
        if "http://purl.org/spar/datacite/hasIdentifier" in br_data:
            for identifier in br_data["http://purl.org/spar/datacite/hasIdentifier"]:
                id_uri = identifier["@id"]
                id_file = find_file(rdf_dir, dir_split_number, items_per_file, id_uri)
                if id_file:
                    id_data = load_json_from_file(id_file)
                    for graph in id_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == id_uri:
                                id_value = process_identifier(entity)
                                if id_value:
                                    identifiers.append(id_value)
        output["id"] = " ".join(identifiers)

        # Process authors, editors and publishers through agent roles
        authors = []
        editors = []
        publishers = []

        # Dictionaries to store agent roles and their "next" relations
        agent_roles = {}
        next_relations = {}

        if "http://purl.org/spar/pro/isDocumentContextFor" in br_data:
            # First pass: collect all agent roles and their "next" relations
            for ar_ref in br_data["http://purl.org/spar/pro/isDocumentContextFor"]:
                ar_uri = ar_ref["@id"]
                ar_file = find_file(rdf_dir, dir_split_number, items_per_file, ar_uri)
                if ar_file:
                    ar_data = load_json_from_file(ar_file)
                    for graph in ar_data:
                        for entity in graph.get("@graph", []):
                            if entity["@id"] == ar_uri:
                                # Store the agent role data
                                agent_roles[ar_uri] = entity
                                # Store the "next" relation if it exists
                                if "https://w3id.org/oc/ontology/hasNext" in entity:
                                    next_ar = entity[
                                        "https://w3id.org/oc/ontology/hasNext"
                                    ][0]["@id"]
                                    next_relations[ar_uri] = next_ar

            # Process each role type separately
            for role_type, role_list in [
                ("author", authors),
                ("editor", editors),
                ("publisher", publishers),
            ]:
                first_ar = find_first_ar_by_role(agent_roles, next_relations, role_type)
                if not first_ar:
                    continue

                # Process the agent roles in order for this role type
                current_ar = first_ar
                processed_ars = set()
                max_iterations = len(agent_roles)
                iterations = 0

                while current_ar and current_ar in agent_roles:
                    if current_ar in processed_ars or iterations >= max_iterations:
                        print(
                            f"Warning: Detected cycle in hasNext relations or exceeded maximum iterations at AR: {current_ar}"
                        )
                        break

                    processed_ars.add(current_ar)
                    iterations += 1

                    # Process the current AR
                    entity = agent_roles[current_ar]
                    role = entity.get("http://purl.org/spar/pro/withRole", [{}])[0].get(
                        "@id", ""
                    )

                    # Only process it if it matches the current role type
                    if role_type in role:
                        if "http://purl.org/spar/pro/isHeldBy" in entity:
                            ra_uri = entity["http://purl.org/spar/pro/isHeldBy"][0][
                                "@id"
                            ]
                            ra_file = find_file(
                                rdf_dir, dir_split_number, items_per_file, ra_uri
                            )
                            if ra_file:
                                ra_data = load_json_from_file(ra_file)
                                for ra_graph in ra_data:
                                    for ra_entity in ra_graph.get("@graph", []):
                                        if ra_entity["@id"] == ra_uri:
                                            agent_name = process_responsible_agent(
                                                ra_entity,
                                                ra_uri,
                                                rdf_dir,
                                                dir_split_number,
                                                items_per_file,
                                            )
                                            if agent_name:
                                                role_list.append(agent_name)

                    # Move on to the next agent role
                    current_ar = next_relations.get(current_ar)

        output["author"] = "; ".join(authors)
        output["editor"] = "; ".join(editors)
        output["publisher"] = "; ".join(publishers)

        # Process venue information
        if "http://purl.org/vocab/frbr/core#partOf" in br_data:
            venue_uri = br_data["http://purl.org/vocab/frbr/core#partOf"][0]["@id"]
            venue_file = find_file(rdf_dir, dir_split_number, items_per_file, venue_uri)
            if venue_file:
                venue_data = load_json_from_file(venue_file)
                for graph in venue_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == venue_uri:
                            venue_info = process_hierarchical_venue(
                                entity, rdf_dir, dir_split_number, items_per_file
                            )
                            output.update(venue_info)

        # Process page information
        if "http://purl.org/vocab/frbr/core#embodiment" in br_data:
            page_uri = br_data["http://purl.org/vocab/frbr/core#embodiment"][0]["@id"]
            page_file = find_file(rdf_dir, dir_split_number, items_per_file, page_uri)
            if page_file:
                page_data = load_json_from_file(page_file)
                for graph in page_data:
                    for entity in graph.get("@graph", []):
                        if entity["@id"] == page_uri:
                            start_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/startingPage",
                                [{}],
                            )[0].get("@value", "")
                            end_page = entity.get(
                                "http://prismstandard.org/namespaces/basic/2.0/endingPage",
                                [{}],
                            )[0].get("@value", "")
                            if start_page or end_page:
                                output["page"] = f"{start_page}-{end_page}"

    except (KeyError, IndexError) as e:
        print(f"Error processing bibliographic resource: {e}")

    return output
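
# For illustration (hypothetical values), a processed journal article yields a
# row shaped like:
#   {"id": "omid:br/06012345 doi:10.1000/182", "title": "Some Title",
#    "author": "Doe, Jane [omid:ra/061 orcid:0000-0000-0000-0000]",
#    "issue": "2", "volume": "15", "venue": "Some Journal [omid:br/0601]",
#    "page": "100-110", "pub_date": "2020-01-01", "type": "journal article",
#    "publisher": "Some Publisher [omid:ra/0602]", "editor": ""}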


def process_single_file(args):
    """Process a single file and return the results."""
    filepath, input_dir, dir_split_number, items_per_file, redis_params = args
    results = []

    # Initialize the Redis client once
    redis_client = redis.Redis(
        host=redis_params["host"],
        port=redis_params["port"],
        db=redis_params["db"],
        decode_responses=True,
    )

    data = load_json_from_file(filepath)
    for graph in data:
        for entity in graph.get("@graph", []):
            # Skip JournalVolume and JournalIssue entities
            entity_types = entity.get("@type", [])
            if (
                "http://purl.org/spar/fabio/JournalVolume" in entity_types
                or "http://purl.org/spar/fabio/JournalIssue" in entity_types
            ):
                continue

            # Extract the OMID from the entity ID
            entity_id = entity.get("@id", "")
            if entity_id:
                omid = f"omid:br/{entity_id.split('/')[-1]}"
                # Skip it if it has already been processed
                if is_omid_processed(omid, redis_client):
                    continue

            br_data = process_bibliographic_resource(
                entity, input_dir, dir_split_number, items_per_file
            )
            if br_data:
                results.append(br_data)

    return results


class ResultBuffer:
    """Buffer that collects results and writes them out when the row limit is reached."""

    def __init__(self, output_dir: str, max_rows: int = 3000):
        self.buffer = []
        self.output_dir = output_dir
        self.max_rows = max_rows
        self.file_counter = self._get_last_file_number() + 1
        self.pbar = None

    def _get_last_file_number(self) -> int:
        """Find the highest file number among the existing output files."""
        if not os.path.exists(self.output_dir):
            return -1

        max_number = -1
        for filename in os.listdir(self.output_dir):
            if filename.startswith("output_") and filename.endswith(".csv"):
                try:
                    number = int(filename[7:-4])  # Extract the number from 'output_X.csv'
                    max_number = max(max_number, number)
                except ValueError:
                    continue
        return max_number

    def set_progress_bar(self, total: int) -> None:
        """Initialize the progress bar with the total number of files."""
        self.pbar = tqdm(total=total, desc="Processing files")

    def update_progress(self) -> None:
        """Update the progress bar."""
        if self.pbar:
            self.pbar.update(1)

    def close_progress_bar(self) -> None:
        """Close the progress bar."""
        if self.pbar:
            self.pbar.close()

    def add_results(self, results: List[Dict[str, str]]) -> None:
        """Add results to the buffer and write them out whenever max_rows is reached."""
        self.buffer.extend(results)
        while len(self.buffer) >= self.max_rows:
            self._write_buffer_chunk()

    def _write_buffer_chunk(self) -> None:
        """Write max_rows records to a new file."""
        chunk = self.buffer[: self.max_rows]
        output_file = os.path.join(self.output_dir, f"output_{self.file_counter}.csv")
        write_csv(output_file, chunk)
        self.buffer = self.buffer[self.max_rows :]
        self.file_counter += 1

    def flush(self) -> None:
        """Write any results remaining in the buffer."""
        if self.buffer:
            output_file = os.path.join(
                self.output_dir, f"output_{self.file_counter}.csv"
            )
            write_csv(output_file, self.buffer)
            self.buffer = []
            self.file_counter += 1
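
# For illustration: with the default max_rows=3000 and an empty output_dir,
# 7500 collected records are written as output_0.csv and output_1.csv (3000
# rows each) and, after flush(), output_2.csv (1500 rows); numbering continues
# from any output_N.csv files already present in output_dir.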


def task_done(future: concurrent.futures.Future, result_buffer: ResultBuffer):
    """Callback for completed tasks."""
    try:
        results = future.result()
        if results:
            result_buffer.add_results(results)
        result_buffer.update_progress()  # Update progress after task completion
    except Exception as e:
        print(f"Task failed: {e}")
        result_buffer.update_progress()  # Update progress even if the task fails


def generate_csv(
    input_dir: str,
    output_dir: str,
    dir_split_number: int,
    items_per_file: int,
    zip_output_rdf: bool,
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 2,
) -> None:
    """Generate CSV files from RDF data, using Pebble for process management."""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Initialize the Redis connection and load the initial cache
    redis_client = init_redis_connection(redis_host, redis_port, redis_db)
    processed_count = load_processed_omids_to_redis(output_dir, redis_client)

    # Process only the files in the 'br' directory
    br_dir = os.path.join(input_dir, "br")
    if not os.path.exists(br_dir):
        print(f"Error: bibliographic resources directory not found at {br_dir}")
        return

    # Collect all ZIP files
    all_files = []
    for root, _, files in os.walk(br_dir):
        if "prov" in root:
            continue
        all_files.extend(os.path.join(root, f) for f in files if f.endswith(".zip"))

    if not all_files:
        print("No files found to process")
        return

    print(f"Processing {len(all_files)} files...")

    # Redis connection parameters to pass to the worker processes
    redis_params = {"host": redis_host, "port": redis_port, "db": redis_db}

    # Create the result buffer and initialize the progress bar
    result_buffer = ResultBuffer(output_dir)
    result_buffer.set_progress_bar(len(all_files))

    # Process files with Pebble; max_tasks=1 restarts each worker after a single file
    with ProcessPool(max_workers=os.cpu_count(), max_tasks=1) as executor:
        futures: List[concurrent.futures.Future] = []
        for filepath in all_files:
            future = executor.schedule(
                function=process_single_file,
                args=(
                    (
                        filepath,
                        input_dir,
                        dir_split_number,
                        items_per_file,
                        redis_params,
                    ),
                ),
            )
            future.add_done_callback(lambda f: task_done(f, result_buffer))
            futures.append(future)

        # Wait for all futures to complete
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"Error processing file: {e}")
                return  # Exit without clearing Redis if there is an error

    # Flush any remaining results and close the progress bar
    result_buffer.flush()
    result_buffer.close_progress_bar()

    # Clear the Redis cache only after everything has completed successfully
    redis_client.delete("processed_omids")
    print("Processing complete. Redis cache cleared.")


def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
    """Write data to a CSV file."""
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(data)
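

# Minimal usage sketch (not part of the original module): the paths and split
# sizes below are hypothetical and depend on how the OpenCitations Meta RDF
# dump was produced.
if __name__ == "__main__":
    generate_csv(
        input_dir="/data/oc_meta/rdf",      # hypothetical path to the RDF dump
        output_dir="/data/oc_meta/csv",     # hypothetical path for the CSV output
        dir_split_number=10000,             # assumed directory split size
        items_per_file=1000,                # assumed items per file
        zip_output_rdf=True,
        redis_host="localhost",
        redis_port=6379,
        redis_db=2,
    )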