# Copyright 2019-2025 The ASReview Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = [
"ProjectError",
"ProjectNotFoundError",
]
import functools
import json
import shutil
import tempfile
import time
import traceback
import warnings
import zipfile
from dataclasses import asdict
from pathlib import Path
from urllib.request import urlretrieve
from uuid import uuid4
import jsonschema
import numpy as np
import scipy.sparse as sp
from filelock import FileLock
from asreview.data.loader import _from_file
from asreview.data.loader import _get_reader
from asreview.data.utils import identify_record_groups
from asreview.database.database import Database
from asreview.datasets import DatasetManager
from asreview.learner import ActiveLearningCycle
from asreview.learner import ActiveLearningCycleData
from asreview.models import get_ai_config
from asreview.project.exceptions import ProjectError
from asreview.project.exceptions import ProjectNotFoundError
from asreview.project.migration import detect_version
from asreview.project.migration import migrate_project
from asreview.project.schema import SCHEMA
from asreview.utils import _get_filename_from_url
from asreview.utils import _is_url
try:
from asreview._version import __version__
except ImportError:
__version__ = "0.0.0"
[docs]
def is_project(project_dir):
    """
    Tell whether a path holds an ASReview project of the current file version.

    Parameters
    ----------
    project_dir : str | Path
        Location of the candidate project directory.

    Returns
    -------
    bool
        True when ``project_dir`` is an existing directory containing a
        project configuration file whose detected version equals
        ``Project.VERSION``, False otherwise.
    """
    root = Path(project_dir)

    # ``is_dir`` is False for a missing path as well, so a single check covers
    # both "does not exist" and "is not a directory".
    if not root.is_dir():
        return False

    config_file = root / Project.PATH_CONFIG
    if not config_file.exists():
        return False

    with open(config_file) as handle:
        stored_config = json.load(handle)

    return detect_version(stored_config) == Project.VERSION
[docs]
class Project:
"""Project class for ASReview project files.
This class represents the complete data file for a review project. This data is
contained in a single directory with the following files and subdirectories:
- `project.json`: A JSON file containing the configuration and metadata of the
project. Its structure is described in `schema.py`.
- `data/`: A directory containing the input data file exactly as provided by the
user. When exporting, this input data is merged with the results of the review to
get the export file.
- `feature_matrices/`: A directory containing all the feature matrices that are
generated during the review.
- `results.db`: An SQLite database containing all data generated by ASReview: the
data parsed from the input file, the labeled records, the last model ranking etc.
See `asreview/data` for information on the parsing of the input. See
`asreview/state` for information on the model and the labeling decisions.
"""
# Version of the project file layout. It is compared with the version detected
# from a project config when checking or loading a project.
VERSION = 3
# Value of the "mode" config entry that identifies a simulation project.
MODE_SIMULATE = "simulate"
# Names of the files and directories inside the project directory.
PATH_CONFIG = "project.json"
# Lock file used to guard reads and writes of the config file.
PATH_CONFIG_LOCK = "project.json.lock"
PATH_FEATURE_MATRICES = "feature_matrices"
PATH_DATA_DIR = "data"
PATH_DB = "results.db"
# JSON file in which the last review error is stored.
PATH_ERROR = "error.json"
def __init__(self, project_path, project_id=None):
self.project_path = Path(project_path)
self.project_id = project_id
self.data_dir = Path(self.project_path, self.PATH_DATA_DIR)
self.db_path = Path(self.project_path, self.PATH_DB)
self.error_path = Path(self.project_path, self.PATH_ERROR)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def close(self):
"""Close the project and release all resources.
Closes the database connection if it was opened. Safe to call multiple
times.
"""
if "db" in self.__dict__:
self.db.close()
@functools.cached_property
def db(self):
    """Database object for ``db_path``, created on first access and cached."""
    database = Database(self.db_path)
    return database
@property
def input_data_fp(self):
datasets = self.config.get("datasets")
if not datasets:
return
file_name = datasets[0]["name"]
return self.data_dir / file_name
[docs]
@classmethod
def create(
cls,
project_path,
project_id=None,
project_mode="oracle",
project_name=None,
project_tags=None,
):
"""Initialize the necessary files specific to the web app."""
project_path = Path(project_path)
if project_path.exists():
raise ValueError("Project path is not empty.")
if project_id is None:
project_id = project_path.stem
if project_name is None:
project_name = project_path.stem
if project_path.is_dir():
raise IsADirectoryError(f"Project folder {project_path} already exists.")
try:
project_path.mkdir(parents=True, exist_ok=True)
Path(project_path, "data").mkdir(exist_ok=True)
Path(project_path, cls.PATH_FEATURE_MATRICES).mkdir(exist_ok=True)
with Database(Path(project_path, cls.PATH_DB)) as database:
database.create_tables()
config = {
"version": __version__,
"id": project_id,
"mode": project_mode,
"name": project_name,
"created_at_unix": int(time.time()),
"feature_matrices": [],
"tags": project_tags,
"project_file_version": cls.VERSION,
}
jsonschema.validate(instance=config, schema=SCHEMA)
project_fp = Path(project_path, cls.PATH_CONFIG)
project_fp_lock = Path(project_path, cls.PATH_CONFIG_LOCK)
lock = FileLock(project_fp_lock, timeout=3)
with lock:
with open(project_fp, "w") as f:
json.dump(config, f)
except Exception as err:
shutil.rmtree(project_path)
raise err
return cls(project_path, project_id=project_id)
@property
def config(self):
try:
return self._config
except AttributeError:
project_fp = Path(self.project_path, self.PATH_CONFIG)
project_fp_lock = Path(self.project_path, self.PATH_CONFIG_LOCK)
lock = FileLock(project_fp_lock, timeout=3)
if not project_fp.exists():
raise ProjectNotFoundError(f"Project '{self.project_path}' not found")
with lock:
# read the file with project info
with open(project_fp) as fp:
config = json.load(fp)
self._config = config
return config
@config.setter
def config(self, config):
project_fp = Path(self.project_path, self.PATH_CONFIG)
project_fp_lock = Path(self.project_path, self.PATH_CONFIG_LOCK)
lock = FileLock(project_fp_lock, timeout=3)
with lock:
with open(project_fp, "w") as f:
json.dump(config, f)
self._config = config
[docs]
def update_config(self, **kwargs):
    """Update project info.

    The keyword arguments are merged into the project configuration. The
    merged result is validated against the project schema before anything is
    changed, so a failing validation leaves both the cached configuration
    and the configuration file untouched.

    Parameters
    ----------
    **kwargs
        Top-level config keys and their new values.

    Returns
    -------
    dict
        The updated project configuration.
    """
    config = self.config

    # Validate a merged copy first: updating the cached dict in place before
    # validation would leave invalid values in memory when validation fails.
    candidate = {**config, **kwargs}
    jsonschema.validate(instance=candidate, schema=SCHEMA)

    # Keep the identity of the cached dict for callers holding a reference.
    config.update(kwargs)
    self.config = config
    return config
[docs]
def add_dataset(self, fp, dataset_id=None, file_writer=None):
    """Add a dataset to the project file.

    The dataset is stored in the data directory of the project, parsed into
    records, added to the input table of the database together with its
    record groups, and registered in the project config.

    Parameters
    ----------
    fp: str, Path
        Filepath, URL or dataset name of the dataset. It will be copied to the
        correct location in the project file. When ``file_writer`` is given,
        ``fp`` is the file name to use inside the data directory.
    dataset_id: str, optional
        Identifier of the dataset. A random hex UUID is used when omitted.
    file_writer: callable, optional
        Called with the destination path to write the dataset file itself.

    Raises
    ------
    ValueError
        If ``fp`` is not an existing file, URL or known dataset, or if the
        project is in simulation mode and the dataset is not fully labeled.
    """
    if dataset_id is None:
        dataset_id = uuid4().hex

    self.data_dir.mkdir(exist_ok=True)

    # Put a copy of the dataset in the data directory, depending on its source.
    if file_writer is not None:
        save_fp = self.data_dir / fp
        file_writer(save_fp)
    elif _is_url(fp):
        save_fp = self.data_dir / _get_filename_from_url(fp)
        urlretrieve(fp, save_fp)
    elif Path(fp).exists():
        save_fp = self.data_dir / Path(fp).name
        shutil.copy(fp, save_fp)
    else:
        known_dataset = DatasetManager().find(fp)
        if known_dataset is None:
            raise ValueError(
                "fp should be existing file, or URL or dataset, but does not"
                f" exist: {fp}"
            )
        save_fp = self.data_dir / known_dataset.filename
        known_dataset.to_file(save_fp)

    file_name = save_fp.name
    records = _from_file(save_fp, dataset_id=dataset_id)

    # Internals of the records are leaking out here. We are checking for a specific
    # field and a specific value. If the presence of the field `included` is
    # necessary in the input data, we should move it from `Record` to the `Base`
    # class, so that all record implementations have it.
    if self.config["mode"] == self.MODE_SIMULATE:
        is_unlabeled = [record.included is None for record in records]
        if all(is_unlabeled):
            raise ValueError(
                "Dataset for simulation mode must have labels for all records - "
                "got dataset without any labels"
            )
        if any(is_unlabeled):
            raise ValueError(
                "Dataset for simulation mode must be fully labeled - "
                "got records with missing labels"
            )

    input_table = self.db.input
    input_table.add_records(records=records)
    input_table.set_groups(identify_record_groups(records))

    # This config update assumes that the project only has one dataset.
    name_without_extension = file_name.rsplit(".", 1)[0]
    self.update_config(
        name=name_without_extension,
        datasets=[{"id": dataset_id, "name": file_name}],
    )
[docs]
def label_priors(self):
"""Label prior knowledge from a partially labeled dataset.
If the input dataset is partially labeled (some records have an
``included`` value of 0 or 1 while others are unlabeled), the labeled
records are stored as prior knowledge in the results table.
Fully labeled or fully unlabeled datasets are skipped.
"""
# Fetch record_id and included together so the positional alignment is
# guaranteed. Querying them separately is not safe: SQLite does not
# guarantee the same row order across separate queries without ORDER BY,
# which can cause labels to be assigned to the wrong records.
data = self.db.input[["record_id", "included"]]
labeled = data[data["included"].notnull()]
if 0 < len(labeled) < len(data):
with self.db as db:
for row in labeled.itertuples(index=False):
db.label_record(int(row.record_id), int(row.included), user_id=None)
[docs]
def remove_dataset(self):
"""Remove dataset from project."""
raise NotImplementedError("Removing datasets is not implemented yet")
@property
def feature_matrices(self):
try:
return self.config["feature_matrices"]
except Exception:
return []
[docs]
def add_feature_matrix(self, feature_matrix, name):
"""Add feature matrix to project file.
Parameters
----------
feature_matrix: numpy.ndarray, scipy.sparse.csr.csr_matrix
The feature matrix to add to the project file.
name: str
Name of the feature extractor.
"""
file_name = f"{name}_feature_matrix"
file_path = Path(self.project_path, self.PATH_FEATURE_MATRICES, file_name)
if sp.issparse(feature_matrix):
sp.save_npz(str(file_path), feature_matrix)
file_name += ".npz"
elif isinstance(feature_matrix, np.ndarray):
np.save(file_path, feature_matrix)
file_name += ".npy"
elif isinstance(feature_matrix, list):
np.save(file_path, np.array(feature_matrix))
file_name += ".npy"
else:
raise ValueError("Unsupported feature matrix type")
# Add the feature matrix to the project config.
config = self.config
feature_matrix_config = {
"id": name,
"filename": file_name,
}
# Add container for feature matrices.
if "feature_matrices" not in config:
config["feature_matrices"] = []
config["feature_matrices"].append(feature_matrix_config)
self.config = config
[docs]
def get_feature_matrix(self, name):
"""Get the feature matrix from the project file.
Parameters
----------
name : str
Name of the feature extractor for which to get the cached matrix.
Returns
-------
numpy.ndarray, scipy.sparse:
(Sparse) feature matrix.
"""
feature_matrix_config = [
x for x in self.config["feature_matrices"] if x["id"] == name
]
if len(feature_matrix_config) == 0:
raise ValueError("Feature matrix not found")
file_path = Path(
self.project_path,
self.PATH_FEATURE_MATRICES,
feature_matrix_config[0]["filename"],
)
if file_path.suffix == ".npz":
return sp.load_npz(str(file_path))
elif file_path.suffix == ".npy":
return np.load(file_path, allow_pickle=False)
else:
raise ValueError("Unsupported file extension")
@property
def review(self):
return self.config.get("review")
[docs]
def add_review(
self,
cycle=None,
reviewer=None,
status="setup",
):
"""Add new review metadata.
Parameters
----------
cycle:
An active learning cycle object to add to the review. This object is used
to store the configuration of the active learning cycle to file.
reviewer: object
A reviewer object with to_sql() method.
status: str
The status of the review. One of 'setup', 'running',
'finished'.
"""
if self.review is not None:
raise ValueError("Review already exists.")
self.update_review(model=cycle, status=status)
if reviewer is not None:
reviewer.to_sql(self.db_path)
return self.config
[docs]
def update_review(self, status=None, model_name=None, model=None):
    """Update review metadata.

    Only the arguments that are not None are applied. The review section is
    created with status 'setup' and an empty model when the project has no
    review yet. The changes are made on a copy of the stored review, so the
    cached project config is only modified once the config update succeeds.

    Parameters
    ----------
    status: str, optional
        New status of the review.
    model_name: str, optional
        New name of the model.
    model: dict, ActiveLearningCycle, ActiveLearningCycleData, optional
        New model configuration. Cycle objects are converted to a dict before
        they are stored.

    Raises
    ------
    ValueError
        If ``model`` is of an unsupported type.
    """
    stored_review = self.config.get("review")
    if stored_review is None:
        review_config = {"status": "setup", "model": {}}
    else:
        # Work on copies of the levels that are modified below, and tolerate a
        # stored review without a "model" entry.
        review_config = dict(stored_review)
        review_config["model"] = dict(stored_review.get("model") or {})

    if status is not None:
        review_config["status"] = status

    if model is not None:
        if not isinstance(
            model, (ActiveLearningCycle, ActiveLearningCycleData, dict)
        ):
            raise ValueError(
                "model should be of type 'dict', 'ActiveLearningCycle' or "
                "'ActiveLearningCycleData'"
            )

        # The two conversions are deliberately not exclusive: the result of
        # ``to_meta`` is converted further when it is an
        # ActiveLearningCycleData instance.
        if isinstance(model, ActiveLearningCycle):
            model = model.to_meta()

        if isinstance(model, ActiveLearningCycleData):
            model = asdict(model)

        review_config["model"]["current_value"] = model

    if model_name is not None:
        review_config["model"]["name"] = model_name

    self.update_config(review=review_config)
[docs]
def get_model_config(self):
"""Get the current model configuration of the review.
Returns
-------
dict | None
Dictionary containing the model configuration. Returns None if there is no
review yet in the project.
"""
return self.config.get("review", {}).get("model", {}).get("current_value")
[docs]
def export(self, export_fp):
if Path(export_fp).suffix != ".asreview":
raise ValueError("Export file should have .asreview extension.")
if Path(export_fp) == Path(self.project_path):
raise ValueError("export_fp should not be identical to project path.")
export_fp_tmp = Path(export_fp).with_suffix(".asreview.zip")
# copy the source tree, but ignore pickle files
shutil.copytree(
self.project_path,
export_fp_tmp,
ignore=shutil.ignore_patterns("tmp", "*.lock"),
)
# create the archive
shutil.make_archive(export_fp_tmp, "zip", root_dir=export_fp_tmp)
# remove the unzipped folder and move zip
shutil.rmtree(export_fp_tmp)
shutil.move(f"{export_fp_tmp}.zip", export_fp)
[docs]
@classmethod
def load(
cls,
asreview_file,
project_path,
safe_import=False,
reset_model_if_not_found=False,
):
with tempfile.TemporaryDirectory() as tmpdir:
try:
# Unzip the project file
with zipfile.ZipFile(asreview_file, "r") as zip_obj:
zip_filenames = zip_obj.namelist()
# raise error if no ASReview project file
if cls.PATH_CONFIG not in zip_filenames:
raise ValueError("Project file is not valid project.")
# extract all files to folder
for f in zip_filenames:
if not (f.endswith(".pickle") or f.endswith(".lock")):
zip_obj.extract(f, path=tmpdir)
except zipfile.BadZipFile:
raise ValueError("File is not an ASReview file.")
with open(Path(tmpdir, cls.PATH_CONFIG)) as f:
project_config = json.load(f)
# if migration is needed, do it here
current_version = detect_version(project_config)
if current_version < cls.VERSION:
migrate_project(tmpdir, current_version, cls.VERSION)
# Migration may have updated project_config, so we reload it.
with open(Path(tmpdir, cls.PATH_CONFIG)) as f:
project_config = json.load(f)
if reset_model_if_not_found:
try:
model_config = (
project_config.get("review", {})
.get("model", {})
.get("current_value")
)
ActiveLearningCycle.from_meta(
ActiveLearningCycleData(**model_config)
)
except ValueError as err:
warnings.warn(str(err))
model = get_ai_config()
project_config["review"]["model"] = {
"name": model["name"],
"current_value": asdict(model["value"]),
}
with open(Path(tmpdir, cls.PATH_CONFIG), "w") as f:
json.dump(project_config, f)
if safe_import:
# assign a new id to the project.
project_config["id"] = uuid4().hex
with open(Path(tmpdir, cls.PATH_CONFIG), "w") as f:
json.dump(project_config, f)
shutil.copytree(tmpdir, Path(project_path, project_config["id"]))
return cls(Path(project_path, project_config["id"]))
[docs]
def get_review_error(self):
if self.error_path.exists():
with open(self.error_path, "r") as f:
return json.load(f)
else:
raise ValueError("No error found.")
[docs]
def set_review_error(self, err):
err_type = type(err).__name__
with open(self.error_path, "w") as f:
json.dump(
{
"message": f"{err_type}: {err}",
"type": f"{err_type}",
"time": int(time.time()),
"traceback": traceback.format_exc(),
},
f,
)
[docs]
def remove_review_error(self):
self.error_path.unlink(missing_ok=True)