# Copyright 2019-2022 The ASReview Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
from pathlib import Path
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import urlopen

import numpy as np
import pandas as pd
from pandas.api.types import is_object_dtype
from pandas.api.types import is_string_dtype

from asreview.config import COLUMN_DEFINITIONS
from asreview.config import LABEL_NA
from asreview.datasets import DatasetManager
from asreview.datasets import DatasetNotFoundError
from asreview.exceptions import BadFileFormatError
from asreview.io import PaperRecord
from asreview.io.utils import convert_keywords
from asreview.io.utils import type_from_column
from asreview.utils import get_entry_points
from asreview.utils import is_iterable
from asreview.utils import is_url

def load_data(name, *args, **kwargs):
    """Load data from file, URL, or plugin.

    Parameters
    ----------
    name: str, pathlib.Path
        File path, URL, or alias of extension dataset.

    Returns
    -------
    asreview.ASReviewData:
        Initialized ASReview data object.
    """
    # check if name is a file path or a URL
if is_url(name) or Path(name).exists():
return ASReviewData.from_file(name, *args, **kwargs)
    # check if name is a plugin dataset
try:
dataset_path = DatasetManager().find(name).filepath
return ASReviewData.from_file(dataset_path, *args, **kwargs)
except DatasetNotFoundError:
pass
    # Could not find the dataset, raise an error.
raise FileNotFoundError(
f"File, URL, or dataset does not exist: '{name}'")
def _get_filename_from_url(url):
if not is_url(url):
raise ValueError(f"'{url}' is not a valid URL.")
if Path(urlparse(url).path).suffix:
return Path(urlparse(url).path).name, url
else:
try:
return urlopen(url).headers.get_filename(), url
except HTTPError as err:
# 308 (Permanent Redirect) not supported
# See https://bugs.python.org/issue40321
if err.code == 308:
return _get_filename_from_url(err.headers.get("Location"))
else:
raise err

class ASReviewData():
    """Data object for the dataset with texts, labels, DOIs etc.

    Arguments
    ---------
    df: pandas.DataFrame
        Dataframe containing the data for the ASReview data object.
    column_spec: dict
        Specification of which column corresponds to which standard
        specification. The key is the standard specification; the value is
        the column it is actually in. Default: None.

Attributes
----------
record_ids: numpy.ndarray
Return an array representing the data in the Index.
texts: numpy.ndarray
Returns an array with either headings, bodies, or both.
headings: numpy.ndarray
Returns an array with dataset headings.
title: numpy.ndarray
Identical to headings.
bodies: numpy.ndarray
Returns an array with dataset bodies.
abstract: numpy.ndarray
Identical to bodies.
notes: numpy.ndarray
Returns an array with dataset notes.
keywords: numpy.ndarray
Returns an array with dataset keywords.
authors: numpy.ndarray
Returns an array with dataset authors.
doi: numpy.ndarray
Returns an array with dataset DOI.
included: numpy.ndarray
Returns an array with document inclusion markers.
final_included: numpy.ndarray
Pending deprecation! Returns an array with document inclusion markers.
labels: numpy.ndarray
Identical to included.
"""
    def __init__(self,
                 df=None,
                 column_spec=None):
        self.df = df
        self.prior_idx = np.array([], dtype=int)

        # Guard against an empty data object; consistent with __len__,
        # df=None is treated as an empty dataset.
        if df is None:
            return

        self.max_idx = max(df.index.values) + 1

        # Infer the column specification if it is not given.
        if column_spec is None:
            self.column_spec = {}
            for col_name in list(df):
                data_type = type_from_column(col_name, COLUMN_DEFINITIONS)
                if data_type is not None:
                    self.column_spec[data_type] = col_name
        else:
            self.column_spec = column_spec

        if "included" not in self.column_spec:
            self.column_spec["included"] = "included"

def __len__(self):
if self.df is None:
return 0
return len(self.df.index)

    def hash(self):
        """Compute a hash from the dataset.

        Returns
        -------
        str:
            SHA1 hash, computed from the titles/abstracts of the dataframe.
        """
        if ((len(self.df.index) < 1000 and self.bodies is not None) or
                self.texts is None):
            texts = " ".join(self.bodies)
        else:
            texts = " ".join(self.texts)
        # texts is already a single joined string; encode it directly.
        return hashlib.sha1(
            texts.encode(encoding='UTF-8', errors='ignore')).hexdigest()

    @classmethod
    def from_file(cls, fp, reader=None):
        """Create instance from supported file format.

        It works in two ways: either with manual control, where the reader
        is supplied, or automatically, where the reader is looked up in the
        entry points based on the file extension.

        Arguments
        ---------
        fp: str, pathlib.Path
            Read the data from this file or URL.
        reader: class
            Reader to import the file.
        """
if reader is not None:
return cls(reader.read_data(fp))
        # get the filename from the URL, otherwise from the file path
if is_url(fp):
fn, fp = _get_filename_from_url(fp)
else:
fn = Path(fp).name
entry_points = get_entry_points(entry_name="asreview.readers")
try:
reader = entry_points[Path(fn).suffix].load()
except Exception:
raise BadFileFormatError(
f"Importing file {fp} not possible.")
df, column_spec = reader.read_data(fp)
return cls(df, column_spec=column_spec)
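
    # A minimal usage sketch (hypothetical file name); the reader is looked
    # up in the "asreview.readers" entry points by file extension:
    #
    #     data = ASReviewData.from_file("records.ris")
    #
    # A custom reader can also be passed; it is assumed to expose a
    # read_data(fp) method:
    #
    #     data = ASReviewData.from_file("records.xyz", reader=MyReader)
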
    def record(self, i, by_index=True):
        """Create a record from an index.

        Arguments
        ---------
        i: int, iterable
            Index of the record, or list of indices.
        by_index: bool
            If True, take the i-th value as used internally by the review.
            If False, take the record with record_id==i.

        Returns
        -------
        PaperRecord
            The corresponding record if i was an integer, or a list of
            records if i was an iterable.
        """
if not is_iterable(i):
index_list = [i]
else:
index_list = i
if by_index:
records = [
PaperRecord(**self.df.iloc[j],
column_spec=self.column_spec,
record_id=self.df.index.values[j])
for j in index_list
]
else:
records = [
PaperRecord(**self.df.loc[j, :],
record_id=j,
column_spec=self.column_spec) for j in index_list
]
if is_iterable(i):
return records
return records[0]
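
    # A minimal usage sketch (hypothetical indices): fetch records by
    # internal position, or by record_id:
    #
    #     first = data.record(0)                    # single PaperRecord
    #     some = data.record([0, 1, 2])             # list of PaperRecords
    #     by_id = data.record(42, by_index=False)   # lookup by record_id
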
@property
def record_ids(self):
return self.df.index.values
@property
def texts(self):
if self.title is None:
return self.abstract
if self.abstract is None:
return self.title
cur_texts = np.array([
self.title[i] + " " + self.abstract[i] for i in range(len(self))
], dtype=object)
return cur_texts
@property
def headings(self):
return self.title
@property
def title(self):
try:
return self.df[self.column_spec["title"]].values
except KeyError:
return None
@property
def bodies(self):
return self.abstract
@property
def abstract(self):
try:
return self.df[self.column_spec["abstract"]].values
except KeyError:
return None
@property
def notes(self):
try:
return self.df[self.column_spec["notes"]].values
except KeyError:
return None
@property
def keywords(self):
try:
return self.df[self.column_spec["keywords"]].apply(
convert_keywords).values
except KeyError:
return None
@property
def authors(self):
try:
return self.df[self.column_spec["authors"]].values
except KeyError:
return None
@property
def doi(self):
try:
return self.df[self.column_spec["doi"]].values
except KeyError:
return None
@property
def url(self):
try:
return self.df[self.column_spec["url"]].values
except KeyError:
return None

    def get(self, name):
        "Get column with name."
try:
return self.df[self.column_spec[name]].values
except KeyError:
return self.df[name].values

    @property
    def prior_data_idx(self):
        "Get the positional indices of the prior records."
        # Map record_ids to positional indices; ids not present in the
        # dataframe map to an out-of-range sentinel value.
        convert_array = np.full(self.max_idx, 999999999)
        convert_array[self.df.index.values] = np.arange(len(self.df.index))
        return convert_array[self.prior_idx]
@property
def included(self):
return self.labels
@included.setter
def included(self, labels):
self.labels = labels
@property # pending deprecation
def final_included(self):
return self.labels
@final_included.setter # pending deprecation
def final_included(self, labels):
self.labels = labels
@property
def labels(self):
try:
column = self.column_spec["included"]
return self.df[column].values
except KeyError:
return None
@labels.setter
def labels(self, labels):
try:
column = self.column_spec["included"]
self.df[column] = labels
except KeyError:
self.df["included"] = labels
    def prior_labels(self, state, by_index=True):
        """Get the labels that are marked as 'prior'.

        Arguments
        ---------
        state: BaseState
            Open state that contains the label information.
        by_index: bool
            If True, return internal indexing.
            If False, return record_ids for indexing.

        Returns
        -------
        numpy.ndarray
            Array of indices that have the 'prior' property.
        """
prior_indices = state.get_priors()["record_id"].to_list()
if by_index:
return np.array(prior_indices, dtype=int)
else:
return self.df.index.values[prior_indices]

    def to_file(self, fp, labels=None, ranking=None, writer=None):
        """Export data object to file.

        RIS, CSV, TSV and Excel are supported file formats at the moment.

        Arguments
        ---------
        fp: str
            Filepath to export to.
        labels: list, numpy.ndarray
            Labels to be inserted into the dataframe before export.
        ranking: list, numpy.ndarray
            Optionally, dataframe rows can be reordered.
        writer: class
            Writer to export the file.
        """
df = self.to_dataframe(labels=labels, ranking=ranking)
if writer is not None:
writer.write_data(df, fp, labels=labels, ranking=ranking)
else:
entry_points = get_entry_points(entry_name="asreview.writers")
best_suffix = None
for suffix, entry in entry_points.items():
if Path(fp).suffix == suffix:
if best_suffix is None or len(suffix) > len(best_suffix):
best_suffix = suffix
if best_suffix is None:
raise BadFileFormatError(f"Error exporting file {fp}, no capabilities "
"for exporting such a file.")
writer = entry_points[best_suffix].load()
writer.write_data(df, fp, labels=labels, ranking=ranking)
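
    # A minimal usage sketch (hypothetical file name); the writer is looked
    # up in the "asreview.writers" entry points by file extension:
    #
    #     data.to_file("screened.csv")
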
    def to_dataframe(self, labels=None, ranking=None):
        """Create new dataframe with updated label (order).

        Arguments
        ---------
        labels: list, numpy.ndarray
            List of (record_id, label) tuples. Current labels will be
            overwritten by these labels (including unlabelled). No effect
            if labels is None.
        ranking: list
            Reorder the dataframe according to these record_ids.
            Default ordering if ranking is None.

        Returns
        -------
        pandas.DataFrame
            Dataframe of all available record data.
        """
result_df = pd.DataFrame.copy(self.df)
col_label = self.column_spec["included"]
# if there are labels, add them to the frame
if labels is not None:
# unnest the nested (record_id, label) tuples
labeled_record_ids = [x[0] for x in labels]
labeled_values = [x[1] for x in labels]
# remove the old results and write the values
result_df[col_label] = LABEL_NA
result_df.loc[labeled_record_ids, col_label] = labeled_values
# if there is a ranking, apply this ranking as order
if ranking is not None:
# sort the datasets based on the ranking
result_df = result_df.loc[ranking]
# append a column with 1 to n
result_df["asreview_ranking"] = np.arange(1, len(result_df) + 1)
# replace labeled NA values by np.nan
if col_label in list(result_df):
result_df[col_label] = result_df[col_label].astype(object)
result_df.loc[result_df[col_label] == LABEL_NA, col_label] = np.nan
return result_df
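
    # A minimal usage sketch (record ids and labels are hypothetical):
    # relabel two records and move them to the top of the export:
    #
    #     df = data.to_dataframe(labels=[(0, 1), (5, 0)],
    #                            ranking=[0, 5, 1, 2, 3, 4])
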
    def duplicated(self, pid='doi'):
        """Return boolean Series denoting duplicate rows.

        Identify duplicates based on titles and abstracts and, if available,
        on a persistent identifier (PID) such as the Digital Object
        Identifier (`DOI <https://www.doi.org/>`_).

        Arguments
        ---------
        pid: string
            Which persistent identifier to use for deduplication.
            Default is 'doi'.

        Returns
        -------
        pandas.Series
            Boolean series marking the duplicated rows.
        """
if pid in self.df.columns:
# in case of strings, strip whitespaces and replace empty strings with None
if is_string_dtype(self.df[pid]) or is_object_dtype(self.df[pid]):
s_pid = self.df[pid].str.strip().replace("", None)
else:
s_pid = self.df[pid]
# save boolean series for duplicates based on persistent identifiers
s_dups_pid = ((s_pid.duplicated()) & (s_pid.notnull()))
else:
s_dups_pid = None
# get the texts, clean them and replace empty strings with None
s = pd.Series(self.texts) \
.str.replace("[^A-Za-z0-9]", "", regex=True) \
.str.lower().str.strip().replace("", None)
# save boolean series for duplicates based on titles/abstracts
s_dups_text = ((s.duplicated()) & (s.notnull()))
# final boolean series for all duplicates
if s_dups_pid is not None:
s_dups = s_dups_pid | s_dups_text
else:
s_dups = s_dups_text
return s_dups

    def drop_duplicates(self, pid='doi', inplace=False, reset_index=True):
        """Drop duplicate records.

        Drop duplicates based on titles and abstracts and, if available,
        on a persistent identifier (PID) such as the Digital Object
        Identifier (`DOI <https://www.doi.org/>`_).

        Arguments
        ---------
        pid: string, default 'doi'
            Which persistent identifier to use for deduplication.
        inplace: boolean, default False
            Whether to modify the DataFrame rather than creating a new one.
        reset_index: boolean, default True
            If True, the existing index column is reset to the default
            integer index.

        Returns
        -------
        pandas.DataFrame or None
            DataFrame with duplicates removed, or None if inplace=True.
        """
df = self.df[~self.duplicated(pid)]
if reset_index:
df = df.reset_index(drop=True)
if inplace:
self.df = df
return
return df
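
    # A minimal usage sketch of the deduplication helpers (the data object
    # is assumed to contain a 'doi' column, the default pid):
    #
    #     dups = data.duplicated()            # boolean pandas.Series
    #     clean = data.drop_duplicates()      # new DataFrame, duplicates gone
    #     data.drop_duplicates(inplace=True)  # or modify the object in place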