1
Fork 0
mirror of https://github.com/Steffo99/unimore-bda-6.git synced 2024-11-22 16:04:18 +00:00
bda-6-steffo/unimore_bda_6/analysis/tf_text.py
2023-02-08 19:46:05 +01:00

145 lines
5.7 KiB
Python

import tensorflow
import logging
from ..database import Text, Category, CachedDatasetFunc
from ..config import TENSORFLOW_EMBEDDING_SIZE, TENSORFLOW_MAX_FEATURES, TENSORFLOW_EPOCHS
from ..tokenizer import BaseTokenizer
from .base import BaseSentimentAnalyzer, AlreadyTrainedError, NotTrainedError, TrainingFailedError
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Report at import time whether Tensorflow lists any physical GPU device.
if tensorflow.config.list_physical_devices(device_type="GPU"):
    log.debug("Tensorflow successfully found GPU acceleration!")
else:
    log.warning("Tensorflow reports no GPU acceleration available.")
class TensorflowSentimentAnalyzer(BaseSentimentAnalyzer):
    """
    Sentiment analyzer that vectorizes text with a Keras `TextVectorization` layer
    and classifies it with a small Keras `Sequential` model ending in a
    5-unit softmax layer.

    An instance can be trained only once; `use` is available only after a
    successful call to `train`.
    """

    def __init__(self, *, tokenizer: BaseTokenizer):
        """
        Create an untrained analyzer.

        :param tokenizer: The tokenizer whose `tokenize_tensorflow` is used to standardize text.
        :raises TypeError: If `tokenizer.supports_tensorflow()` is falsy.
        """
        if not tokenizer.supports_tensorflow():
            raise TypeError("Tokenizer does not support Tensorflow")

        super().__init__(tokenizer=tokenizer)

        # Flipped to True only at the end of a successful `train` call.
        self.trained: bool = False
        self.text_vectorization_layer: tensorflow.keras.layers.TextVectorization = self._build_vectorizer(tokenizer)
        self.model: tensorflow.keras.Sequential = self._build_model()
        # Holds the object returned by `model.fit` once `train` has run.
        self.history: tensorflow.keras.callbacks.History | None = None

    @staticmethod
    def _build_dataset(dataset_func: CachedDatasetFunc) -> tensorflow.data.Dataset:
        """
        Wrap a `CachedDatasetFunc` into a cached, prefetching `tensorflow.data.Dataset`.

        Every item produced by `dataset_func` is converted through its `to_tensor_tuple` method.
        """

        def tensor_tuples():
            yield from (review.to_tensor_tuple() for review in dataset_func())

        log.debug("Creating dataset...")
        # Declared element layout: a scalar string and a (1, 5) float32 tensor.
        signature = (
            tensorflow.TensorSpec(shape=(), dtype=tensorflow.string, name="text"),
            tensorflow.TensorSpec(shape=(1, 5,), dtype=tensorflow.float32, name="category"),
        )
        generated = tensorflow.data.Dataset.from_generator(
            tensor_tuples,
            output_signature=signature,
        )

        log.debug("Caching dataset...")
        cached = generated.cache()

        log.debug("Configuring dataset prefetch...")
        return cached.prefetch(buffer_size=tensorflow.data.AUTOTUNE)

    @staticmethod
    def _build_model() -> tensorflow.keras.Sequential:
        """
        Create and compile the classification model.

        The model is compiled with the Adam optimizer (global gradient norm clipped to 1.0),
        categorical cross-entropy loss, and categorical accuracy as its only metric.
        """
        log.debug("Creating model...")
        layers = [
            tensorflow.keras.layers.Embedding(
                input_dim=TENSORFLOW_MAX_FEATURES.__wrapped__ + 1,
                output_dim=TENSORFLOW_EMBEDDING_SIZE.__wrapped__,
            ),
            tensorflow.keras.layers.Dropout(0.25),
            tensorflow.keras.layers.GlobalAveragePooling1D(),
            tensorflow.keras.layers.Dropout(0.25),
            tensorflow.keras.layers.Dense(25),
            tensorflow.keras.layers.Dense(5, activation="softmax"),
        ]
        model = tensorflow.keras.Sequential(layers)

        log.debug("Compiling model: %s", model)
        model.compile(
            optimizer=tensorflow.keras.optimizers.Adam(global_clipnorm=1.0),
            loss=tensorflow.keras.losses.CategoricalCrossentropy(),
            metrics=[tensorflow.keras.metrics.CategoricalAccuracy()],
        )
        log.debug("Compiled model: %s", model)

        return model

    @staticmethod
    def _build_vectorizer(tokenizer: BaseTokenizer) -> tensorflow.keras.layers.TextVectorization:
        """
        Create the text vectorization layer, standardizing input with the given tokenizer
        and capping the vocabulary at `TENSORFLOW_MAX_FEATURES` tokens.
        """
        return tensorflow.keras.layers.TextVectorization(
            standardize=tokenizer.tokenize_tensorflow,
            max_tokens=TENSORFLOW_MAX_FEATURES.__wrapped__,
        )

    def train(self, training_dataset_func: CachedDatasetFunc, validation_dataset_func: CachedDatasetFunc) -> None:
        """
        Adapt the vectorization layer on the training texts, then fit the model.

        :param training_dataset_func: Source of the training items.
        :param validation_dataset_func: Source of the validation items.
        :raises AlreadyTrainedError: If this analyzer was already trained.
        :raises TrainingFailedError: If fewer than `TENSORFLOW_EPOCHS` epochs were recorded by `fit`.
        """
        if self.trained:
            log.error("Tried to train an already trained model.")
            raise AlreadyTrainedError()

        def drop_category(text, category):
            # The vectorization layer adapts on texts only.
            return text

        def vectorize_text(text, category):
            # The model consumes vectorized text; the category passes through untouched.
            return self.text_vectorization_layer(text), category

        log.debug("Building datasets...")
        training_set = self._build_dataset(training_dataset_func)
        validation_set = self._build_dataset(validation_dataset_func)
        log.debug("Built dataset: %s", training_set)

        log.debug("Preparing training_set for %s...", self.text_vectorization_layer.adapt)
        texts_only = training_set.map(drop_category)
        log.debug("Adapting text_vectorization_layer: %s", self.text_vectorization_layer)
        self.text_vectorization_layer.adapt(texts_only)
        log.debug("Adapted text_vectorization_layer: %s", self.text_vectorization_layer)

        log.debug("Preparing training_set for %s...", self.model.fit)
        training_set = training_set.map(vectorize_text)
        validation_set = validation_set.map(vectorize_text)

        log.info("Training: %s", self.model)
        self.history = self.model.fit(
            training_set,
            validation_data=validation_set,
            epochs=TENSORFLOW_EPOCHS.__wrapped__,
            callbacks=[tensorflow.keras.callbacks.TerminateOnNaN()],
        )

        # A shorter-than-requested history is treated as a failed training run.
        if len(self.history.epoch) < TENSORFLOW_EPOCHS.__wrapped__:
            log.error("Model %s training failed: only %d epochs computed", self.model, len(self.history.epoch))
            raise TrainingFailedError()

        log.info("Model %s training succeeded!", self.model)
        self.trained = True

    def use(self, text: Text) -> Category:
        """
        Predict the category of a text.

        The result is the position of the highest score in the first row of the
        prediction, shifted by one and converted to float.

        :param text: The text to classify.
        :raises NotTrainedError: If the analyzer has not been trained yet.
        """
        if not self.trained:
            log.error("Tried to use a non-trained model.")
            raise NotTrainedError()

        vectorized = self.text_vectorization_layer(text)
        scores = self.model.predict(vectorized, verbose=False)[0]

        # Keep the first position holding the strictly greatest score.
        best_index = None
        for index, score in enumerate(scores):
            if best_index is None or score > scores[best_index]:
                best_index = index

        return float(best_index) + 1.0