Source code for

# Copyright 2019-2020 The ASReview Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC
from abc import abstractmethod
import warnings

import numpy as np

from asreview.models.balance.simple import SimpleBalance
from asreview.config import DEFAULT_N_INSTANCES, LABEL_NA
from asreview.models.feature_extraction.tfidf import Tfidf
from asreview.models.classifiers import NaiveBayesClassifier
from asreview.models.query.max import MaxQuery
from asreview.models.query.random import RandomQuery
from asreview.settings import ASReviewSettings
from asreview.state.utils import open_state

def get_pool_idx(X, train_idx):
    "Return pool indices from training indices."
    return np.delete(np.arange(X.shape[0]), train_idx, axis=0)

def _merge_prior_knowledge(included, excluded, return_labels=True):
    """Merge prior included and prior excluded.

    included: numpy.ndarray
        Array of indices which should be included.
    excluded: numpy.ndarray
        Array of indices which should be excluded.
    return_labels: bool
        Return the labels of the merged priors.

        Merged prior inclusions + exclusions.
        (Optional) labels of the merged priors.

    if included is None:
        included = []
    if excluded is None:
        excluded = []

    prior_indices = np.array(np.append(included, excluded),

    if return_labels:
        prior_included_labels = np.ones((len(included), ), dtype=int)
        prior_excluded_labels = np.zeros((len(excluded), ), dtype=int)

        labels = np.concatenate([prior_included_labels, prior_excluded_labels])
        return prior_indices, labels
    return prior_indices

[docs]class BaseReview(ABC): """Base class for Systematic Review. Arguments --------- as_data: asreview.ASReviewData The data object which contains the text, labels, etc. model: BaseModel Initialized model to fit the data during active learning. See for possible models. query_model: BaseQueryModel Initialized model to query new instances for review, such as random sampling or max sampling. See for query models. balance_model: BaseBalanceModel Initialized model to redistribute the training data during the active learning process. They might either resample or undersample specific papers. feature_model: BaseFeatureModel Feature extraction model that converts texts and keywords to feature matrices. n_papers: int Number of papers to review during the active learning process, excluding the number of initial priors. To review all papers, set n_papers to None. n_instances: int Number of papers to query at each step in the active learning process. n_queries: int Number of steps/queries to perform. Set to None for no limit. start_idx: numpy.ndarray Start the simulation/review with these indices. They are assumed to be already labeled. Failing to do so might result bad behaviour. state_file: str Path to state file. Replaces log_file argument. """ name = "base" def __init__( self, as_data, model=None, query_model=None, balance_model=None, feature_model=None, n_papers=None, n_instances=DEFAULT_N_INSTANCES, n_queries=None, start_idx=[], state_file=None, log_file=None, ): """Initialize base class for systematic reviews.""" super(BaseReview, self).__init__() # Default to Naive Bayes model if model is None: model = NaiveBayesClassifier() if query_model is None: query_model = MaxQuery() if balance_model is None: balance_model = SimpleBalance() if feature_model is None: feature_model = Tfidf() self.as_data = as_data self.y = as_data.labels if self.y is None: self.y = np.full(len(as_data), LABEL_NA) self.model = model self.balance_model = balance_model self.query_model = query_model self.feature_model = feature_model self.shared = {"query_src": {}, "current_queries": {}} self.model.shared = self.shared self.query_model.shared = self.shared self.balance_model.shared = self.shared self.n_papers = n_papers self.n_instances = n_instances self.n_queries = n_queries self.start_idx = start_idx if log_file is not None: warnings.warn( "The log_file argument for BaseReview will be" " replaced by state_file.", category=FutureWarning) self.state_file = log_file else: self.state_file = state_file self.query_i = 0 self.query_i_classified = 0 self.train_idx = np.array([], self.model_trained = False # Restore the state from a file or initialize said file. with open_state(self.state_file) as state: # From file if not state.is_empty(): startup = state.startup_vals() # If there are start indices not in the training add them. if not set(startup["train_idx"]) >= set(start_idx): new_idx = list(set(start_idx) - set(startup["train_idx"])) self.classify(new_idx, self.y[new_idx], state, method="initial") startup = state.startup_vals() self.train_idx = startup["train_idx"] self.y = startup["labels"] self.shared["query_src"] = startup["query_src"] self.query_i = startup["query_i"] self.query_i_classified = startup["query_i_classified"] # From scratch else: state.set_labels(self.y) state.settings = self.settings self.classify(start_idx, self.y[start_idx], state, method="initial") self.query_i_classified = len(start_idx) # Try to retrieve feature matrix from the state file. try: self.X = state.get_feature_matrix(as_data.hash()) except KeyError: self.X = feature_model.fit_transform(as_data.texts, as_data.headings, as_data.bodies, as_data.keywords) state._add_as_data(as_data, feature_matrix=self.X) if self.X.shape[0] != len(self.y): raise ValueError("The state file does not correspond to the " "given data file, please use another state " "file or dataset.") self.load_current_query(state) @property def settings(self): """Get an ASReview settings object""" extra_kwargs = {} if hasattr(self, 'n_prior_included'): extra_kwargs['n_prior_included'] = self.n_prior_included if hasattr(self, 'n_prior_excluded'): extra_kwargs['n_prior_excluded'] = self.n_prior_excluded return ASReviewSettings(,,,,, n_instances=self.n_instances, n_queries=self.n_queries, n_papers=self.n_papers, model_param=self.model.param, query_param=self.query_model.param, balance_param=self.balance_model.param, feature_param=self.feature_model.param, data_name=self.as_data.data_name, **extra_kwargs) @abstractmethod def _get_labels(self, ind): """Classify the provided indices.""" pass def _stop_iter(self, query_i, n_pool): """Criteria for stopping iteration. Stop iterating if: - n_queries is reached - the pool is empty """ stop_iter = False n_train = self.X.shape[0] - n_pool # if the pool is empty, always stop if n_pool == 0: stop_iter = True # If we are exceeding the number of papers, stop. if self.n_papers is not None and n_train >= self.n_papers: stop_iter = True # don't stop if there is no stopping criteria if self.n_queries is not None and query_i >= self.n_queries: stop_iter = True return stop_iter
[docs] def n_pool(self): """Number of indices left in the pool. Returns ------- int: Number of indices left in the pool. """ return self.X.shape[0] - len(self.train_idx)
def _next_n_instances(self): # Could be merged with _stop_iter someday. """Get the batch size for the next query.""" n_instances = self.n_instances n_pool = self.n_pool() n_instances = min(n_instances, n_pool) if self.n_papers is not None: papers_left = self.n_papers - len(self.train_idx) n_instances = min(n_instances, papers_left) return n_instances def _do_review(self, state, stop_after_class=True, instant_save=False): if self._stop_iter(self.query_i, self.n_pool()): return # train the algorithm with prior knowledge self.train() self.log_probabilities(state) n_pool = self.X.shape[0] - len(self.train_idx) while not self._stop_iter(self.query_i - 1, n_pool): # STEP 1: Make a new query query_idx = self.query(n_instances=self._next_n_instances()) self.log_current_query(state) # STEP 2: Classify the queried papers. if instant_save: for idx in query_idx: idx_array = np.array([idx], self.classify(idx_array, self._get_labels(idx_array), state) self.query_i_classified += 1 else: self.classify(query_idx, self._get_labels(query_idx), state) self.query_i_classified += len(query_idx) # Option to stop after the classification set instead of training. if (stop_after_class and self._stop_iter(self.query_i, self.n_pool())): break # STEP 3: Train the algorithm with new data # Update the training data and pool afterwards self.train() self.log_probabilities(state)
[docs] def review(self, *args, **kwargs): """Do the systematic review, writing the results to the state file. Arguments --------- stop_after_class: bool When to stop; if True stop after classification step, otherwise stop after training step. instant_save: bool If True, save results after each single classification. """ with open_state(self.state_file) as state: self._do_review(state, *args, **kwargs)
[docs] def log_probabilities(self, state): """Store the modeling probabilities of the training indices and pool indices.""" if not self.model_trained: return pool_idx = get_pool_idx(self.X, self.train_idx) # Log the probabilities of samples in the pool being included. pred_proba = self.shared.get('pred_proba', np.array([])) if len(pred_proba) == 0: pred_proba = self.model.predict_proba(self.X) self.shared['pred_proba'] = pred_proba proba_1 = np.array([x[1] for x in pred_proba]) state.add_proba(pool_idx, self.train_idx, proba_1, self.query_i)
def log_current_query(self, state): state.set_current_queries(self.shared["current_queries"]) def load_current_query(self, state): try: self.shared["current_queries"] = state.get_current_queries() except KeyError: self.shared["current_queries"] = {}
[docs] def query(self, n_instances, query_model=None): """Query records from pool. Arguments --------- n_instances: int Batch size of the queries, i.e. number of records to be queried. query_model: BaseQueryModel Query strategy model to use. If None, the query model of the reviewer is used. Returns ------- numpy.ndarray: Indices of records queried. """ pool_idx = get_pool_idx(self.X, self.train_idx) n_instances = min(n_instances, len(pool_idx)) # If the model is not trained, choose random papers. if not self.model_trained and query_model is None: query_model = RandomQuery() if not self.model_trained: classifier = None else: classifier = self.model if query_model is None: query_model = self.query_model # Make a query from the pool. query_idx, _ = query_model.query( X=self.X, classifier=classifier, pool_idx=pool_idx, n_instances=n_instances, shared=self.shared, ) return query_idx
[docs] def classify(self, query_idx, inclusions, state, method=None): """Classify new papers and update the training indices. It automaticaly updates the state. Arguments --------- query_idx: list, numpy.ndarray Indices to classify. inclusions: list, numpy.ndarray Labels of the query_idx. state: BaseLogger Logger to store the classification in. method: str If not set to None, all inclusions have this query method. """ query_idx = np.array(query_idx, self.y[query_idx] = inclusions query_idx = query_idx[np.isin(query_idx, self.train_idx, invert=True)] self.train_idx = np.append(self.train_idx, query_idx) if method is None: # Find query method in current queries. methods = [] for idx in query_idx: method = self.shared["current_queries"].pop(idx, None) if method is None: method = "unknown" methods.append(method) if method in self.shared["query_src"]: self.shared["query_src"][method].append(idx) else: self.shared["query_src"][method] = [idx] else: methods = np.full(len(query_idx), method) if method in self.shared["query_src"]: self.shared["query_src"][method].extend(query_idx.tolist()) else: self.shared["query_src"][method] = query_idx.tolist() state.add_classification(query_idx, inclusions, methods=methods, query_i=self.query_i) state.set_labels(self.y)
[docs] def train(self): """Train the model.""" num_zero = np.count_nonzero(self.y[self.train_idx] == 0) num_one = np.count_nonzero(self.y[self.train_idx] == 1) if num_zero == 0 or num_one == 0: return # Get the training data. X_train, y_train = self.balance_model.sample(self.X, self.y, self.train_idx, shared=self.shared) # Train the model on the training data. X=X_train, y=y_train, ) self.shared["pred_proba"] = self.model.predict_proba(self.X) self.model_trained = True if self.query_i_classified > 0: self.query_i += 1 self.query_i_classified = 0
[docs] def statistics(self): """Get statistics on the current state of the review. Returns ------- dict: A dictonary with statistics like n_included and last_inclusion. """ try: n_initial = len(self.shared['query_src']['initial']) except KeyError: n_initial = 0 try: if np.count_nonzero(self.y[self.train_idx[n_initial:]] == 1) == 0: last_inclusion = len(self.train_idx[n_initial:]) else: last_inclusion = np.nonzero( self.y[self.train_idx[n_initial:]][::-1] == 1)[0][0] except ValueError: last_inclusion = 0 stats = { "n_included": np.count_nonzero(self.y[self.train_idx] == 1), "n_excluded": np.count_nonzero(self.y[self.train_idx] == 0), "n_papers": len(self.y), "n_reviewed": len(self.train_idx), "n_pool": self.n_pool(), "last_inclusion": last_inclusion, "n_initial": n_initial, } return stats