import math
import time
import csv
import regex as re
from corpustools.exceptions import MutualInfoError
from corpustools.corpus.classes.lexicon import Corpus, Word
def mi_env_filter(corpus_context, envs, context_output_path='', word_boundary=True):
Environment filter
It extracts only those words that satisfy environment condition and
returns a new corpus_context. The output is to be an argument of the original MI function
as the substitute of an original corpus_context
Spelling and frequency of each word, frequency_threshold, and other parameters of corpus_context retained.
corpus_context : CorpusContext
Context manager for a corpus
envs : list of EnvironmentFilter
List of EnvironmentFilter objects that specify environments
context_output_path : str
Path to save the list of 'clipped' words as a txt file (optional)
word_boundary : bool
Whether word boundary should be considered as a part of bigram
Defaults to True
with only words that satisfy environment filter.
All transcription removed except for the two position which will be compared against the bigram user inputs
pattern = ''
wb_in_env = [False, False]
clipped_corpus = Corpus(
num_lhs = len(envs[0].lhs)
num_rhs = len(envs[0].rhs)
if num_lhs + num_rhs == 0:
return corpus_context
for left_string in envs[0].lhs:
half_env = "(" + "|".join(left_string) + ")"
if"#", half_env) is not None:
wb_in_env[0] = True
pattern = pattern + half_env
pattern = pattern + ".."
for right_string in envs[0].rhs:
half_env = "(" + "|".join(right_string) + ")"
if"#", half_env) is not None:
wb_in_env[1] = True
pattern = pattern + half_env
pattern = re.compile(pattern)
context_pair = []
for word in corpus_context:
tier = getattr(word, corpus_context.sequence_type)
if word_boundary or (wb_in_env[0] + wb_in_env[1]) == 2:
tier_search_from = "".join(tier.with_word_boundaries())
elif wb_in_env[0] + wb_in_env[1]:
tier_search_from = "#" + "".join(tier) if wb_in_env[0] else "".join(tier) + "#"
tier_search_from = "".join(tier)
found = pattern.finditer(tier_search_from, overlapped=True)
env_context = []
for f in found:
if len(env_context) == 0:
env_context = list(f.span())
elif f.span()[0] < env_context[1]:
env_context[1] = f.span()[1]
newword = tier_search_from[env_context[0]:env_context[1]]
context_pair.append((str(word),) + clip_context(newword, word, clipped_corpus))
env_context = list(f.span())
if env_context:
newword = tier_search_from[env_context[0]:env_context[1]]
context_pair.append((str(word),) + clip_context(newword, word, clipped_corpus))
if bool(clipped_corpus.wordlist): # if the clipped corpus is not empty, set it as the context for calculating MI
corpus_context.corpus = clipped_corpus
else: # if the matrix corpus does not have any case of satisfying the specified env., prompt a warning
raise MutualInfoError('Warning! Mutual information could not be calculated'
'because the specified environment is not in the corpus.')
if context_output_path != '':
with open(context_output_path, mode='w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f, delimiter='\t')
writer.writerow(['Orthography', 'Transcription', 'Environment', 'Context'])
for context in context_pair:
writer.writerow([context[0], context[1], str(envs[0]), context[2]])
return corpus_context # corpus_context (clipped), to be fed into the original function
def clip_context(new_trans, word, clipped_corpus):
kwargs = {}
new_trans = list(new_trans)
original_word = getattr(word, word._transcription_name)
kwargs[word._transcription_name] = new_trans
kwargs[word._spelling_name] = str(word)
kwargs[word._freq_name] = word._frequency
kwargs['_freq_name'] = word._freq_name
new_word = Word(**kwargs)
clipped_corpus.add_word(new_word, allow_duplicates=True) # add word to clipped_corpus
return str(original_word), ''.join(new_trans) # print the 'word' that satisfies the environment (and to be added)
[docs]def pointwise_mi(corpus_context, query, env_filtered=False, word_boundary='Word-end only', in_word=False,
stop_check=None, call_back=None):
Calculate the mutual information for a bigram.
corpus_context : CorpusContext
Context manager for a corpus
query : tuple
Tuple of two strings, each a segment/letter
env_filtered : bool
True if a env filter selected by the user.
Defaults to False
word_boundary : str or bool
How to count word boundaries once per word. str if no env filter selected, bool with env filters
'Word-end only' counts once,
'Both sides' counts twice (word-initial and word-final), and
'Ignored' does not count word boundaries.
Trueː env filter selected and # can be a part of a bigram.
Defaults to 'Word-end only' (count word boundary once in word-final position)
in_word : bool
Flag to calculate non-local, non-ordered mutual information,
defaults to False
stop_check : callable or None
Optional function to check whether to gracefully terminate early
call_back : callable or None
Optional function to supply progress information during the function
Mutual information of the bigram
if call_back is not None:
call_back("Generating probabilities...")
cur = 0
if in_word:
unigram_dict = get_in_word_unigram_frequencies(corpus_context, query)
bigram_dict = get_in_word_bigram_frequency(corpus_context, query)
need_wd = True
halve_edges = True
if env_filtered or word_boundary == 'Ignored':
need_wd = False # if env filtered, c(orpus) already has needed word boundaries when being clipped!
elif word_boundary == 'Both sides':
halve_edges = False
unigram_dict = corpus_context.get_frequency_base(gramsize = 1, halve_edges = halve_edges,
probability=True, need_wb=need_wd)
bigram_dict = corpus_context.get_frequency_base(gramsize = 2, halve_edges = halve_edges,
probability=True, need_wb=need_wd)
prob_s1 = unigram_dict[query[0]]
except KeyError:
raise(MutualInfoError('The segment {} was not found in the corpus, '
'or in the environment, if you specified one. '.format(query[0])))
prob_s2 = unigram_dict[query[1]]
except KeyError:
raise(MutualInfoError('The segment {} was not found in the corpus, '
'or in the environment, if you specified one. '.format(query[1])))
prob_bg = bigram_dict[query]
except KeyError:
raise MutualInfoError('The bigram {} was not found in the corpus using {}s'.format(''.join(query), corpus_context.sequence_type))
if unigram_dict[query[0]] == 0.0:
raise MutualInfoError('Warning! Mutual information could not be calculated because the unigram {} is not in the corpus.'.format(query[0]))
if unigram_dict[query[1]] == 0.0:
raise MutualInfoError('Warning! Mutual information could not be calculated because the unigram {} is not in the corpus.'.format(query[1]))
if bigram_dict[query] == 0.0:
raise MutualInfoError('Warning! Mutual information could not be calculated because the bigram {} is not in the corpus.'.format(str(query)))
return math.log((prob_bg/(prob_s1*prob_s2)), 2)
def get_in_word_unigram_frequencies(corpus_context, query):
totals = [0 for x in query]
for word in corpus_context:
for i, q in enumerate(query):
if q in getattr(word, corpus_context.sequence_type):
totals[i] += word.frequency
return {k: totals[i] / len(corpus_context) for i, k in enumerate(query)}
def get_in_word_bigram_frequency(corpus_context, query):
total = 0
for word in corpus_context:
tier = getattr(word, corpus_context.sequence_type)
if all(x in tier for x in query):
total += word.frequency
return {query: total / len(corpus_context)}
def all_mis(corpus_context,
word_boundary, in_word = False,
stop_check = None, call_back = None):
mis = {}
total_calculations = ((len(corpus_context.inventory)**2)-len(corpus_context.inventory)/2)+1
ct = 1
t = time.time()
for s1 in corpus_context.inventory:
for s2 in corpus_context.inventory:
#print('Performing MI calculation {} out of {} possible'.format(str(ct), str(total_calculations)))
ct += 1
#print('Duration of last calculation: {}'.format(str(time.time() - t)))
t = time.time()
if type(s1) != str:
s1 = s1.symbol
if type(s2) != str:
s2 = s2.symbol
mi = pointwise_mi(corpus_context, (s1, s2), word_boundary = word_boundary, in_word = in_word)
mis[(s1,s2)] = mi
ordered_mis = sorted([(pair, str(mis[pair])) for pair in mis], key=lambda p: p[1])
return ordered_mis