Source code for asreview.project.api

# Copyright 2019-2025 The ASReview Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Names exported by a star-import of this module. Both exception classes are
# imported further down from ``asreview.project.exceptions``.
__all__ = [
    "ProjectError",
    "ProjectNotFoundError",
]

import functools
import json
import shutil
import tempfile
import time
import traceback
import warnings
import zipfile
from dataclasses import asdict
from pathlib import Path
from urllib.request import urlretrieve
from uuid import uuid4

import jsonschema
import numpy as np
import scipy.sparse as sp
from filelock import FileLock

from asreview.data.loader import _from_file
from asreview.data.loader import _get_reader
from asreview.data.utils import identify_record_groups
from asreview.database.database import Database
from asreview.datasets import DatasetManager
from asreview.learner import ActiveLearningCycle
from asreview.learner import ActiveLearningCycleData
from asreview.models import get_ai_config
from asreview.project.exceptions import ProjectError
from asreview.project.exceptions import ProjectNotFoundError
from asreview.project.migration import detect_version
from asreview.project.migration import migrate_project
from asreview.project.schema import SCHEMA
from asreview.utils import _get_filename_from_url
from asreview.utils import _is_url

# Use the package version when ``asreview._version`` can be imported; otherwise
# fall back to the placeholder "0.0.0".
try:
    from asreview._version import __version__
except ImportError:
    __version__ = "0.0.0"


def is_project(project_dir):
    """Check if the given path is a valid ASReview project.

    A path qualifies when it is a directory that contains a readable
    ``project.json`` whose detected project-file version equals
    ``Project.VERSION``.

    Parameters
    ----------
    project_dir : str | Path
        The path to the project directory.

    Returns
    -------
    bool
        True if the path is a valid ASReview project, False otherwise.
    """
    project_dir = Path(project_dir)

    # ``is_dir`` is already False for paths that do not exist.
    if not project_dir.is_dir():
        return False

    config_fp = Path(project_dir, Project.PATH_CONFIG)
    if not config_fp.exists():
        return False

    # A config file that cannot be read or parsed means "not a valid project";
    # a yes/no predicate should answer False here instead of raising.
    try:
        with open(config_fp) as f:
            project_config = json.load(f)
    except (OSError, json.JSONDecodeError):
        return False

    return detect_version(project_config) == Project.VERSION
class Project:
    """Project class for ASReview project files.

    This class represents the complete data file for a review project. This
    data is contained in a single directory with the following files and
    subdirectories:

    - `project.json`: A JSON file containing the configuration and metadata of
      the project. Its structure is described in `schema.py`
    - `data/`: A directory containing the input data file exactly as provided
      by the user. When exporting, this input data is merged with the results
      of the review to get the export file.
    - `feature_matrices/`: A directory containing all the feature matrices that
      are generated during the review.
    - `results.db`: An SQLite database containing all data generated by
      ASReview: the data parsed from the input file, the labeled records, the
      last model ranking etc. See `asreview/data` for information on the
      parsing of the input. See `asreview/state` for information on the model
      and the labeling decisions.

    A project can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    VERSION = 3
    MODE_SIMULATE = "simulate"

    PATH_CONFIG = "project.json"
    PATH_CONFIG_LOCK = "project.json.lock"
    PATH_FEATURE_MATRICES = "feature_matrices"
    PATH_DATA_DIR = "data"
    PATH_DB = "results.db"
    PATH_ERROR = "error.json"

    def __init__(self, project_path, project_id=None):
        """Bind the object to a project directory; nothing is read or created.

        Parameters
        ----------
        project_path : str | Path
            Directory of the project.
        project_id : str, optional
            Identifier stored on the instance as ``project_id``.
        """
        self.project_path = Path(project_path)
        self.project_id = project_id

        self.data_dir = Path(self.project_path, self.PATH_DATA_DIR)
        self.db_path = Path(self.project_path, self.PATH_DB)
        self.error_path = Path(self.project_path, self.PATH_ERROR)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Close the project and release all resources.

        Closes the database connection if it was opened. Safe to call multiple
        times.
        """
        # ``db`` is a cached_property: it only lives in ``__dict__`` once it
        # has been accessed, so this avoids opening a database just to close it.
        if "db" in self.__dict__:
            self.db.close()

    @functools.cached_property
    def db(self):
        """Database object for ``results.db``, created on first access."""
        return Database(self.db_path)

    @property
    def input_data_fp(self):
        """Path of the first dataset file, or None if no dataset is configured."""
        datasets = self.config.get("datasets")
        if not datasets:
            return None
        return self.data_dir / datasets[0]["name"]

    @classmethod
    def _write_config(cls, project_path, config):
        """Dump ``config`` as JSON to the project's config file under the file lock."""
        config_fp = Path(project_path, cls.PATH_CONFIG)
        lock_fp = Path(project_path, cls.PATH_CONFIG_LOCK)
        with FileLock(lock_fp, timeout=3):
            with open(config_fp, "w") as f:
                json.dump(config, f)

    @classmethod
    def create(
        cls,
        project_path,
        project_id=None,
        project_mode="oracle",
        project_name=None,
        project_tags=None,
    ):
        """Create a new project directory and return a ``Project`` for it.

        Creates the data and feature-matrix folders, the database tables and a
        schema-validated ``project.json``.

        Parameters
        ----------
        project_path : str | Path
            Location of the new project. Must not exist yet.
        project_id : str, optional
            Project identifier. Defaults to the stem of ``project_path``.
        project_mode : str
            Value stored under ``"mode"`` in the config.
        project_name : str, optional
            Project name. Defaults to the stem of ``project_path``.
        project_tags : optional
            Value stored under ``"tags"`` in the config.

        Raises
        ------
        ValueError
            If ``project_path`` already exists.
        """
        project_path = Path(project_path)

        # Any existing path (file or directory) is rejected here, so no
        # separate "is a directory" check is needed afterwards.
        if project_path.exists():
            raise ValueError("Project path is not empty.")

        if project_id is None:
            project_id = project_path.stem
        if project_name is None:
            project_name = project_path.stem

        try:
            project_path.mkdir(parents=True, exist_ok=True)
            Path(project_path, cls.PATH_DATA_DIR).mkdir(exist_ok=True)
            Path(project_path, cls.PATH_FEATURE_MATRICES).mkdir(exist_ok=True)

            with Database(Path(project_path, cls.PATH_DB)) as database:
                database.create_tables()

            config = {
                "version": __version__,
                "id": project_id,
                "mode": project_mode,
                "name": project_name,
                "created_at_unix": int(time.time()),
                "feature_matrices": [],
                "tags": project_tags,
                "project_file_version": cls.VERSION,
            }
            jsonschema.validate(instance=config, schema=SCHEMA)
            cls._write_config(project_path, config)
        except Exception:
            # Remove the half-built project. ``ignore_errors`` keeps a failing
            # cleanup (e.g. the directory was never created) from masking the
            # original exception, which is re-raised unchanged.
            shutil.rmtree(project_path, ignore_errors=True)
            raise

        return cls(project_path, project_id=project_id)

    @property
    def config(self):
        """Project configuration dict, read from ``project.json`` once and cached.

        Raises
        ------
        ProjectNotFoundError
            If the config file does not exist.
        """
        try:
            return self._config
        except AttributeError:
            pass

        config_fp = Path(self.project_path, self.PATH_CONFIG)
        lock_fp = Path(self.project_path, self.PATH_CONFIG_LOCK)

        if not config_fp.exists():
            raise ProjectNotFoundError(f"Project '{self.project_path}' not found")

        with FileLock(lock_fp, timeout=3):
            # read the file with project info
            with open(config_fp) as fp:
                loaded = json.load(fp)

        self._config = loaded
        return loaded

    @config.setter
    def config(self, config):
        # Write to disk first; the in-memory cache is only replaced once the
        # write succeeded.
        self._write_config(self.project_path, config)
        self._config = config

    def update_config(self, **kwargs):
        """Update project info.

        The keyword arguments are merged over the current config, the result is
        validated against the project schema and then written to disk.

        Returns
        -------
        dict
            The updated configuration.
        """
        # Merge into a new dict so the cached config stays untouched when the
        # validation below fails.
        merged = {**self.config, **kwargs}
        jsonschema.validate(instance=merged, schema=SCHEMA)
        self.config = merged
        return merged

    def _store_dataset_file(self, fp, file_writer):
        """Place the dataset referenced by ``fp`` in the data dir; return its path."""
        if file_writer is not None:
            # The caller writes the file itself; ``fp`` is only the file name.
            target = self.data_dir / fp
            file_writer(target)
            return target

        if _is_url(fp):
            target = self.data_dir / _get_filename_from_url(fp)
            urlretrieve(fp, target)
            return target

        if Path(fp).exists():
            target = self.data_dir / Path(fp).name
            shutil.copy(fp, target)
            return target

        dataset = DatasetManager().find(fp)
        if dataset is None:
            raise ValueError(
                "fp should be existing file, or URL or dataset, but does not"
                f" exist: {fp}"
            )
        target = self.data_dir / dataset.filename
        dataset.to_file(target)
        return target

    def add_dataset(self, fp, dataset_id=None, file_writer=None):
        """Add a dataset to the project file.

        Parameters
        ----------
        fp: str, Path
            Filepath to the dataset. It will be copied to the correct location
            in the project file. May also be a URL or a name that
            ``DatasetManager().find`` resolves. When ``file_writer`` is given,
            ``fp`` is the file name to use inside the data directory.
        dataset_id: str, optional
            Identifier of the dataset. A random hex id is generated when
            omitted.
        file_writer: callable, optional
            Called with the destination path to write the dataset file.

        Raises
        ------
        ValueError
            If ``fp`` cannot be resolved, or if the project is in simulation
            mode and the dataset is not fully labeled.
        """
        if dataset_id is None:
            dataset_id = uuid4().hex

        self.data_dir.mkdir(exist_ok=True)
        save_fp = self._store_dataset_file(fp, file_writer)
        file_name = save_fp.name

        records = _from_file(save_fp, dataset_id=dataset_id)

        # Internals of the records are leaking out here. We are checking for a
        # specific field and a specific value. If the presence of the field
        # `included` is necessary in the input data, we should move it from
        # `Record` to the `Base` class, so that all record implementations
        # have it.
        if self.config["mode"] == self.MODE_SIMULATE:
            unlabeled = [r.included is None for r in records]
            if all(unlabeled):
                raise ValueError(
                    "Dataset for simulation mode must have labels for all records - "
                    "got dataset without any labels"
                )
            if any(unlabeled):
                raise ValueError(
                    "Dataset for simulation mode must be fully labeled - "
                    "got records with missing labels"
                )

        self.db.input.add_records(records=records)
        groups = identify_record_groups(records)
        self.db.input.set_groups(groups)

        # This config update assumes that the project only has one dataset.
        self.update_config(
            name=file_name.rsplit(".", 1)[0],
            datasets=[{"id": dataset_id, "name": file_name}],
        )

    def label_priors(self):
        """Label prior knowledge from a partially labeled dataset.

        If the input dataset is partially labeled (some records have an
        ``included`` value of 0 or 1 while others are unlabeled), the labeled
        records are stored as prior knowledge in the results table. Fully
        labeled or fully unlabeled datasets are skipped.
        """
        # Fetch record_id and included together so the positional alignment is
        # guaranteed. Querying them separately is not safe: SQLite does not
        # guarantee the same row order across separate queries without ORDER BY,
        # which can cause labels to be assigned to the wrong records.
        data = self.db.input[["record_id", "included"]]
        labeled = data[data["included"].notnull()]

        if 0 < len(labeled) < len(data):
            with self.db as db:
                for row in labeled.itertuples(index=False):
                    db.label_record(int(row.record_id), int(row.included), user_id=None)

    def remove_dataset(self):
        """Remove dataset from project."""
        raise NotImplementedError("Removing datasets is not implemented yet")

    def read_input_data(self, *args, **kwargs):
        """Read the input data file with its reader; extra arguments are forwarded."""
        reader = self.get_input_data_reader()
        return reader.read_data(self.input_data_fp, *args, **kwargs)

    def get_input_data_reader(self):
        """Return the reader that ``_get_reader`` selects for the input data file."""
        return _get_reader(self.input_data_fp)

    @property
    def feature_matrices(self):
        """List of feature matrix entries from the config, or ``[]`` on any error."""
        # Deliberately best-effort: a missing key or an unreadable config both
        # yield an empty list.
        try:
            return self.config["feature_matrices"]
        except Exception:
            return []

    def add_feature_matrix(self, feature_matrix, name):
        """Add feature matrix to project file.

        Sparse matrices are stored as ``.npz``; arrays and lists as ``.npy``.
        An existing config entry with the same ``name`` is replaced.

        Parameters
        ----------
        feature_matrix: numpy.ndarray, scipy.sparse.csr.csr_matrix
            The feature matrix to add to the project file.
        name: str
            Name of the feature extractor.

        Raises
        ------
        ValueError
            If the type of ``feature_matrix`` is not supported.
        """
        file_name = f"{name}_feature_matrix"
        file_path = Path(self.project_path, self.PATH_FEATURE_MATRICES, file_name)
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Both save functions append their extension to the given path.
        if sp.issparse(feature_matrix):
            sp.save_npz(str(file_path), feature_matrix)
            file_name += ".npz"
        elif isinstance(feature_matrix, np.ndarray):
            np.save(file_path, feature_matrix)
            file_name += ".npy"
        elif isinstance(feature_matrix, list):
            np.save(file_path, np.array(feature_matrix))
            file_name += ".npy"
        else:
            raise ValueError("Unsupported feature matrix type")

        # Register the matrix in a copy of the config. Dropping an older entry
        # with the same id prevents duplicates, which would make
        # ``get_feature_matrix`` return the stale first match.
        config = dict(self.config)
        entries = [
            entry for entry in config.get("feature_matrices", [])
            if entry["id"] != name
        ]
        entries.append({"id": name, "filename": file_name})
        config["feature_matrices"] = entries

        self.config = config

    def get_feature_matrix(self, name):
        """Get the feature matrix from the project file.

        Parameters
        ----------
        name : str
            Name of the feature extractor for which to get the cached matrix.

        Returns
        -------
        numpy.ndarray, scipy.sparse:
            (Sparse) feature matrix.

        Raises
        ------
        ValueError
            If no matrix is registered under ``name`` or the stored file has
            an unsupported extension.
        """
        # A config without the "feature_matrices" key is treated like an empty
        # list, so the caller gets the documented ValueError.
        entries = self.config.get("feature_matrices", [])
        entry = next((e for e in entries if e["id"] == name), None)
        if entry is None:
            raise ValueError("Feature matrix not found")

        file_path = Path(
            self.project_path, self.PATH_FEATURE_MATRICES, entry["filename"]
        )

        if file_path.suffix == ".npz":
            return sp.load_npz(str(file_path))
        if file_path.suffix == ".npy":
            return np.load(file_path, allow_pickle=False)
        raise ValueError("Unsupported file extension")

    @property
    def review(self):
        """The ``"review"`` section of the config, or None when absent."""
        return self.config.get("review")

    def add_review(
        self,
        cycle=None,
        reviewer=None,
        status="setup",
    ):
        """Add new review metadata.

        Parameters
        ----------
        cycle:
            An active learning cycle object to add to the review. This object
            is used to store the configuration of the active learning cycle to
            file.
        reviewer: object
            A reviewer object with to_sql() method.
        status: str
            The status of the review. One of 'setup', 'running', 'finished'.

        Returns
        -------
        dict
            The project configuration after the update.

        Raises
        ------
        ValueError
            If the project already has a review.
        """
        if self.review is not None:
            raise ValueError("Review already exists.")

        self.update_review(model=cycle, status=status)

        if reviewer is not None:
            reviewer.to_sql(self.db_path)

        return self.config

    def update_review(self, status=None, model_name=None, model=None):
        """Update review metadata.

        Parameters
        ----------
        status: str, optional
            New review status.
        model_name: str, optional
            Name stored under ``review.model.name``.
        model: dict | ActiveLearningCycle | ActiveLearningCycleData, optional
            Model configuration stored under ``review.model.current_value``.
            Cycle objects are converted to a plain dict first.

        Raises
        ------
        ValueError
            If ``model`` has an unsupported type.
        """
        if model is not None and not isinstance(
            model, (ActiveLearningCycle, ActiveLearningCycleData, dict)
        ):
            raise ValueError(
                "model should be of type 'dict', 'ActiveLearningCycle' or "
                "'ActiveLearningCycleData'"
            )

        # Build the new review section from copies, so the cached config is
        # not modified before ``update_config`` has validated the result.
        stored = self.config.get("review")
        if stored is None:
            review_config = {"status": "setup", "model": {}}
        else:
            review_config = dict(stored)
            if isinstance(stored.get("model"), dict):
                review_config["model"] = dict(stored["model"])

        if status is not None:
            review_config["status"] = status

        if model is not None:
            if isinstance(model, ActiveLearningCycle):
                model = model.to_meta()
            if isinstance(model, ActiveLearningCycleData):
                model = asdict(model)
            # ``setdefault`` covers a stored review that has no "model" key.
            review_config.setdefault("model", {})["current_value"] = model

        if model_name is not None:
            review_config.setdefault("model", {})["name"] = model_name

        self.update_config(review=review_config)

    def get_model_config(self):
        """Get the current model configuration of the review.

        Returns
        -------
        dict | None
            Dictionary containing the model configuration. Returns None if
            there is no review yet in the project.
        """
        return self.config.get("review", {}).get("model", {}).get("current_value")

    def export(self, export_fp):
        """Export the project directory as a zipped ``.asreview`` file.

        Entries named ``tmp`` and ``*.lock`` files are left out of the archive.

        Parameters
        ----------
        export_fp : str | Path
            Destination file; must have the ``.asreview`` extension.

        Raises
        ------
        ValueError
            If ``export_fp`` has the wrong extension or equals the project
            path.
        FileExistsError
            If the temporary staging directory next to ``export_fp`` already
            exists.
        """
        export_fp = Path(export_fp)

        if export_fp.suffix != ".asreview":
            raise ValueError("Export file should have .asreview extension.")

        if export_fp == Path(self.project_path):
            raise ValueError("export_fp should not be identical to project path.")

        staging_dir = export_fp.with_suffix(".asreview.zip")

        # Refuse up front, so the cleanup below can never delete a directory
        # this method did not create.
        if staging_dir.exists():
            raise FileExistsError(f"Temporary export folder exists: {staging_dir}")

        try:
            # copy the source tree, but skip scratch folders and lock files
            shutil.copytree(
                self.project_path,
                staging_dir,
                ignore=shutil.ignore_patterns("tmp", "*.lock"),
            )
            # create the archive (written to "<staging_dir>.zip") and move it
            # to its final name
            archive_fp = shutil.make_archive(
                str(staging_dir), "zip", root_dir=staging_dir
            )
            shutil.move(archive_fp, export_fp)
        finally:
            # remove the unzipped folder, also when the export failed
            shutil.rmtree(staging_dir, ignore_errors=True)

    @classmethod
    def load(
        cls,
        asreview_file,
        project_path,
        safe_import=False,
        reset_model_if_not_found=False,
    ):
        """Import a ``.asreview`` file into ``project_path`` and return the project.

        The archive is unpacked into a temporary directory, migrated to the
        current project-file version when it is older, and then copied to
        ``project_path/<project id>``.

        Parameters
        ----------
        asreview_file : str | Path | file-like
            The zipped project file.
        project_path : str | Path
            Parent directory in which the project folder is created.
        safe_import : bool
            Assign a new random id to the imported project.
        reset_model_if_not_found : bool
            If the stored model configuration cannot be turned into an
            ``ActiveLearningCycle`` (``ValueError``), warn and replace it with
            the configuration returned by ``get_ai_config()``.

        Raises
        ------
        ValueError
            If the file is not a zip archive or holds no ``project.json``.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            try:
                # Unzip the project file
                with zipfile.ZipFile(asreview_file, "r") as zip_obj:
                    zip_filenames = zip_obj.namelist()

                    # raise error if no ASReview project file
                    if cls.PATH_CONFIG not in zip_filenames:
                        raise ValueError("Project file is not valid project.")

                    # extract all files to folder, except pickles and locks
                    for member in zip_filenames:
                        if not member.endswith((".pickle", ".lock")):
                            zip_obj.extract(member, path=tmpdir)
            except zipfile.BadZipFile as err:
                raise ValueError("File is not an ASReview file.") from err

            config_fp = Path(tmpdir, cls.PATH_CONFIG)
            with open(config_fp) as f:
                project_config = json.load(f)

            # if migration is needed, do it here
            current_version = detect_version(project_config)
            if current_version < cls.VERSION:
                migrate_project(tmpdir, current_version, cls.VERSION)
                # Migration may have updated project_config, so we reload it.
                with open(config_fp) as f:
                    project_config = json.load(f)

            config_changed = False

            if reset_model_if_not_found:
                review = project_config.get("review") or {}
                model_config = (review.get("model") or {}).get("current_value")

                # Without a stored model there is nothing to check or reset.
                if model_config is not None:
                    try:
                        ActiveLearningCycle.from_meta(
                            ActiveLearningCycleData(**model_config)
                        )
                    except ValueError as err:
                        warnings.warn(str(err))
                        model = get_ai_config()
                        project_config["review"]["model"] = {
                            "name": model["name"],
                            "current_value": asdict(model["value"]),
                        }
                        config_changed = True

            if safe_import:
                # assign a new id to the project.
                project_config["id"] = uuid4().hex
                config_changed = True

            if config_changed:
                with open(config_fp, "w") as f:
                    json.dump(project_config, f)

            # NOTE(review): without ``safe_import`` the folder name is the id
            # stored in the imported file.
            destination = Path(project_path, project_config["id"])
            shutil.copytree(tmpdir, destination)

        return cls(destination)

    def get_review_error(self):
        """Return the stored review error as a dict.

        Raises
        ------
        ValueError
            If no error file exists.
        """
        if not self.error_path.exists():
            raise ValueError("No error found.")

        with open(self.error_path, "r") as f:
            return json.load(f)

    def set_review_error(self, err):
        """Store ``err`` in the project's error file.

        The traceback is taken from ``traceback.format_exc()``, i.e. from the
        exception that is being handled at the moment of the call.
        """
        err_type = type(err).__name__
        payload = {
            "message": f"{err_type}: {err}",
            "type": f"{err_type}",
            "time": int(time.time()),
            "traceback": traceback.format_exc(),
        }

        with open(self.error_path, "w") as f:
            json.dump(payload, f)

    def remove_review_error(self):
        """Delete the error file; does nothing when there is none."""
        self.error_path.unlink(missing_ok=True)