This commit is contained in:
2021-12-06 22:13:06 -03:00
parent 9d2504f016
commit 34b429530f
6 changed files with 837 additions and 0 deletions

Binary file not shown.

285
python/src/ai/dictionary.py Normal file
View File

@ -0,0 +1,285 @@
import json
import os
import numpy as np
import sklearn
import enlighten
from sklearn.preprocessing import LabelEncoder
import src.contabilidad.pdf as pdf
import src.contabilidad.text_handler as th
from src.ai.models import Phrase, phrase_factory, Word, word_factory
from src.contabilidad.log import LOG_LEVEL
class Dictionary:
def __init__(self, filename, logger):
self.filename = filename
self._logger = logger
self.__processed = []
self.__phrases = None
self.__words = None
self.load()
def load(self):
if not os.path.isfile(self.filename):
return
with open(self.filename, 'r') as file:
data = json.load(file)
if 'words' in data.keys():
self.__words = []
[self.__words.append(word_factory(w)) for w in data['words']]
if 'phrases' in data.keys():
self.__phrases = []
[self.__phrases.append(phrase_factory(ph)) for ph in data['phrases']]
if 'processed' in data.keys():
self.__processed = []
self.__processed = data['processed']
def save(self):
self.sort_words()
self.sort_phrases()
with open(self.filename, 'w') as file:
json.dump(self.to_json(), file, indent=2)
def to_data(self):
encoder = LabelEncoder()
data = encoder.fit_transform([w.get_word() for w in self.get_words()])
[self.__words[i].set_fit(f) for i, f in enumerate(data)]
print(data)
# return [ph.to_data() for ph in self.get_phrases()]
def to_json(self):
output = {
'processed': [],
'words': [],
'phrases': []
}
if self.__processed is not None and len(self.__processed) > 0:
output['processed'] = self.__processed
if self.__words is not None and len(self.__words) > 0:
output['words'] = [w.to_json() for w in self.__words]
if self.__phrases is not None and len(self.__phrases) > 0:
output['phrases'] = [p.to_json() for p in self.__phrases]
return output
def find_phrase(self, phrase: Phrase = None, phrase_dict: dict = None, phrase_list: list = None):
if not self.__phrases:
return -1
if phrase is not None:
phrase_list = [w.get_word() for w in phrase.get_words()]
elif phrase_dict is not None:
phrase_list = phrase_dict['words']
elif phrase_list is not None:
pass
else:
return -1
return find_phrase(self.__phrases, phrase_list)
def add_phrase(self, phrase: Phrase = None, phrase_dict: dict = None, phrase_list: list = None):
if self.__phrases is None:
self.__phrases = []
if phrase is not None:
pass
elif phrase_dict is not None:
phrase = phrase_factory(phrase_dict)
elif phrase_list is not None:
phrase = phrase_factory({'words': phrase_list})
else:
return self
i = self.find_phrase(phrase)
if i > -1:
self.__phrases[i].add_freq()
return self
self.__phrases.append(phrase)
return self
def add_phrases(self, phrase_list: list):
if self.__phrases is None:
self.__phrases = []
phs = [sorted(w.get_word() for w in p) for p in self.__phrases]
with enlighten.get_manager() as manager:
with manager.counter(total=len(phrase_list), desc='Phrases', unit='phrases', color='green') as bar1:
for i, phrase in enumerate(phrase_list):
# print(f'Adding phrase {i}.')
p2 = sorted([w.get_word() for w in phrase])
if p2 in phs:
k = phs.index(p2)
self.__phrases[k].add_freq()
continue
ph = phrase_factory({'words': phrase})
self.__phrases.append(ph)
phs.append(p2)
bar1.update()
def get_phrases(self):
return self.__phrases
def sort_phrases(self):
if self.__phrases is None:
return
try:
def sort_phrase(p):
if p is None:
return 0
if isinstance(p, Phrase):
return p.get_freq(), p.get_type().get_desc(), len(p.get_words())
return p['frequency'], p['type']['description'], len(p['words'])
self.__phrases = sorted(self.__phrases,
key=sort_phrase)
except Exception as e:
self._logger.log(repr(self.__phrases), LOG_LEVEL.ERROR)
self._logger.log(e)
return self
def sort_words(self):
if self.__words is None:
return
try:
def sort_word(w):
if w is None:
return 0
if isinstance(w, Word):
return w.get_freq(), w.get_type().get_desc(), w.get_word()
return w['frequency'], w['type']['description'], w['word']
self.__words = sorted(self.__words, key=sort_word, reverse=True)
except Exception as e:
self._logger.log(repr(self.__words))
self._logger.log(e)
return self
def find_word(self, word: Word = None, word_dict: dict = None, word_str: str = None):
if not self.__words:
return -1
if word is not None:
word_str = word.get_word()
elif word_dict is not None:
word_str = word_dict['word']
elif word_str is not None:
pass
else:
return -1
return find_word(self.__words, word_str)
def add_word(self, word: Word = None, word_dict: dict = None, word_str: str = None):
if self.__words is None:
self.__words = []
if word is not None:
pass
elif word_dict is not None:
word = word_factory(word_dict)
elif word_str is not None:
word = word_factory({'word': word_str})
else:
return self
i = self.find_word(word)
if i > -1:
self.__words[i].add_freq()
return self
self.__words.append(word)
return self
def add_words(self, words: list):
[self.add_word(word=w) for w in words if isinstance(w, Word)]
[self.add_word(word_dict=w) for w in words if isinstance(w, dict)]
[self.add_word(word_str=w) for w in words if isinstance(w, str)]
return self
def get_words(self):
return filter_unique_words(self.__words)
def match_words(self, word_list: list):
new_list = []
for w in word_list:
wi = self.find_word(word_str=w)
new_list.append(self.__words[wi])
return new_list
def append_to_phrase(self, seed: list = None, length: int = 1):
if seed is None:
return [self.__words[0]]
max_index = max(seed) + length
if max_index > len(self.__words):
if length == 1:
return False
return self.append_to_phrase(seed, length - 1)
return seed + self.__words[max_index]
def get_possible_phrases(self, word_list):
print('Adding words.')
self.add_words(word_list)
print('Creating phrases.')
with enlighten.get_manager() as manager:
with manager.counter(total=len(word_list)**2, desc='Phrases', unit='words', color='red') as bar1:
phrases = []
for length in range(1, len(word_list) + 1):
bar2 = bar1.add_subcounter(color='green')
for start in range(0, len(word_list)):
phrase = build_phrase(word_list, start, start + length)
phrase = self.match_words(phrase)
phrases.append(phrase)
start += length
bar2.update()
bar1.update()
print(f'Created {len(phrases)} phrases.')
phrases = sorted(phrases, key=lambda e: len(e))
print('Adding phrases.')
# Really slow (~115000 phrases in one pdf)
self.add_phrases(phrases)
return self.__phrases
def is_processed(self, filename: str):
return os.path.basename(filename) in self.__processed
def process(self, filename: str, password: str = None):
if self.is_processed(filename):
print('Already processed.')
return
t = filename.split('.')
temp = os.path.realpath(os.path.join(os.path.dirname(filename), t[0] + '-temp.pdf'))
print('Removing PDF encryption.')
pdf.remove_encryption(filename, password, temp)
print('Getting text')
obj = pdf.get_text(temp)
os.remove(temp)
print('Getting possible phrases.')
phrases = self.get_possible_phrases(th.split_words(obj))
self.__processed.append(os.path.basename(filename))
return phrases
def build_phrase(word_list, start: int, end: int = None):
if end is None:
return word_list[start:]
return word_list[start:end]
def filter_unique_words(words):
new_list = []
for w in words:
if w not in new_list:
new_list.append(w)
return new_list
def validate_phrase(phrase):
return True
def find_phrase(phrases: list, phrase: list):
phrase_list = [sorted([w.get_word() for w in p.get_words()]) for p in phrases]
sphrase = sorted(phrase)
if sphrase in phrase_list:
return phrase_list.index(sphrase)
return -1
def find_word(words: list, word: str):
word_list = [w.get_word() for w in words]
if word in word_list:
return word_list.index(word)
return -1

243
python/src/ai/models.py Normal file
View File

@ -0,0 +1,243 @@
import json
class Type:
def __init__(self, _id, _description):
self.__id = _id
self.__description = _description
def get_id(self):
return self.__id
def get_desc(self):
return self.__description
def to_json(self):
return self.get_id()
def __repr__(self):
return json.dumps({
'id': self.get_id(),
'description': self.get_desc()
})
def type_factory(_type: str, _id: int):
if _type == 'Word' or _type == 'WordType':
t = WordType()
elif _type == 'Phrase' or _type == 'PhraseType':
t = PhraseType()
else:
return None
t.load(_id)
return t
class WordType(Type):
STRING = 0
NUMERIC = 1
CURRENCY = 2
DATE = 4
def __init__(self):
super().__init__(0, 'string')
def load(self, word_type: int):
if word_type == self.STRING:
self.__description = 'string'
elif word_type == self.NUMERIC:
self.__description = 'numeric'
elif word_type == self.CURRENCY:
self.__description = 'currency'
elif word_type == self.DATE:
self.__description = 'date'
return self
class PhraseType(Type):
TEXT = 0
TITLE = 1
HEADER = 2
MOVEMENT = 4
INVALID = 99
def __init__(self):
super(PhraseType, self).__init__(0, 'text')
def load(self, phrase_type: int):
if phrase_type == self.TEXT:
self.__description = 'text'
elif phrase_type == self.TITLE:
self.__description = 'title'
elif phrase_type == self.HEADER:
self.__description = 'header'
class Word:
def __init__(self):
self.__id = 0
self.__word = None
self.__type_id = 0
self.__type = None
self.__frequency = 1
def set_id(self, idx: int):
self.__id = idx
return self
def set_word(self, word: str):
self.__word = word
return self
def set_type(self, word_type):
if isinstance(word_type, WordType):
self.__type_id = word_type.get_id()
# self.__type = word_type
if isinstance(word_type, int):
self.__type_id = word_type
# self.__type = type_factory('Word', word_type)
return self
def add_freq(self, amount: int = 1):
self.__frequency += amount
return self
def get_id(self) -> int:
return self.__id
def get_word(self) -> str:
return self.__word
def get_type_id(self) -> int:
return self.__type_id
def get_type(self) -> WordType:
if self.__type is None:
self.__type = type_factory('Word', self.__type_id)
return self.__type
def get_freq(self) -> int:
return self.__frequency
def to_json(self) -> dict:
output = {
'id': self.get_id(),
'word': self.get_word(),
'type': self.get_type_id(),
'freq': self.get_freq()
}
return output
def __repr__(self):
return json.dumps(self.to_json())
def word_factory(word: dict) -> Word:
w = Word()
w.set_id(word['id'])
w.set_word(word['word'])
if 'type' in word:
w.set_type(word['type'])
if 'freq' in word:
w.add_freq(word['freq'] - 1)
return w
class Phrase:
def __init__(self):
self.__id = 0
self.__words = None
self.__type_id = 0
self.__type = None
self.__frequency = 1
def set_id(self, idx: int):
self.__id = idx
return self
def add_word(self, word):
if isinstance(word, Word):
self.__words.append(word.get_id())
if isinstance(word, dict):
if 'id' in word:
self.__words.append(word['id'])
if isinstance(word, int):
self.__words.append(word)
return self
def set_words(self, words: list):
if self.__words is None:
self.__words = []
for w in words:
if isinstance(w, Word):
self.add_word(w)
if isinstance(w, dict):
self.add_word(w)
if isinstance(w, int):
self.add_word(w)
return self
def set_type(self, phrase_type):
if isinstance(phrase_type, PhraseType):
self.__type_id = phrase_type.get_id()
# self.__type = phrase_type
if isinstance(phrase_type, int):
self.__type_id = phrase_type
# self.__type = type_factory('Phrase', phrase_type)
return self
def add_freq(self, amount: int = 1):
self.__frequency += amount
return self
def get_id(self) -> int:
return self.__id
def get_words(self) -> list:
return self.__words
def get_type_id(self) -> int:
return self.__type_id
def get_type(self) -> PhraseType:
if self.__type is None:
self.__type = type_factory('Phrase', self.__type_id)
return self.__type
def get_freq(self) -> int:
return self.__frequency
def match(self, word_list: list):
if len(word_list) != len(self.__words):
return False
new_words = sorted(self.__words)
new_list = sorted(word_list)
if new_words == new_list:
return True
return False
def to_json(self):
output = {
'id': self.get_id(),
'words': self.get_words(),
'type': self.get_type_id(),
'freq': self.get_freq()
}
return output
def __repr__(self):
return json.dumps(self.to_json())
def __len__(self):
return len(self.get_words())
def phrase_factory(phrase: dict) -> Phrase:
ph = Phrase()
ph.set_id(phrase['id'])
ph.set_words(phrase['words'])
if 'type' in phrase:
ph.set_type(phrase['type'])
if 'freq' in phrase:
ph.add_freq(phrase['freq'] - 1)
return ph

123
python/src/ai/network.py Normal file
View File

@ -0,0 +1,123 @@
import json
import os
import tensorflow as tf
import sklearn
import numpy as np
from sklearn.preprocessing import LabelEncoder
import src.contabilidad.pdf as pdf
import src.contabilidad.text_handler as th
class Layer:
def __init__(self):
self.__weights = None
self.__bias = None
def set_size(self, inputs: int, size: int):
self.__weights = [[0 for j in range(0, inputs)] for i in range(0, size)]
self.__bias = [0 for i in range(0, size)]
def add_weight(self, vector: list, idx: int = None):
if idx is None:
self.__weights.append(vector)
return self
self.__weights = self.__weights[:idx] + [vector] + self.__weights[idx:]
return self
def set_weight(self, value: float, weight_index: int, input_index: int):
self.__weights[weight_index][input_index] = value
def set_bias(self, value: list):
self.__bias = value
def train(self, input_values: list, output_values: list):
output = self.get_output(input_values)
errors = []
for i, v in enumerate(output):
error = (output_values[i] - v) / output_values[i]
new_value = v * error
def to_json(self):
return {
'bias': self.__bias,
'weights': self.__weights
}
def get_output(self, vector: list):
output = []
for i, weight in enumerate(self.__weights):
val = 0
for j, v in enumerate(weight):
val += v * vector[j]
output[i] = val + self.__bias[i]
return output
def layer_factory(layer_dict: dict):
layer = Layer()
layer.set_bias(layer_dict['bias'])
[layer.add_weight(w) for w in layer_dict['weights']]
return layer
class Network:
def __init__(self, filename: str):
self._filename = filename
self.__layers = None
def load(self):
with open(self._filename) as f:
data = json.load(f)
if 'layers' in data.keys():
self.add_layers(data['layers'])
def add_layers(self, layers: list):
for lr in layers:
layer = layer_factory(lr)
self.__layers.append(layer)
class AI:
def __init__(self, dictionary_filename, logger):
self.__dict = None
self.__network = None
self.__sources = None
self.filename = ''
def add_source(self, text):
if self.__sources is None:
self.__sources = []
self.__sources.append(text)
return self
def set_filename(self, filename: str):
self.filename = filename
return self
def process_sources(self):
for source in self.__sources:
self.process(**source)
def process(self, filename, password):
encoder = LabelEncoder()
t = filename.split('.')
temp = os.path.realpath(os.path.join(os.path.dirname(filename), t[0] + '-temp.pdf'))
pdf.remove_encryption(filename, password, temp)
obj = pdf.get_text(temp)
os.remove(temp)
word_list = th.split_words(obj)
fits = encoder.fit_transform(word_list)
print(fits)
phrases = []
for length in range(1, len(word_list) + 1):
for start in range(0, len(word_list)):
phrase = word_list[start:(start + length)]
phrase = np.append(np.array([fits[word_list.index(w)] for w in phrase]),
np.zeros([len(word_list) - len(phrase)]))
phrases.append(phrase)
phrases = np.array(phrases)
print(phrases.shape)
def active_train(self):
pass

102
python/src/ai/phrase.py Normal file
View File

@ -0,0 +1,102 @@
import json
from src.ai.word import Word, WordType
class PhraseType:
TEXT = 0
TITLE = 1
HEADER = 2
MOVEMENT = 3
INVALID = 99
def __init__(self):
self.__id = 0
self.__description = 'text'
def get_id(self):
return self.__id
def get_desc(self):
return self.__description
def to_json(self):
return self.__id
def load(self, phrase_id: int):
self.__id = phrase_id
if phrase_id == self.TITLE:
self.__description = 'title'
elif phrase_id == self.HEADER:
self.__description = 'header'
elif phrase_id == self.MOVEMENT:
self.__description = 'movement'
elif phrase_id == self.INVALID:
self.__description = 'invalid'
return self
def phrase_factory(phrase: list, phrase_type: int = None, frec: int = 1):
pt = PhraseType()
if phrase_type is not None:
pt.load(phrase_type)
ph = Phrase()
ph.set_phrase(phrase).set_type(pt).add_frec(frec - 1)
return ph
class Phrase:
def __init__(self):
self.__phrase = None
self.__type = None
self.__frec = 1
def to_json(self):
return {
'phrase': [w.to_json() for w in self.__phrase],
'type': self.__type.to_json(),
'frec': self.__frec
}
def set_phrase(self, phrase: list):
[self.add_word(w) for w in phrase]
return self
def get_phrase(self):
return self.__phrase
def set_type(self, phrase_type: PhraseType):
self.__type = phrase_type
return self
def get_type(self):
return self.__type
def add_word(self, word: Word, pos: int = None):
if self.__phrase is None:
self.__phrase = []
if pos is None:
self.__phrase.append(word)
return self
self.__phrase = self.__phrase[:pos] + [word] + self.__phrase[pos:]
return self
def add_frec(self, amount: int = 1):
self.__frec += amount
def match(self, words: list):
if len(words) != len(self.__phrase):
return False
for w in self.__phrase:
if w not in words:
return False
return True
def __repr__(self):
print(self.__phrase)
return json.dumps({
'phrase': [w.to_json() for w in self.get_phrase()],
'type': self.get_type().get_desc()
})
def __len__(self):
return len(self.__phrase)

84
python/src/ai/word.py Normal file
View File

@ -0,0 +1,84 @@
import json
class WordType:
STRING = 0
NUMERIC = 1
CURRENCY = 2
DATE = 3
def __init__(self):
self.__id = 0
self.__description = 'string'
def to_json(self):
return self.__id
def load(self, word_id: int):
self.__id = word_id
if word_id == self.NUMERIC:
self.__description = 'numeric'
elif word_id == self.CURRENCY:
self.__description = 'currency'
elif word_id == self.DATE:
self.__description = 'data'
return self
def get_id(self):
return self.__id
def get_desc(self):
return self.__description
def __repr__(self):
return {
'id': self.get_id(),
'description': self.get_desc()
}
def word_factory(word: str, word_type: int = None, frec: int = 1):
wt = WordType()
if word_type is not None:
wt.load(word_type)
w = Word()
w.set_word(word).set_type(wt).add_frec(frec - 1)
return w
class Word:
def __init__(self):
self.__word = None
self.__type = None
self.__frec = 1
def to_json(self):
return {
'word': self.__word,
'type': self.__type.to_json(),
'frec': self.__frec
}
def set_word(self, word: str):
self.__word = word
return self
def get_word(self):
return self.__word
def set_type(self, word_type: WordType):
self.__type = word_type
return self
def get_type(self):
return self.__type
def add_frec(self, amount: int = 1):
self.__frec += amount
return self
def __repr__(self):
return json.dumps({
'word': self.get_word(),
'type': self.get_type().get_desc()
})