# Copyright 2019-2022 The ASReview Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = ["EmbeddingLSTM"]
import gzip
import io
import logging
import time
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import cpu_count
from pathlib import Path
from urllib.request import urlopen
import numpy as np
from asreview.models.feature_extraction.base import BaseFeatureExtraction
from asreview.utils import get_data_home
# tensorflow is optional: remember whether the Keras text helpers could be
# imported so that using this module without it fails with a clear message.
TF_AVAILABLE = True
try:
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    from tensorflow.keras.preprocessing.text import Tokenizer
except ImportError:
    TF_AVAILABLE = False


def _check_tensorflow():
    """Raise ImportError unless the tensorflow imports above succeeded."""
    if TF_AVAILABLE:
        return
    raise ImportError("Install tensorflow package to use" " Embedding LSTM.")
class EmbeddingLSTM(BaseFeatureExtraction):
    """Embedding LSTM feature extraction technique (``embedding-lstm``).

    Feature extraction technique for
    :class:`asreview.models.classifiers.LSTMBaseClassifier` and
    :class:`asreview.models.classifiers.LSTMPoolClassifier` models.

    .. note::

        This feature extraction technique requires ``tensorflow`` to be
        installed. Use ``pip install asreview[tensorflow]`` or install all
        optional ASReview dependencies with ``pip install asreview[all]``

    Arguments
    ---------
    loop_sequence: int
        If 1, instead of zeros at the end of the sequence loop (repeat) it.
        Looping fills trailing zeros, so it only takes effect with
        ``padding="post"``.
    num_words: int
        Maximum number of unique words to be processed.
    max_sequence_length: int
        Maximum length of the sequence. Longer sequences get truncated.
        Shorter sequences get either padded with zeros or looped.
    padding: str
        Which side should be padded [pre/post].
    truncating: str
        Which side should be truncated [pre/post].
    n_jobs: int
        Number of processors used in reading the embedding matrix.
    """

    name = "embedding-lstm"
    label = "Embedding LSTM"

    def __init__(
        self,
        *args,
        loop_sequence=1,
        num_words=20000,
        max_sequence_length=1000,
        padding="post",
        truncating="post",
        n_jobs=1,
        **kwargs,
    ):
        """Initialize the embedding matrix feature extraction."""
        super().__init__(*args, **kwargs)
        self.embedding = None
        self.num_words = num_words
        self.max_sequence_length = max_sequence_length
        self.padding = padding
        self.truncating = truncating
        self.n_jobs = n_jobs
        self.loop_sequence = loop_sequence

    def transform(self, texts):
        """Tokenize ``texts`` into fixed-length integer sequences.

        The tokenizer is fitted on ``texts`` themselves. The resulting
        vocabulary (word -> index) is stored in ``self.word_index``, which
        ``get_embedding_matrix`` needs to build the embedding matrix.

        Parameters
        ----------
        texts: list, numpy.ndarray, pandas.Series
            The texts to convert.

        Returns
        -------
        np.ndarray
            The padded/truncated (and optionally looped) token sequences.
        """
        _check_tensorflow()
        X, self.word_index = text_to_features(
            texts,
            loop_sequence=self.loop_sequence,
            num_words=self.num_words,
            max_sequence_length=self.max_sequence_length,
            padding=self.padding,
            truncating=self.truncating,
        )
        return X

    def get_embedding_matrix(self, texts, embedding_fp):
        """Build the embedding matrix for the vocabulary of ``texts``.

        Parameters
        ----------
        texts: list, numpy.ndarray, pandas.Series
            Texts used to fit the tokenizer vocabulary.
        embedding_fp: str, pathlib.Path or None
            Path of an embedding file in FastText ``.vec`` format. If None,
            the default English embedding in the ASReview data directory is
            used; it is downloaded first (after a 10 second warning) when it
            does not exist yet.

        Returns
        -------
        np.ndarray
            Matrix whose row ``i`` holds the vector of the word with index
            ``i`` in ``self.word_index``.
        """
        _check_tensorflow()
        self.fit_transform(texts)

        if embedding_fp is None:
            embedding_fp = Path(get_data_home(), EMBEDDING_EN["name"]).expanduser()

            if not embedding_fp.exists():
                logging.warning(
                    "Warning: will start to download large "
                    "embedding file in 10 seconds."
                )
                time.sleep(10)
                # Download to exactly the path that was checked above.
                download_embedding(
                    name=embedding_fp.name, data_home=embedding_fp.parent
                )

        logging.info("Loading embedding matrix. " "This can take several minutes.")
        # Only vectors of the fitted vocabulary end up in the matrix, so let
        # the loader drop all other words instead of keeping the full file
        # in memory.
        embedding = load_embedding(
            embedding_fp, word_index=self.word_index, n_jobs=self.n_jobs
        )
        return sample_embedding(embedding, self.word_index)

    def full_hyper_space(self):
        """Return the hyperparameter space, extended with ``loop_sequence``.

        NOTE(review): assumes the optimizer strips the ``fex_`` prefix and
        passes the remainder as a constructor keyword, so the key must match
        the ``loop_sequence`` argument (it used to be ``fex_loop_sequences``).
        """
        from hyperopt import hp

        hyper_space, hyper_choices = super().full_hyper_space()
        hyper_space.update(
            {"fex_loop_sequence": hp.randint("fex_loop_sequence", 2)}
        )
        return hyper_space, hyper_choices
# Default English word embedding: gzipped download location and the file
# name it is stored under (uncompressed) in the ASReview data directory.
EMBEDDING_EN = dict(
    url="https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.en.300.vec.gz",  # noqa
    name="fasttext.cc.en.300.vec",
)
def loop_sequences(X, max_sequence_length=1000):
    """Replace trailing zero padding by repeating each row's content.

    For every row, the content is everything up to and including the last
    non-zero entry; the positions after it are filled by cycling through
    that content from its start. All-zero rows are left untouched. Rows are
    replaced in ``X`` itself, which is also returned.
    """
    for row_idx, row in enumerate(X):
        # Content length = one past the last non-zero entry.
        content_len = max_sequence_length
        while content_len > 0 and row[content_len - 1] == 0:
            content_len -= 1
        if content_len == 0:
            # Nothing to loop in an empty (all-zero) row.
            continue

        looped = row.copy()
        start = content_len
        while start < max_sequence_length:
            # The last copy may only partially fit.
            chunk = min(content_len, max_sequence_length - start)
            looped[start:start + chunk] = row[:chunk]
            start += content_len
        X[row_idx] = looped
    return X
def text_to_features(
    sequences,
    loop_sequence=1,
    num_words=20000,
    max_sequence_length=1000,
    padding="post",
    truncating="post",
):
    """Convert text data into features.

    Arguments
    ---------
    sequences: list, numpy.ndarray, pandas.Series
        The texts to convert into features. The tokenizer is fitted on them.
    loop_sequence: int
        If 1, fill the trailing zero padding by repeating each sequence
        (see ``loop_sequences``).
    num_words: int or None
        See keras Tokenizer. ``None`` keeps the whole vocabulary.
    max_sequence_length: int
        Length of every output sequence (``maxlen`` of ``pad_sequences``).
    padding: str
        Which side to pad with zeros, "pre" or "post".
    truncating: str
        Which side to cut off longer sequences, "pre" or "post".

    Returns
    -------
    np.ndarray, dict
        The array with features and the dictionary that maps words to values.
    """
    # fit on texts
    tokenizer = Tokenizer(num_words=num_words)
    tokenizer.fit_on_texts(sequences)

    # tokenize sequences
    tokens = tokenizer.texts_to_sequences(sequences)

    # Pad sequences with zeros.
    x = pad_sequences(
        tokens, maxlen=max_sequence_length, padding=padding, truncating=truncating
    )

    if loop_sequence == 1:
        x = loop_sequences(x, max_sequence_length)

    # word index hack. see issue
    # https://github.com/keras-team/keras/issues/8092
    # The tokenizer's word_index keeps every word seen during fitting, so it
    # is restricted to the num_words limit here. With num_words=None there is
    # no limit (and comparing an int with None would raise TypeError).
    if num_words is None:
        word_index = dict(tokenizer.word_index)
    else:
        word_index = {
            e: i for e, i in tokenizer.word_index.items() if i <= num_words
        }

    return x, word_index
def _embedding_reader(filename, input_queue, block_size=1000):
    """Read an embedding file and push its lines onto a queue in blocks.

    Parameters
    ----------
    filename: str
        File of trained embedding vectors. Its first (header) line is
        skipped.
    input_queue: Queue
        Receives lists of at most ``block_size`` raw lines, followed by the
        sentinel string ``"DONE"``.
    block_size: int
        Number of lines for each job.
    """
    with open(filename, "r", encoding="utf-8", newline="\n") as handle:
        # The header line only holds the dimensions; not needed here.
        handle.readline()

        pending = []
        for raw_line in handle:
            pending.append(raw_line)
            if len(pending) == block_size:
                input_queue.put(pending)
                pending = []

        # Flush the last, partially filled block.
        if pending:
            input_queue.put(pending)

        # Sentinel telling the worker processes that no more blocks follow.
        input_queue.put("DONE")
def _embedding_worker(input_queue, output_queue, emb_vec_dim, word_index=None):
    """Process that parses blocks of embedding lines into word vectors.

    Every block taken from ``input_queue`` yields one dict (word -> float32
    vector) on ``output_queue``. When the ``"DONE"`` sentinel arrives, it is
    put back for the other workers, an ``{"ErrorBadInputValues": ...}`` dict
    is emitted if a malformed line was seen, and finally ``"DONE"`` is put on
    ``output_queue``.

    Parameters
    ----------
    input_queue: Queue
        Queue in which the jobs are submitted.
    output_queue: Queue
        Queue to store the embedding in dictionary form.
    emb_vec_dim: int
        Dimension of each embedding vector.
    word_index: dict
        If given, only words that are keys of this dict are kept.
    """
    expected_fields = emb_vec_dim + 1
    found_bad_line = False
    offending_fields = {}

    # Keep pulling blocks until the "DONE" sentinel shows up.
    for block in iter(input_queue.get, "DONE"):
        parsed = {}
        for raw_line in block:
            fields = raw_line.rstrip().split(" ")

            if len(fields) != expected_fields:
                if not found_bad_line:
                    print("Error: bad input in embedding vector.")
                found_bad_line = True
                offending_fields = fields
                # Skip the rest of this block.
                break

            token = fields[0]
            if word_index is None or token in word_index:
                parsed[token] = np.asarray(
                    fields[1:expected_fields], dtype=np.float32
                )
        output_queue.put(parsed)

    # The sentinel was consumed above; re-queue it for the other workers.
    input_queue.put("DONE")

    if found_bad_line:
        output_queue.put({"ErrorBadInputValues": offending_fields})
    output_queue.put("DONE")
def _embedding_aggregator(output_queue, n_worker):
    """Merge the partial embeddings produced by the worker processes.

    Meant to run in the main/original process. Dicts from ``output_queue``
    are merged (later entries win) until ``n_worker`` ``"DONE"`` sentinels
    have been received.

    Parameters
    ----------
    output_queue: Queue
        This queue is the output queue of the workers.
    n_worker: int
        The number of worker processes.

    Returns
    -------
    Aggregated embedding dictionary.
    """
    merged = {}
    finished = 0
    while finished < n_worker:
        item = output_queue.get()
        if item == "DONE":
            finished += 1
            continue
        merged.update(item)
    return merged
def download_embedding(
    url=EMBEDDING_EN["url"], name=EMBEDDING_EN["name"], data_home=None
):
    """Download word embedding file.

    Download the gzipped word embedding file, decompress it while streaming
    and save it to the file system. The data is written to a temporary
    ``<name>.part`` file that is renamed to ``<name>`` only after the
    download completed. An interrupted download therefore never leaves a
    truncated file behind that would later be mistaken for a complete one.

    Parameters
    ----------
    url: str
        The URL of the gzipped word embedding file
    name: str
        The filename of the embedding file.
    data_home: str
        The location of the ASR datasets. Created if it does not exist.
        Default `asreview.utils.get_data_home()`
    """
    import shutil

    if data_home is None:
        data_home = get_data_home()

    out_fp = Path(data_home, name).expanduser()
    out_fp.parent.mkdir(parents=True, exist_ok=True)
    tmp_fp = out_fp.with_name(out_fp.name + ".part")

    logging.info(f"Start downloading: {url}")
    logging.info(f"Save embedding to {out_fp}")
    try:
        # Stream: response -> gunzip -> file, without holding the whole
        # (very large) download in memory.
        with urlopen(url) as response:
            with gzip.GzipFile(fileobj=response) as decompressed:
                with open(tmp_fp, "wb") as out_file:
                    shutil.copyfileobj(decompressed, out_file)
    except BaseException:
        # Clean up the partial file (also on KeyboardInterrupt), then re-raise.
        try:
            tmp_fp.unlink()
        except FileNotFoundError:
            pass
        raise

    tmp_fp.replace(out_fp)
def load_embedding(fp, word_index=None, n_jobs=None):
    """Load embedding matrix from file.

    The embedding matrix needs to be stored in the FastText ``.vec`` text
    format: a header line ``"<n_words> <dim>"`` followed by one line per
    word, ``"<word> <v_1> ... <v_dim>"``, separated by single spaces.

    Parameters
    ----------
    fp: str or pathlib.Path
        File path of the trained embedding vectors.
    word_index: dict
        If given, only words that are keys of this dict are kept.
    n_jobs: int
        Number of processes to parse the embedding (+1 process for reading).
        ``None`` means 1 and ``-1`` means ``cpu_count() - 1``. Values below 1
        are raised to 1.

    Returns
    -------
    dict:
        The embedding weights stored in a dict with the word as key and
        the weights as values.

    Raises
    ------
    ValueError
        If a line does not have exactly ``dim + 1`` space-separated fields.
    """
    # Maximum number of jobs in the queue.
    queue_size = 500

    # Set the number of parser processes to use.
    if n_jobs is None:
        n_jobs = 1
    elif n_jobs == -1:
        n_jobs = cpu_count() - 1
    # Without at least one worker, nothing consumes the bounded input queue.
    # The aggregator would return an empty dict at once, and the reader could
    # block forever on a full queue, so join() would never return.
    n_jobs = max(1, n_jobs)

    with open(fp, "r", encoding="utf-8", newline="\n") as f:
        # split() (not split(" ")) tolerates trailing whitespace in the header.
        n_words, emb_vec_dim = map(int, f.readline().split())

    logging.debug(f"Reading {n_words} vectors with {emb_vec_dim} dimensions.")

    input_queue = Queue(queue_size)
    output_queue = Queue()

    reader = Process(target=_embedding_reader, args=(fp, input_queue), daemon=True)
    workers = [
        Process(
            target=_embedding_worker,
            args=(input_queue, output_queue, emb_vec_dim, word_index),
            daemon=True,
        )
        for _ in range(n_jobs)
    ]
    worker_procs = [reader] + workers

    for proc in worker_procs:
        proc.start()

    # Merge the dictionaries of the workers. This must happen before joining:
    # a process that put items on a queue only terminates once they are
    # consumed.
    embedding = _embedding_aggregator(output_queue, n_jobs)

    for proc in worker_procs:
        proc.join()

    if "ErrorBadInputValues" in embedding:
        bad_values = embedding["ErrorBadInputValues"]
        raise ValueError(f"Check embedding matrix, bad format: {bad_values}")

    logging.debug(f"Found {len(embedding)} word vectors.")
    return embedding
def sample_embedding(embedding, word_index):
    """Build an embedding matrix for the words in ``word_index``.

    Parameters
    ----------
    embedding: dict
        A dictionary with the words and embedding vectors. All vectors are
        assumed to have the length of the first one.
    word_index: dict
        A word_index like the output of Keras Tokenizer.word_index: it maps
        words to non-negative row indices.

    Returns
    -------
    np.ndarray:
        The embedding weights stored in a two-dimensional numpy array of
        shape ``(max_index + 1, dim)``. Row ``i`` holds the vector of the
        word with index ``i``. Rows that no word maps to, or whose word is
        missing from ``embedding``, stay zero.

    Raises
    ------
    ValueError
        If ``embedding`` is empty, so the vector dimension is unknown.
    """
    if not embedding:
        raise ValueError(
            "The embedding is empty; cannot determine the vector dimension."
        )

    n_words, emb_vec_dim = len(word_index), len(next(iter(embedding.values())))
    logging.debug(
        f"Creating matrix with {n_words} vectors " f"with dimension {emb_vec_dim}."
    )

    # +1 because 0 is reserved in the tokenizing process. The matrix is sized
    # by the largest index so non-contiguous indices also fit. For a Keras
    # word_index (indices 1..n) this equals n_words + 1 rows.
    n_rows = max(word_index.values(), default=0) + 1
    embedding_matrix = np.zeros((n_rows, emb_vec_dim))

    for word, i in word_index.items():
        coefs = embedding.get(word)
        if coefs is not None:
            embedding_matrix[i] = coefs

    logging.debug(f"Shape of embedding matrix: {embedding_matrix.shape}")
    return embedding_matrix