Source code for asreview.data.base

# Copyright 2019-2022 The ASReview Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ["ASReviewData", "load_data"]

import hashlib
from io import StringIO
from pathlib import Path

import numpy as np
import pandas as pd
from pandas.api.types import is_object_dtype
from pandas.api.types import is_string_dtype

from asreview.config import COLUMN_DEFINITIONS
from asreview.config import LABEL_NA
from asreview.datasets import DatasetManager
from asreview.datasets import DatasetNotFoundError
from asreview.exceptions import BadFileFormatError
from asreview.io import PaperRecord
from asreview.io.utils import convert_keywords
from asreview.io.utils import type_from_column
from asreview.utils import _entry_points
from asreview.utils import _get_filename_from_url
from asreview.utils import is_iterable
from asreview.utils import is_url


def load_data(name, **kwargs):
    """Load data from file, URL, or plugin.

    Parameters
    ----------
    name: str, pathlib.Path
        File path, URL, or alias of extension dataset.
    **kwargs:
        Keyword arguments passed to the reader.

    Returns
    -------
    asreview.ASReviewData:
        Initialized ASReview data object.
    """

    # check if name is a file path or URL
    if is_url(name) or Path(name).exists():
        return ASReviewData.from_file(name, **kwargs)

    # check if name is a plugin dataset
    try:
        return ASReviewData.from_extension(name, **kwargs)
    except DatasetNotFoundError:
        pass

    # could not find the dataset, raise an error
    raise FileNotFoundError(f"File, URL, or dataset does not exist: '{name}'")

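# Usage sketch (illustrative example, not part of the original module): load
# a dataset from a local file. "records.csv" is a hypothetical path; any
# supported format (RIS, CSV, TSV, Excel) is resolved the same way.
#
#   >>> from asreview import load_data
#   >>> data = load_data("records.csv")
#   >>> len(data)  # number of records in the dataset
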
class ASReviewData:
    """Data object for a dataset with texts, labels, DOIs, etc.

    Arguments
    ---------
    df: pandas.DataFrame
        Dataframe containing the data for the ASReview data object.
    column_spec: dict
        Specification of which column corresponds to which standard
        specification. The key is the standard specification, the value is
        the column it is actually in. Default: None.

    Attributes
    ----------
    record_ids: numpy.ndarray
        Return an array representing the data in the Index.
    texts: numpy.ndarray
        Returns an array with either headings, bodies, or both.
    headings: numpy.ndarray
        Returns an array with dataset headings.
    title: numpy.ndarray
        Identical to headings.
    bodies: numpy.ndarray
        Returns an array with dataset bodies.
    abstract: numpy.ndarray
        Identical to bodies.
    notes: numpy.ndarray
        Returns an array with dataset notes.
    keywords: numpy.ndarray
        Returns an array with dataset keywords.
    authors: numpy.ndarray
        Returns an array with dataset authors.
    doi: numpy.ndarray
        Returns an array with dataset DOIs.
    included: numpy.ndarray
        Returns an array with document inclusion markers.
    final_included: numpy.ndarray
        Pending deprecation! Returns an array with document inclusion
        markers.
    labels: numpy.ndarray
        Identical to included.
    """

    def __init__(self, df=None, column_spec=None):
        self.df = df
        self.prior_idx = np.array([], dtype=int)
        self.max_idx = max(df.index.values) + 1

        # Infer the column specification if it is not given.
        if column_spec is None:
            self.column_spec = {}
            for col_name in list(df):
                data_type = type_from_column(col_name, COLUMN_DEFINITIONS)
                if data_type is not None:
                    self.column_spec[data_type] = col_name
        else:
            self.column_spec = column_spec

        if "included" not in self.column_spec:
            self.column_spec["included"] = "included"

    def __len__(self):
        if self.df is None:
            return 0
        return len(self.df.index)

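    # Construction sketch (illustrative example): build an ASReviewData
    # object directly from a pandas DataFrame. The columns "title" and
    # "abstract" match the standard COLUMN_DEFINITIONS; the values are
    # made up.
    #
    #   >>> import pandas as pd
    #   >>> df = pd.DataFrame({
    #   ...     "title": ["A systematic review", "Active learning"],
    #   ...     "abstract": ["First abstract.", "Second abstract."],
    #   ... })
    #   >>> data = ASReviewData(df)
    #   >>> len(data)
    #   2
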
    def hash(self):
        """Compute a hash from the dataset.

        Returns
        -------
        str:
            SHA1 hash, computed from the titles/abstracts of the dataframe.
        """
        if (
            len(self.df.index) < 1000 and self.bodies is not None
        ) or self.texts is None:
            texts = " ".join(self.bodies)
        else:
            texts = " ".join(self.texts)
        return hashlib.sha1(
            " ".join(texts).encode(encoding="UTF-8", errors="ignore")
        ).hexdigest()

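    # Usage sketch (illustrative example): the hash can serve as a stable
    # fingerprint to check whether two data objects hold the same dataset.
    #
    #   >>> data.hash()  # 40-character hexadecimal SHA1 digest
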
    @classmethod
    def from_file(cls, fp, reader=None):
        """Create instance from supported file format.

        It works in two ways; either manual control where the conversion
        functions are supplied or automatic, where it searches in the entry
        points for the right conversion functions.

        Arguments
        ---------
        fp: str, pathlib.Path
            Read the data from this file or URL.
        reader: class
            Reader to import the file.
        """
        if reader is not None:
            return cls(reader.read_data(fp))

        # get the filename from a URL, else from the file path
        if is_url(fp):
            fn = _get_filename_from_url(fp)
        else:
            fn = Path(fp).name

        try:
            reader = _entry_points(group="asreview.readers")[Path(fn).suffix].load()
        except Exception:
            raise BadFileFormatError(f"Importing file {fp} not possible.")

        df, column_spec = reader.read_data(fp)

        return cls(df, column_spec=column_spec)

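    # Usage sketch (illustrative example): load a dataset from a file path
    # or URL; the reader is resolved from the file extension. The path and
    # URL below are hypothetical.
    #
    #   >>> data = ASReviewData.from_file("records.ris")
    #   >>> data = ASReviewData.from_file("https://example.org/records.csv")
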
    @classmethod
    def from_extension(cls, name, reader=None):
        """Load a dataset from an extension.

        Arguments
        ---------
        name: str
            Name (alias) of the extension dataset.
        reader: class
            Reader to import the file.
        """
        dataset = DatasetManager().find(name)

        if dataset.filepath:
            fp = dataset.filepath
        else:
            # build dataset to temporary file
            reader = dataset.reader()
            fp = StringIO(dataset.to_file())

        if reader is None:
            # get the filename from a URL, else from the file path
            if is_url(fp):
                fn = _get_filename_from_url(fp)
            else:
                fn = Path(fp).name

            try:
                reader = _entry_points(group="asreview.readers")[Path(fn).suffix].load()
            except Exception:
                raise BadFileFormatError(f"Importing file {fp} not possible.")

        df, column_spec = reader.read_data(fp)

        return cls(df, column_spec=column_spec)

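    # Usage sketch (illustrative example): load a dataset registered by an
    # extension. The alias below is a placeholder; use DatasetManager to
    # list the datasets that are actually available.
    #
    #   >>> data = ASReviewData.from_extension("benchmark:example_dataset")
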
    def record(self, i, by_index=True):
        """Create a record from an index.

        Arguments
        ---------
        i: int, iterable
            Index of the record, or list of indices.
        by_index: bool
            If True, take the i-th value as used internally by the review.
            If False, take the record with record_id == i.

        Returns
        -------
        PaperRecord
            The corresponding record if i was an integer, or a list of
            records if i was an iterable.
        """
        if not is_iterable(i):
            index_list = [i]
        else:
            index_list = i

        if by_index:
            records = [
                PaperRecord(
                    **self.df.iloc[j],
                    column_spec=self.column_spec,
                    record_id=self.df.index.values[j],
                )
                for j in index_list
            ]
        else:
            records = [
                PaperRecord(**self.df.loc[j, :], record_id=j, column_spec=self.column_spec)
                for j in index_list
            ]

        if is_iterable(i):
            return records
        return records[0]

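    # Usage sketch (illustrative example): retrieve one record or a list of
    # records.
    #
    #   >>> record = data.record(0)                  # first record (internal index)
    #   >>> records = data.record([0, 1])            # list of PaperRecord objects
    #   >>> record = data.record(5, by_index=False)  # record with record_id 5
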
    @property
    def record_ids(self):
        return self.df.index.values

    @property
    def texts(self):
        if self.title is None:
            return self.abstract
        if self.abstract is None:
            return self.title

        cur_texts = np.array(
            [self.title[i] + " " + self.abstract[i] for i in range(len(self))],
            dtype=object,
        )
        return cur_texts

    @property
    def headings(self):
        return self.title

    @property
    def title(self):
        try:
            return self.df[self.column_spec["title"]].values
        except KeyError:
            return None

    @property
    def bodies(self):
        return self.abstract

    @property
    def abstract(self):
        try:
            return self.df[self.column_spec["abstract"]].values
        except KeyError:
            return None

    @property
    def notes(self):
        try:
            return self.df[self.column_spec["notes"]].values
        except KeyError:
            return None

    @property
    def keywords(self):
        try:
            return self.df[self.column_spec["keywords"]].apply(convert_keywords).values
        except KeyError:
            return None

    @property
    def authors(self):
        try:
            return self.df[self.column_spec["authors"]].values
        except KeyError:
            return None

    @property
    def doi(self):
        try:
            return self.df[self.column_spec["doi"]].values
        except KeyError:
            return None

    @property
    def url(self):
        try:
            return self.df[self.column_spec["url"]].values
        except KeyError:
            return None

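    # Usage sketch (illustrative example): the properties above return numpy
    # arrays, or None when the corresponding column is absent.
    #
    #   >>> data.texts[0]  # title and abstract concatenated
    #   >>> data.doi       # None if the dataset has no DOI column
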
    def get(self, name):
        "Get column with name."
        try:
            return self.df[self.column_spec[name]].values
        except KeyError:
            return self.df[name].values

    @property
    def prior_data_idx(self):
        "Get prior_included, prior_excluded from dataset."
        convert_array = np.full(self.max_idx, 999999999)
        convert_array[self.df.index.values] = np.arange(len(self.df.index))
        return convert_array[self.prior_idx]

    @property
    def included(self):
        return self.labels

    @included.setter
    def included(self, labels):
        self.labels = labels

    @property  # pending deprecation
    def final_included(self):
        return self.labels

    @final_included.setter  # pending deprecation
    def final_included(self, labels):
        self.labels = labels

    @property
    def labels(self):
        try:
            column = self.column_spec["included"]
            return self.df[column].values
        except KeyError:
            return None

    @labels.setter
    def labels(self, labels):
        try:
            column = self.column_spec["included"]
            self.df[column] = labels
        except KeyError:
            self.df["included"] = labels

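    # Usage sketch (illustrative example): read or overwrite the labels via
    # the `labels` property (`included` is an alias).
    #
    #   >>> data.labels           # None if no label column exists yet
    #   >>> data.labels = [1, 0]  # one label per record
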
    def prior_labels(self, state, by_index=True):
        """Get the labels that are marked as 'prior'.

        Arguments
        ---------
        state: BaseState
            Open state that contains the label information.
        by_index: bool
            If True, return internal indexing. If False, return record_ids
            for indexing.

        Returns
        -------
        numpy.ndarray
            Array of indices that have the 'prior' property.
        """
        prior_indices = state.get_priors()["record_id"].to_list()

        if by_index:
            return np.array(prior_indices, dtype=int)
        else:
            return self.df.index.values[prior_indices]

    def to_file(
        self, fp, labels=None, ranking=None, writer=None, keep_old_labels=False
    ):
        """Export data object to file.

        RIS, CSV, TSV and Excel are supported file formats at the moment.

        Arguments
        ---------
        fp: str
            Filepath to export to.
        labels: list, numpy.ndarray
            Labels to be inserted into the dataframe before export.
        ranking: list, numpy.ndarray
            Optionally, dataframe rows can be reordered.
        writer: class
            Writer to export the file.
        keep_old_labels: bool
            If True, the old labels are kept in a column
            'asreview_label_to_validate'. Default False.
        """
        df = self.to_dataframe(
            labels=labels, ranking=ranking, keep_old_labels=keep_old_labels
        )

        if writer is not None:
            writer.write_data(df, fp, labels=labels, ranking=ranking)
        else:
            best_suffix = None

            for entry in _entry_points(group="asreview.writers"):
                if Path(fp).suffix == entry.name:
                    if best_suffix is None or len(entry.name) > len(best_suffix):
                        best_suffix = entry.name

            if best_suffix is None:
                raise BadFileFormatError(
                    f"Error exporting file {fp}, no capabilities "
                    "for exporting such a file."
                )

            writer = _entry_points(group="asreview.writers")[best_suffix].load()
            writer.write_data(df, fp, labels=labels, ranking=ranking)

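    # Usage sketch (illustrative example): export to any format for which a
    # writer is registered; the writer is chosen by the file suffix. The
    # file name below is hypothetical.
    #
    #   >>> data.to_file("results.csv")
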
    def to_dataframe(self, labels=None, ranking=None, keep_old_labels=False):
        """Create new dataframe with updated label (order).

        Arguments
        ---------
        labels: list, numpy.ndarray
            Current labels will be overwritten by these labels (including
            unlabelled). No effect if labels is None.
        ranking: list
            Reorder the dataframe according to these record_ids. Default
            ordering if ranking is None.
        keep_old_labels: bool
            If True, the old labels are kept in a column
            'asreview_label_to_validate'. Default False.

        Returns
        -------
        pandas.DataFrame
            Dataframe of all available record data.
        """
        result_df = pd.DataFrame.copy(self.df)
        col_label = self.column_spec["included"]

        # if there are labels, add them to the frame
        if labels is not None:
            # unnest the list of (record_id, label) tuples
            labeled_record_ids = [x[0] for x in labels]
            labeled_values = [x[1] for x in labels]

            if keep_old_labels:
                result_df["asreview_label_to_validate"] = (
                    result_df[col_label].replace(LABEL_NA, None).astype("Int64")
                )

            # remove the old results and write the new values
            result_df[col_label] = LABEL_NA
            result_df.loc[labeled_record_ids, col_label] = labeled_values
            result_df[col_label] = (
                result_df[col_label].replace(LABEL_NA, None).astype("Int64")
            )

        # if there is a ranking, apply this ranking as order
        if ranking is not None:
            # sort the dataframe based on the ranking
            result_df = result_df.loc[ranking]
            # append a column with 1 to n
            result_df["asreview_ranking"] = np.arange(1, len(result_df) + 1)

        return result_df

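    # Usage sketch (illustrative example): `labels` is a list of
    # (record_id, label) tuples; records not listed become unlabeled.
    #
    #   >>> df = data.to_dataframe(labels=[(0, 1), (1, 0)], ranking=[1, 0])
    #   >>> df["asreview_ranking"].tolist()
    #   [1, 2]
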
    def duplicated(self, pid="doi"):
        """Return boolean Series denoting duplicate rows.

        Identify duplicates based on titles and abstracts and if available,
        on a persistent identifier (PID) such as the Digital Object
        Identifier (`DOI <https://www.doi.org/>`_).

        Arguments
        ---------
        pid: string
            Which persistent identifier to use for deduplication. Default
            is 'doi'.

        Returns
        -------
        pandas.Series
            Boolean series marking the duplicated rows.
        """
        if pid in self.df.columns:
            # in case of strings, strip whitespaces and replace empty strings with None
            if is_string_dtype(self.df[pid]) or is_object_dtype(self.df[pid]):
                s_pid = self.df[pid].str.strip().replace("", None)
                if pid == "doi":
                    s_pid = s_pid.str.lower().str.replace(
                        r"^https?://(www\.)?doi\.org/", "", regex=True
                    )
            else:
                s_pid = self.df[pid]

            # save boolean series for duplicates based on persistent identifiers
            s_dups_pid = (s_pid.duplicated()) & (s_pid.notnull())
        else:
            s_dups_pid = None

        # get the texts, clean them and replace empty strings with None
        s = (
            pd.Series(self.texts)
            .str.replace("[^A-Za-z0-9]", "", regex=True)
            .str.lower()
            .str.strip()
            .replace("", None)
        )

        # save boolean series for duplicates based on titles/abstracts
        s_dups_text = (s.duplicated()) & (s.notnull())

        # final boolean series for all duplicates
        if s_dups_pid is not None:
            s_dups = s_dups_pid | s_dups_text
        else:
            s_dups = s_dups_text

        return s_dups

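    # Usage sketch (illustrative example): inspect potential duplicates
    # before removing them.
    #
    #   >>> data.duplicated().sum()     # number of duplicated records
    #   >>> data.df[data.duplicated()]  # view the duplicated rows
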
    def drop_duplicates(self, pid="doi", inplace=False, reset_index=True):
        """Drop duplicate records.

        Drop duplicates based on titles and abstracts and if available,
        on a persistent identifier (PID) such as the Digital Object
        Identifier (`DOI <https://www.doi.org/>`_).

        Arguments
        ---------
        pid: string, default 'doi'
            Which persistent identifier to use for deduplication.
        inplace: boolean, default False
            Whether to modify the DataFrame rather than creating a new one.
        reset_index: boolean, default True
            If True, the existing index column is reset to the default
            integer index.

        Returns
        -------
        pandas.DataFrame or None
            DataFrame with duplicates removed, or None if inplace=True.
        """
        df = self.df[~self.duplicated(pid)]

        if reset_index:
            df = df.reset_index(drop=True)
        if inplace:
            self.df = df
            return
        return df

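    # Usage sketch (illustrative example): remove duplicates in place or get
    # a deduplicated copy.
    #
    #   >>> clean_df = data.drop_duplicates()   # returns a new DataFrame
    #   >>> data.drop_duplicates(inplace=True)  # modifies data.df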