Coverage for oc_ds_converter / preprocessing / nih.py: 88%
72 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-25 18:06 +0000
1# SPDX-FileCopyrightText: 2023 Arcangelo Massari <arcangelo.massari@unibo.it>
2#
3# SPDX-License-Identifier: ISC
5import csv
6import os.path
7import sys
8from os import makedirs
9from os.path import exists
11import pandas as pd
12from oc_ds_converter.preprocessing.base import Preprocessing
13from tqdm import tqdm
class NIHPreProcessing(Preprocessing):
    """This class aims at pre-processing iCite Database Snapshots (NIH Open
    Citation Collection + ICite Metadata), available at:
    https://nih.figshare.com/search?q=iCite+Database+Snapshot. In particular,
    NIHPreProcessing splits the original CSV file in many lighter CSV files,
    each one containing the number of entities specified in input by the user"""

    def __init__(self, input_dir, output_dir, interval, filter=None):
        """
        :param input_dir: directory containing the original iCite CSV dump(s).
        :param output_dir: directory where the reduced CSV files are written;
            created if it does not already exist.
        :param interval: number of rows per output CSV file.
        :param filter: columns to keep from the input CSVs; defaults to the
            citation-relevant subset below. (Parameter name kept for backward
            compatibility even though it shadows the builtin ``filter``.)
        """
        self._req_type = ".csv"
        self._input_dir = input_dir
        self._output_dir = output_dir
        if not exists(self._output_dir):
            makedirs(self._output_dir)
        self._interval = interval
        if filter:
            self._filter = filter
        else:
            self._filter = ["pmid", "doi", "title", "authors", "year", "journal", "cited_by", "references"]
        super().__init__()

    def split_input(self):
        """Read every CSV in the input directory in chunks and re-emit the rows
        as smaller CSV files of ``self._interval`` rows each (remainder rows go
        to a dedicated file via :meth:`splitted_to_file`)."""
        # Raise the csv field size limit as high as the platform allows: some
        # iCite fields (e.g. the reference lists) are extremely long. Decrease
        # by a factor of 10 for as long as OverflowError occurs.
        max_int = sys.maxsize
        while True:
            try:
                csv.field_size_limit(max_int)
                break
            except OverflowError:
                max_int = int(max_int / 10)

        all_files = self.get_all_files(self._input_dir, self._req_type)
        count = 0
        lines = []
        headers = self._filter
        for file in all_files:
            # get_all_files may yield nested lists; unwrap to the first path.
            if isinstance(file, list):
                file = file[0]
            if not file:
                continue
            try:
                # Lazy chunked reader: keeps memory bounded on multi-GB dumps.
                iter_csv = pd.read_csv(file, usecols=self._filter, chunksize=1000, engine='python')
                for chunk in tqdm(iter_csv):
                    try:
                        # Reset the index (the original pd.concat([chunk]) only
                        # achieved this) and blank out missing values.
                        frame = chunk.reset_index(drop=True)
                        frame.fillna("", inplace=True)
                        rows = frame.values.tolist()
                        for line in tqdm(rows):
                            try:
                                count += 1
                                lines.append(line)
                                # Flush a full split once `interval` rows accumulated.
                                if count != 0 and count % int(self._interval) == 0:
                                    lines = self.splitted_to_file(
                                        count, self._interval, self._output_dir, lines, headers
                                    )
                            except Exception:
                                # Best-effort: report and skip a malformed row.
                                print("error with line:", line)
                    except Exception:
                        # Best-effort: report and skip an unreadable chunk.
                        print("error with chunk:", chunk)
            except Exception:
                print("error with pd.read_csv")
        # Flush any remainder that did not fill a complete interval.
        if len(lines) > 0:
            self.splitted_to_file(count, self._interval, self._output_dir, lines, headers)

    def splitted_to_file(self, cur_n, target_n, out_dir, data, headers=None):
        """Write ``data`` (a list of rows) to a CSV file in ``out_dir``.

        When ``cur_n`` is a non-zero multiple of ``target_n``, the file is
        named after the split index (``CSVFile_<k>.csv``); otherwise the rows
        are a remainder and go to ``CSVFile_Rem.csv``.

        :param cur_n: total number of rows processed so far.
        :param target_n: rows per split (the interval).
        :param out_dir: destination directory.
        :param data: rows to write.
        :param headers: header row written first.
        :return: an empty list, so callers can reset their row buffer.
        """
        if int(cur_n) != 0 and int(cur_n) % int(target_n) == 0:
            # to be logged: print("Processed lines:", cur_n, ". Reduced csv nr.", cur_n // target_n)
            filename = "CSVFile_" + str(cur_n // target_n) + self._req_type
        else:
            # to be logged: print("Processed lines:", cur_n)
            filename = "CSVFile_" + "Rem" + self._req_type
        with open(os.path.join(out_dir, filename), "w", encoding="utf8", newline="") as f_out:
            writer = csv.writer(f_out)
            writer.writerow(headers)
            writer.writerows(data)
        # Always return an empty buffer. (The original returned None on the
        # remainder branch; that value was unused by callers, so returning []
        # uniformly is backward compatible and removes the inconsistency.)
        return []