Source code for corpustools.mutualinfo.mutual_information

import math
import time
import csv
import regex as re

from corpustools.exceptions import MutualInfoError
from corpustools.corpus.classes.lexicon import Corpus, Word


def mi_env_filter(corpus_context, envs, context_output_path='', word_boundary=True):
    """
    Restrict a corpus context to the parts of its words that match an environment.

    Every stretch of a word matching ``<lhs> X Y <rhs>`` (``X Y`` being any two
    positions) is cut out and stored as a word of its own in a fresh corpus, which
    then replaces ``corpus_context.corpus``. The returned context is meant to be
    passed to the mutual information functions in place of the unfiltered one.
    Each clipped word keeps the spelling and frequency of the word it was cut from.

    Only the first filter in ``envs`` is used.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus. Its ``corpus`` attribute is replaced in place
        when at least one match is found.
    envs : list of EnvironmentFilter
        Environment filters; only ``envs[0]`` is consulted. Its ``lhs`` and ``rhs``
        are sequences of collections of segment symbols (strings).
    context_output_path : str
        Path of a tab-delimited file listing the clipped contexts (optional;
        nothing is written when empty)
    word_boundary : bool
        Whether word boundary should be considered as a part of bigram
        Defaults to True

    Returns
    -------
    CorpusContext
        The same ``corpus_context`` object. It is returned untouched when ``envs``
        is empty or the filter specifies neither a left nor a right side;
        otherwise its corpus holds only the clipped words.

    Raises
    ------
    MutualInfoError
        If no word in the corpus contains the specified environment.
    """
    # No filter at all, or a filter with neither side specified: nothing to clip.
    if not envs:
        return corpus_context
    env = envs[0]
    if len(env.lhs) + len(env.rhs) == 0:
        return corpus_context

    def alternation(segments):
        # Segments are literal symbols, so they are escaped to keep symbols such as
        # '.' or '+' from being read as regex operators.
        return "(" + "|".join(re.escape(segment) for segment in segments) + ")"

    def mentions_boundary(side):
        return any("#" in segment for segments in side for segment in segments)

    wb_on_left = mentions_boundary(env.lhs)
    wb_on_right = mentions_boundary(env.rhs)

    # '..' stands for the two positions that will later be compared with the bigram.
    pattern = re.compile(
        "".join(alternation(segments) for segments in env.lhs)
        + ".."
        + "".join(alternation(segments) for segments in env.rhs)
    )

    clipped_corpus = Corpus(corpus_context.corpus.name)
    context_pair = []

    def clip(word, searchable, span):
        # Store one matched stretch as a word of the clipped corpus and remember it
        # for the optional report.
        clipped = searchable[span[0]:span[1]]
        context_pair.append((str(word),) + clip_context(clipped, word, clipped_corpus))

    for word in corpus_context:
        tier = getattr(word, corpus_context.sequence_type)

        # NOTE(review): the tier is flattened to one string, so matching and slicing
        # work on characters; confirm segments are single characters.
        if word_boundary or (wb_on_left and wb_on_right):
            searchable = "".join(tier.with_word_boundaries())
        elif wb_on_left:
            searchable = "#" + "".join(tier)
        elif wb_on_right:
            searchable = "".join(tier) + "#"
        else:
            searchable = "".join(tier)

        # Overlapping matches are merged into one stretch; a match starting at or
        # after the end of the current stretch closes it and opens a new one.
        span = None
        for match in pattern.finditer(searchable, overlapped=True):
            start, end = match.span()
            if span is None:
                span = [start, end]
            elif start < span[1]:
                span[1] = end
            else:
                clip(word, searchable, span)
                span = [start, end]
        if span is not None:
            clip(word, searchable, span)

    if not clipped_corpus.wordlist:
        # Nothing in the corpus satisfies the environment.
        raise MutualInfoError('Warning! Mutual information could not be calculated '
                              'because the specified environment is not in the corpus.')
    # The clipped corpus becomes the context for calculating MI.
    corpus_context.corpus = clipped_corpus

    if context_output_path != '':
        with open(context_output_path, mode='w', encoding='utf-8-sig', newline='') as f:
            writer = csv.writer(f, delimiter='\t')
            writer.writerow(['Orthography', 'Transcription', 'Environment',  'Context'])
            for spelling, transcription, context in context_pair:
                writer.writerow([spelling, transcription, str(env), context])

    return corpus_context  # corpus_context (clipped), to be fed into the original function


def clip_context(new_trans, word, clipped_corpus):
    """
    Add a clipped copy of ``word`` to ``clipped_corpus``.

    The copy's transcription is ``new_trans`` split into its individual items
    (single characters when ``new_trans`` is a string); its spelling is
    ``str(word)`` and its frequency is ``word._frequency``.

    Parameters
    ----------
    new_trans : iterable of str
        The stretch of the word that satisfied the environment
    word : Word
        The word the stretch was cut from
    clipped_corpus : Corpus
        Corpus that receives the clipped word (duplicates allowed)

    Returns
    -------
    tuple of (str, str)
        The string form of the original transcription and the clipped
        transcription joined into one string.
    """
    segments = list(new_trans)
    full_transcription = getattr(word, word._transcription_name)

    clipped_word = Word(**{
        word._transcription_name: segments,
        word._spelling_name: str(word),
        word._freq_name: word._frequency,
        '_freq_name': word._freq_name,
    })
    # The same stretch can occur in several words, so duplicates must be kept.
    clipped_corpus.add_word(clipped_word, allow_duplicates=True)

    return str(full_transcription), ''.join(segments)


def pointwise_mi(corpus_context, query, env_filtered=False, word_boundary='Word-end only', in_word=False,
                 stop_check=None, call_back=None):
    """
    Calculate the mutual information for a bigram.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    query : tuple
        Tuple of two strings, each a segment/letter
    env_filtered : bool
        True if an environment filter was selected by the user. Defaults to False
    word_boundary : str or bool
        How to count word boundaries. A str when no environment filter is
        selected: 'Word-end only' counts the boundary once per word, 'Both sides'
        counts it twice (word-initial and word-final), and 'Ignored' does not
        count word boundaries. A bool when an environment filter is selected
        (True: # can be a part of a bigram).
        Defaults to 'Word-end only'
    in_word : bool
        Flag to calculate non-local, non-ordered mutual information,
        defaults to False
    stop_check : callable or None
        Optional function to check whether to gracefully terminate early
        (currently not consulted)
    call_back : callable or None
        Optional function to supply progress information during the function

    Returns
    -------
    float
        Mutual information of the bigram, i.e. the base-2 logarithm of
        p(bigram) / (p(first) * p(second))

    Raises
    ------
    MutualInfoError
        If either segment or the bigram is missing from the probability tables,
        or has probability zero.
    """
    if call_back is not None:
        call_back("Generating probabilities...")
        call_back(0, 0)

    if in_word:
        unigram_dict = get_in_word_unigram_frequencies(corpus_context, query)
        bigram_dict = get_in_word_bigram_frequency(corpus_context, query)
    else:
        need_wb = True
        halve_edges = True
        if env_filtered or word_boundary == 'Ignored':
            # if env filtered, the corpus already has the needed word boundaries
            # from when it was clipped
            need_wb = False
        elif word_boundary == 'Both sides':
            halve_edges = False
        unigram_dict = corpus_context.get_frequency_base(gramsize=1, halve_edges=halve_edges,
                                                         probability=True, need_wb=need_wb)
        bigram_dict = corpus_context.get_frequency_base(gramsize=2, halve_edges=halve_edges,
                                                        probability=True, need_wb=need_wb)

    # Missing keys are reported before zero probabilities, segments before the bigram.
    try:
        prob_s1 = unigram_dict[query[0]]
    except KeyError:
        raise MutualInfoError('The segment {} was not found in the corpus, '
                              'or in the environment, if you specified one. '.format(query[0])) from None
    try:
        prob_s2 = unigram_dict[query[1]]
    except KeyError:
        raise MutualInfoError('The segment {} was not found in the corpus, '
                              'or in the environment, if you specified one. '.format(query[1])) from None
    try:
        prob_bg = bigram_dict[query]
    except KeyError:
        raise MutualInfoError('The bigram {} was not found in the corpus using {}s'.format(
            ''.join(query), corpus_context.sequence_type)) from None

    # A zero probability would make the ratio undefined (division by zero or log of zero).
    if prob_s1 == 0.0:
        raise MutualInfoError('Warning! Mutual information could not be calculated because the unigram {} is not in the corpus.'.format(query[0]))
    if prob_s2 == 0.0:
        raise MutualInfoError('Warning! Mutual information could not be calculated because the unigram {} is not in the corpus.'.format(query[1]))
    if prob_bg == 0.0:
        raise MutualInfoError('Warning! Mutual information could not be calculated because the bigram {} is not in the corpus.'.format(str(query)))

    return math.log2(prob_bg / (prob_s1 * prob_s2))


def get_in_word_unigram_frequencies(corpus_context, query):
    """
    Compute, for each segment of ``query``, its in-word relative frequency.

    For every segment the frequencies of all words whose tier contains it are
    summed, and the sum is divided by ``len(corpus_context)``.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    query : tuple
        Segments to look up

    Returns
    -------
    dict
        Maps each segment of ``query`` to its value; 0.0 for every segment when
        the context is empty.
    """
    totals = [0 for _ in query]
    for word in corpus_context:
        tier = getattr(word, corpus_context.sequence_type)
        for i, segment in enumerate(query):
            if segment in tier:
                totals[i] += word.frequency
    size = len(corpus_context)
    # An empty context has no occurrences; report 0.0 instead of dividing by zero.
    return {segment: (totals[i] / size if size else 0.0) for i, segment in enumerate(query)}


def get_in_word_bigram_frequency(corpus_context, query):
    """
    Compute the in-word relative frequency of the segments of ``query`` co-occurring.

    The frequencies of all words whose tier contains every segment of ``query``
    (in any order and position) are summed and divided by ``len(corpus_context)``.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    query : tuple
        Segments that must all occur in a word; also used as the key of the result

    Returns
    -------
    dict
        Single-entry dict mapping ``query`` to its value; 0.0 when the context is
        empty.
    """
    total = 0
    for word in corpus_context:
        tier = getattr(word, corpus_context.sequence_type)
        if all(segment in tier for segment in query):
            total += word.frequency
    size = len(corpus_context)
    # An empty context has no occurrences; report 0.0 instead of dividing by zero.
    return {query: total / size if size else 0.0}


def all_mis(corpus_context, word_boundary, in_word=False, stop_check=None, call_back=None):
    """
    Calculate the mutual information of every ordered pair of inventory segments.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus; its ``inventory`` supplies the segments
    word_boundary : str or bool
        Passed through to ``pointwise_mi``
    in_word : bool
        Passed through to ``pointwise_mi``, defaults to False
    stop_check : callable or None
        Accepted for interface compatibility (currently not consulted)
    call_back : callable or None
        Accepted for interface compatibility (currently not consulted)

    Returns
    -------
    list of tuple
        ``((s1, s2), mi)`` entries with ``mi`` rendered as a string, ordered by
        ascending numeric mutual information.
    """
    mis = {}
    for s1 in corpus_context.inventory:
        # Inventory items may be plain strings or objects exposing ``symbol``.
        if not isinstance(s1, str):
            s1 = s1.symbol
        for s2 in corpus_context.inventory:
            if not isinstance(s2, str):
                s2 = s2.symbol
            # NOTE(review): pointwise_mi raises MutualInfoError for a segment or
            # bigram that is absent from the corpus; that error propagates from here.
            mis[(s1, s2)] = pointwise_mi(corpus_context, (s1, s2),
                                         word_boundary=word_boundary, in_word=in_word)

    # Order by the numeric value; sorting the string form would be lexicographic
    # (e.g. '10.0' before '2.0').
    ranked = sorted(mis.items(), key=lambda item: item[1])
    return [(pair, str(value)) for pair, value in ranked]