# Copyright 2017 Mario Graff Guerrero
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Union, Dict, Tuple, Callable
from sklearn.model_selection import BaseCrossValidator
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import recall_score
from microtc.utils import load_model, tweet_iterator, Counter
from scipy.sparse import csr_matrix
from os.path import join, dirname, isdir, isfile
from urllib import request
from urllib.error import HTTPError
from joblib import Parallel, delayed
import numpy as np
import hashlib
import os
import gzip
# Optional progress-bar support: tqdm is not a hard dependency, so record
# its availability and fall back to silent downloads when it is missing.
try:
    from tqdm import tqdm
    USE_TQDM = True
except ImportError:
    USE_TQDM = False
# microtc release tag used in the filenames of the pre-trained models.
MICROTC = '2.4.9'
# Languages for which pre-trained text models are distributed.
MODEL_LANG = ['ar', 'ca', 'de', 'en', 'es', 'fr',
'hi', 'in', 'it', 'ja', 'ko', 'nl',
'pl', 'pt', 'ru', 'tl', 'tr', 'zh']
# GitHub release that stores the downloadable model assets.
BASEURL = 'https://github.com/INGEOTEC/text_models/releases/download/models'
# Identifiers of tailored (competition-specific) models available for download.
TAILORED = ['IberLEF2022_DAVINCIS_task1', 'IberLEF2023_DAVINCIS_task1',
'IberLEF2023_DIPROMATS_task1', 'IberLEF2023_HOMO-MEX',
'IberLEF2023_HOPE', 'IberLEF2023_HUHU_task1', 'IberLEF2023_PoliticEs_gender',
'IberLEF2023_PoliticEs_ideology_binary', 'IberLEF2023_PoliticEs_ideology_multiclass',
'IberLEF2023_PoliticEs_profession', 'IberLEF2023_RestMex_task1_country',
'IberLEF2023_RestMex_task1_polarity', 'IberLEF2023_RestMex_task1_type',
'evalita2023_ACTI', 'evalita2023_HODI', 'evalita2023_HaSpeeDe3',
'evalita2023_PoliticIT_gender', 'evalita2023_PoliticIT_ideology_binary',
'evalita2023_PoliticIT_ideology_multiclass', 'haha2018',
'semeval2017', 'semeval2018_anger', 'semeval2018_fear', 'semeval2018_joy',
'semeval2018_sadness', 'semeval2018_valence', 'semeval2023_task10_A_edos',
'semeval2023_task10_B_edos', 'semeval2023_task10_C_edos', 'tass2016',
'semeval2024_EDiReF_task1', 'semeval2024_EDiReF_task2',
'semeval2024_EDiReF_task3']
class LabelEncoderWrapper(object):
    """Wrapper of LabelEncoder. The idea is to keep the order when the classes are numbers;
    at some point this will help improve the performance in ordinal classification problems.

    :param classifier: Specifies whether it is a classification problem
    :type classifier: bool
    """

    def __init__(self, classifier=True):
        # Mapping from original integer label to its contiguous index.
        self._m = {}
        self._classifier = classifier

    @property
    def classifier(self):
        """Whether EvoMSA is acting as classifier"""
        return self._classifier

    def fit(self, y):
        """Fit the label encoder

        :param y: Dependent variable, i.e., the labels
        :type y: list or np.array
        :rtype: self
        """
        if not self.classifier:
            # Regression: labels are used as-is, nothing to fit.
            return self
        try:
            n = [int(x) for x in y]
        except ValueError:
            # Non-numeric labels: delegate to sklearn's LabelEncoder,
            # returning that encoder instead of self.
            return LabelEncoder().fit(y)
        self.classes_ = np.unique(n)
        self._m = {v: k for k, v in enumerate(self.classes_)}
        self._inv = {v: k for k, v in self._m.items()}
        return self

    def transform(self, y):
        """Map labels to contiguous indices (floats when acting as regressor)."""
        if not self.classifier:
            return np.array([float(_) for _ in y])
        return np.array([self._m[int(x)] for x in y])

    def inverse_transform(self, y):
        """Map encoded indices back to the original labels."""
        if not self.classifier:
            return y
        return np.array([self._inv[int(x)] for x in y])
class Cache(object):
    """Store the output of the text models"""

    def __init__(self, basename):
        if basename is None:
            self._cache = None
            return
        # Make sure the directory holding the cache files exists.
        parent = os.path.dirname(basename)
        if len(parent) and not os.path.isdir(parent):
            os.mkdir(parent)
        self._cache = basename

    def __iter__(self):
        # Without a cache, produce an endless stream of None placeholders.
        if self._cache is None:
            while True:
                yield None
        yield from self.textModels

    @property
    def textModels(self):
        """Cached filenames for the text-model outputs (created lazily)."""
        try:
            return self._textModels
        except AttributeError:
            self._textModels = list()
            return self._textModels

    @property
    def ml(self):
        """Cached filenames for the classifier outputs (created lazily)."""
        try:
            return self._classifiers
        except AttributeError:
            self._classifiers = list()
            return self._classifiers

    def ml_train(self):
        # No cache or no classifiers registered: endless None placeholders.
        if self._cache is None or len(self.ml) == 0:
            while True:
                yield None
        yield from self.ml

    def ml_kfold(self):
        if self._cache is None or len(self.ml) == 0:
            while True:
                yield None
        for name in self.ml:
            # K-fold variants are distinguished by a '-K' suffix.
            yield name + '-K'

    @staticmethod
    def get_name(value):
        """Stable identifier: md5 of a string, otherwise the object's name."""
        if isinstance(value, str):
            return hashlib.md5(value.encode()).hexdigest()
        try:
            return value.__name__
        except AttributeError:
            return value.__class__.__name__

    def append(self, value, ml=None):
        """Register a text model (and optionally a classifier) in the cache."""
        if self._cache is None:
            return
        name = self._cache + "-%s" % self.get_name(value)
        if ml is not None:
            self.ml.append(name + '-' + self.get_name(ml))
        self.textModels.append(name)
def download(model_fname, force=False):
    """Return a local path for *model_fname*, fetching it from GitHub when missing.

    :param model_fname: filename of the model to fetch
    :param force: re-download even if a local copy exists
    :return: path to the local file
    """
    # A file with that exact (relative) name already present wins outright.
    if not force and os.path.isfile(model_fname):
        return model_fname
    models_dir = os.path.join(os.path.dirname(__file__), 'models')
    if not os.path.isdir(models_dir):
        os.mkdir(models_dir)
    fname = os.path.join(models_dir, model_fname)
    if force or not os.path.isfile(fname):
        url = "https://github.com/INGEOTEC/EvoMSA/raw/master/EvoMSA/models/%s" % model_fname
        request.urlretrieve(url, fname)
    return fname
def get_model(model_fname):
    """Download *model_fname* if necessary and return the deserialized model."""
    return load_model(download(model_fname))
def linearSVC_array(classifiers):
    """Transform LinearSVC into weights stored in array.array

    :param classifiers: List of LinearSVC where each element is binary
    :type classifiers: list
    :return: tuple ``(coef, intercept)``: the transposed, flattened
        coefficient matrix and the per-classifier biases, both as
        ``array.array('d', ...)``
    """
    import array
    intercept = array.array('d', [x.intercept_[0] for x in classifiers])
    coef = np.vstack([x.coef_[0] for x in classifiers])
    # Transpose so all classifiers' weights for one feature are contiguous.
    coef = array.array('d', coef.T.flatten())
    return coef, intercept
def compute_p(syss):
    """Wilcoxon signed-rank p-values of every system against the best system,
    together with Holm-Bonferroni corrected significance thresholds.

    :param syss: performance matrix (samples x systems)
    :return: tuple ``(p, alpha)`` — p-values and the accepted threshold
        per system (``np.inf`` where not significant / the best system)
    """
    from scipy.stats import wilcoxon
    best = syss.mean(axis=0).argmax()
    p = []
    for col in range(syss.shape[1]):
        if col == best:
            # The best system is never compared against itself.
            p.append(np.inf)
            continue
        try:
            p.append(wilcoxon(syss[:, best], syss[:, col])[1])
        except ValueError:
            p.append(np.inf)
    order = np.argsort(p)
    alpha = [np.inf] * order.shape[0]
    m = order.shape[0] - 1
    # Step-down procedure: stop at the first non-significant comparison.
    for rank, idx in enumerate(order[:-1]):
        threshold = 0.05 / (m - rank)
        if p[idx] > threshold:
            break
        alpha[idx] = threshold
    return p, alpha
def bootstrap_confidence_interval(y: np.ndarray,
                                  hy: np.ndarray,
                                  metric: Callable[[float, float], float]=lambda y, hy: recall_score(y, hy,
                                                                                                     average="macro"),
                                  alpha: float=0.05,
                                  nbootstrap: int=500) -> Tuple[float, float]:
    """Confidence interval of *metric* estimated with the bootstrap.

    :param y: gold-standard labels
    :param hy: predicted labels, same length as *y*
    :param metric: performance measure; defaults to macro-recall
    :param alpha: significance level; the interval covers ``1 - alpha``
    :param nbootstrap: number of bootstrap samples
    :return: ``(lower, upper)`` percentiles of the bootstrap distribution
    """
    # Two-sided interval: split the significance level between both tails.
    alpha /= 2
    B = []
    for _ in range(nbootstrap):
        # Sample indices with replacement and score the resample.
        s = np.random.randint(hy.shape[0], size=hy.shape[0])
        B.append(metric(y[s], hy[s]))
    return (np.percentile(B, alpha * 100), np.percentile(B, (1 - alpha) * 100))
class ConfidenceInterval(object):
    """Estimate the confidence interval

    >>> from EvoMSA import base
    >>> from EvoMSA.utils import ConfidenceInterval
    >>> from microtc.utils import tweet_iterator
    >>> import os
    >>> tweets = os.path.join(os.path.dirname(base.__file__), 'tests', 'tweets.json')
    >>> D = list(tweet_iterator(tweets))
    >>> X = [x['text'] for x in D]
    >>> y = [x['klass'] for x in D]
    >>> kw = dict(stacked_method="sklearn.naive_bayes.GaussianNB")
    >>> ci = ConfidenceInterval(X, y, evomsa_kwargs=kw)
    >>> result = ci.estimate()
    """

    def __init__(self, X: List[str], y: Union[np.ndarray, list],
                 Xtest: List[str]=None, y_test: Union[np.ndarray, list]=None,
                 evomsa_kwargs: Dict=None,
                 folds: Union[None, BaseCrossValidator]=None) -> None:
        self._X = X
        self._y = np.atleast_1d(y)
        self._Xtest = Xtest
        self._y_test = y_test
        # None sentinel instead of a mutable default dict shared across calls.
        self._evomsa_kwargs = dict() if evomsa_kwargs is None else evomsa_kwargs
        self._folds = folds

    @property
    def gold(self):
        """Gold standard: test labels when supplied, otherwise the training labels."""
        if self._y_test is not None:
            return self._y_test
        return self._y

    @property
    def hy(self):
        """Predictions (cached): on Xtest when given, otherwise cross-validated on X."""
        from .base import EvoMSA
        try:
            return self._hy
        except AttributeError:
            if self._Xtest is not None:
                m = EvoMSA(**self._evomsa_kwargs)
                m.fit(self._X, self._y)
                hy = m.predict(self._Xtest)
                self._hy = hy
                return hy
            folds = self._folds
            if folds is None:
                # Fixed seed so the cross-validated predictions are reproducible.
                folds = StratifiedKFold(n_splits=5,
                                        shuffle=True, random_state=0)
            hy = np.empty_like(self._y)
            X, y = self._X, self._y
            for tr, ts in folds.split(X, y):
                m = EvoMSA(**self._evomsa_kwargs)
                m.fit([X[x] for x in tr], y[tr])
                hy[ts] = m.predict([X[x] for x in ts])
            self._hy = hy
            return self._hy

    def estimate(self, alpha: float=0.05,
                 metric: Callable[[float, float], float]=lambda y, hy: recall_score(y, hy,
                                                                                    average="macro"),
                 nbootstrap: int=500) -> Tuple[float, float]:
        """Bootstrap confidence interval of *metric* on (gold, hy)."""
        return bootstrap_confidence_interval(self.gold, self.hy,
                                             metric=metric,
                                             alpha=alpha,
                                             nbootstrap=nbootstrap)
class Download(object):
    """Retrieve *url* into *output*, showing a tqdm progress bar when available."""

    def __init__(self, url, output='t.tmp') -> None:
        self._url = url
        self._output = output
        try:
            request.urlretrieve(url, output, reporthook=self.progress)
        except HTTPError:
            # Surface the offending URL instead of the raw HTTP error.
            raise Exception(url)
        self.close()

    @property
    def tqdm(self):
        """Lazily-created progress bar; ``None`` when tqdm is not installed."""
        if not USE_TQDM:
            return None
        try:
            return self._tqdm
        except AttributeError:
            # Total is taken from the first reporthook call.
            self._tqdm = tqdm(total=self._nblocks, leave=False)
            return self._tqdm

    def close(self):
        """Dispose of the progress bar."""
        if USE_TQDM:
            self.tqdm.close()

    def update(self):
        """Advance the progress bar by one block."""
        if USE_TQDM:
            self.tqdm.update()

    def progress(self, nblocks, block_size, total):
        """urlretrieve report hook: record the block count and advance the bar."""
        self._nblocks = total // block_size
        self.update()
def load_bow(lang='es', d=17, func='most_common_by_type', v1=False):
    """Download (when needed) and load the pre-trained bag-of-words model.

    :param lang: language code; must be in :py:data:`MODEL_LANG`
    :param d: vocabulary-size exponent encoded in the filename
    :param func: vocabulary-selection function encoded in the filename
    :param v1: load the legacy (version 1) microtc model instead
    """
    def load(filename):
        try:
            with gzip.open(filename, 'rb') as fpt:
                return str(fpt.read(), encoding='utf-8')
        except Exception:
            # Corrupt download: remove it so a later call retries, then fail.
            os.unlink(filename)
            raise Exception(filename)

    lang = lang.lower().strip()
    assert lang in MODEL_LANG
    diroutput = join(dirname(__file__), 'models')
    if not isdir(diroutput):
        os.mkdir(diroutput)
    if v1:
        filename = f'{lang}_2.4.2.microtc'
        # NOTE(review): URL reconstructed as BASEURL/filename — the original
        # expression was garbled in this copy; confirm the release asset name.
        url = f'{BASEURL}/{filename}'
        output = join(diroutput, filename)
        if not isfile(output):
            Download(url, output)
        return load_model(output)
    filename = f'{lang}_{MICROTC}_bow_{func}_{d}.json.gz'
    # NOTE(review): same reconstruction as above.
    url = f'{BASEURL}/{filename}'
    output = join(diroutput, filename)
    if not isfile(output):
        Download(url, output)
    return Counter.fromjson(load(output))
class Linear(object):
    """Linear decision function with an optional pair of class labels.

    >>> from EvoMSA.model import Linear
    >>> linear = Linear(coef=[12, 3], intercept=0.5, labels=[0, 'P'])
    >>> X = np.array([[2, -1]])
    >>> linear.decision_function(X)
    array([21.5])
    >>> linear.predict(X)[0]
    'P'
    """

    def __init__(self, coef: Union[list, np.ndarray],
                 intercept: float=0,
                 labels: Union[list, np.ndarray, None]=None,
                 N: int=0) -> None:
        self.coef = np.atleast_1d(coef)
        self.intercept = intercept
        self.labels = np.atleast_1d(labels) if labels is not None else labels
        self.N = N

    @property
    def N(self):
        """Size"""
        return self._N

    @N.setter
    def N(self, value):
        self._N = value

    @property
    def labels(self):
        """Classes"""
        return self._labels

    # The original defined this setter twice; a single definition suffices.
    @labels.setter
    def labels(self, value):
        self._labels = value

    @property
    def coef(self):
        """Coefficients"""
        return self._coef

    @coef.setter
    def coef(self, value):
        self._coef = value

    @property
    def intercept(self):
        """Bias or intercept"""
        return self._intercept

    @intercept.setter
    def intercept(self, value):
        self._intercept = value

    def decision_function(self, X: Union[np.ndarray, csr_matrix]) -> np.ndarray:
        """Raw linear score ``X @ coef + intercept`` for dense or sparse input."""
        if isinstance(X, np.ndarray):
            return np.dot(X, self.coef) + self.intercept
        return X.dot(self.coef) + self.intercept

    def predict(self, X: Union[np.ndarray, csr_matrix]) -> np.ndarray:
        """Binary prediction: ``labels[1]`` (or 1) when the score is positive."""
        hy = self.decision_function(X)
        if self.labels is not None:
            return self.labels[np.where(hy > 0, 1, 0)]
        return np.where(hy > 0, 1, -1)

    def __sklearn_clone__(self):
        """Support ``sklearn.base.clone`` by rebuilding an equivalent instance."""
        klass = self.__class__
        ins = klass(self.coef, intercept=self.intercept,
                    labels=self.labels, N=self.N)
        return ins
def load_url(url, n_jobs=1):
    """Download (when needed) a gzipped JSON-lines file of Linear models and load it.

    :param url: release-asset filename; also used as the local filename
    :param n_jobs: number of joblib workers used to parse the lines
    :return: list of :py:class:`Linear` instances
    """
    def linear(data):
        import json
        return Linear(**json.loads(data))

    def load(filename):
        import gzip
        try:
            with gzip.open(filename) as fpt:
                return Parallel(n_jobs=n_jobs)(delayed(linear)(x)
                                               for x in fpt)
        except Exception:
            # Corrupt download: remove it and propagate the error instead of
            # silently returning None (matches load_bow's loader behavior).
            os.unlink(filename)
            raise

    diroutput = join(dirname(__file__), 'models')
    output = join(diroutput, url)
    url = f'{BASEURL}/{url}'
    if not isfile(output):
        Download(url, output)
    return load(output)
def _load_text_repr(lang='es', name='emojis',
                    k=None, d=17, func='most_common_by_type',
                    v1=False, n_jobs=1):
    """Load a pre-trained text representation (a list of Linear models).

    Returns the whole list when *k* is None, otherwise only the *k*-th model.
    """
    lang = lang.lower().strip()
    assert lang in MODEL_LANG
    diroutput = join(dirname(__file__), 'models')
    if not isdir(diroutput):
        os.mkdir(diroutput)
    if v1:
        # Legacy (version 1) filename layout.
        filename = f'{lang}_{name}_muTC2.4.2.json.gz'
    else:
        filename = f'{lang}_{MICROTC}_{name}_{func}_{d}.json.gz'
    models = load_url(filename, n_jobs=n_jobs)
    return models if k is None else models[k]
def load_emoji(lang='es', emoji=None,
               d=17, func='most_common_by_type',
               v1=False, n_jobs=1):
    """Load the emoji models for *lang*; a single one when *emoji* is given."""
    lang = lang.lower().strip()
    assert lang in MODEL_LANG
    return _load_text_repr(lang, 'emojis', emoji,
                           d=d, func=func, v1=v1, n_jobs=n_jobs)
def load_keyword(lang='es', keyword=None,
                 d=17, func='most_common_by_type',
                 v1=False, n_jobs=1):
    """Load the keyword models for *lang*; a single one when *keyword* is given."""
    lang = lang.lower().strip()
    assert lang in MODEL_LANG
    return _load_text_repr(lang, 'keywords', keyword,
                           d=d, func=func, v1=v1, n_jobs=n_jobs)
def load_dataset(lang='es', name='HA',
                 k=None, d=17, func='most_common_by_type',
                 v1=False):
    """
    Download and load the Dataset representation

    :param lang: ['ar', 'zh', 'en', 'es']
    :type lang: str
    :param name: dataset identifier
    :type name: str
    :param k: index of the model to return; all of them when None
    :type k: int

    >>> from EvoMSA.utils import load_dataset, load_bow
    >>> bow = load_bow(lang='en')
    >>> ds = load_dataset(lang='en', name='travel', k=0)
    >>> X = bow.transform(['this is funny'])
    >>> df = ds.decision_function(X)
    """
    lang = lang.lower().strip()
    # Dataset representations are only distributed for these four languages.
    assert lang in ['ar', 'zh', 'en', 'es']
    return _load_text_repr(lang, name,
                           k, d=d, func=func,
                           v1=v1)
def b4msa_params(lang, dim=15):
    """Default microtc text-model parameters used for *lang*.

    :param lang: language code; must be in MODEL_LANG
    :param dim: log2 of the maximum vocabulary size
    :return: dict of TextModel keyword arguments
    """
    from microtc.params import OPTION_DELETE, OPTION_NONE
    assert lang in MODEL_LANG
    params = dict(num_option=OPTION_NONE,
                  usr_option=OPTION_DELETE,
                  url_option=OPTION_DELETE,
                  emo_option=OPTION_NONE,
                  hashtag_option=OPTION_NONE,
                  ent_option=OPTION_NONE,
                  lc=True,
                  del_dup=False,
                  del_punc=True,
                  del_diac=True,
                  select_ent=False,
                  select_suff=False,
                  select_conn=False,
                  max_dimension=True,
                  unit_vector=True,
                  token_max_filter=2**dim)
    # Character n-grams for languages written without whitespace segmentation.
    if lang in ('ja', 'zh'):
        params['token_list'] = [1, 2, 3]
    else:
        params['token_list'] = [-2, -1, 2, 3, 4]
    return params