# Source code for corpustools.funcload.functional_load

import re
import itertools
import time
from collections import defaultdict
from math import log, factorial

from corpustools.exceptions import FuncLoadError 
from corpustools.funcload.io import save_minimal_pairs 
from corpustools.corpus.classes.lexicon import EnvironmentFilter


def is_minpair(first, second, corpus_context, segment_pairs, environment_filter): 
    """Return True iff first/second are a minimal pair.
    Checks that all segments in those words are identical OR a valid segment pair
    (from segment_pairs) and fit the environment_filter, and that there is at least
    one difference between first and second.

    Parameters
    ----------

    first, second: Word
        Two words to evaluate as a minimal pair. 
    corpus_context: CorpusContext
        The context manager from which the corpus should be evaluated (e.g., canonical variants).
    segment_pairs: List
        list of length-2 tuples of str
    environment_filter: EnvironmentFilter
        The environment in which words should be evaluated as a minimal pair.
    
    """
    first = getattr(first, corpus_context.sequence_type)
    second = getattr(second, corpus_context.sequence_type)
    if len(first) != len(second):
        return False
    has_difference = False
    for i in range(len(first)):
        if first[i] == second[i]:
            continue
        elif (conflateable(first[i], second[i], segment_pairs)
              and fits_environment(first, second, i, environment_filter)):
            has_difference = True
        else:
            return False
    return has_difference


def conflateable(seg1, seg2, segment_pairs):
    """Return True iff seg1 and seg2 are exactly one of the segment pairs
    in segment_pairs (ignoring ordering of either).

    seg1 and seg2 will never be identical in the input.

    Parameters
    ----------

    seg1, seg2: Segment
        Two segments on which minimal pairs might hinge. 
    segment_pairs: List
        list of length-2 tuples of str
    
    """
    for segment_pair in segment_pairs:
        seg_set = set(segment_pair)
        if seg1 in seg_set and seg2 in seg_set:
            return True
    return False
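

# Illustrative sketch (not part of PCT): conflateable() works on plain
# strings, so it can be exercised directly. The segment symbols and pairs
# below are made up for the demo.
if __name__ == '__main__':
    demo_pairs = [('s', 'ʃ'), ('t', 'd')]
    print(conflateable('ʃ', 's', demo_pairs))  # True: order within a pair is ignored
    print(conflateable('s', 'd', demo_pairs))  # False: not one of the listed pairs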


def fits_environment(w1, w2, index, environment_filter):
    """Return True iff for both w1 and w2 (tiers), the environment
    of its i'th element passes the environment_filter.

    """
    if not environment_filter:
        return True

    # Render both words with '_' at the target position, then test the
    # filter's regular expression against each.
    w1 = ready_for_re(w1, index)
    w2 = ready_for_re(w2, index)
    env_re = make_environment_re(environment_filter)

    return (bool(re.search(env_re, w1)) and bool(re.search(env_re, w2)))


def ready_for_re(word, index):
    """Return word as a space-joined string with '_' at the target index."""
    w = [str(seg) for seg in word]
    w[index] = '_'
    return ' '.join(w)
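

# Illustrative sketch (not part of PCT): is_minpair() only needs objects
# exposing the tier named by corpus_context.sequence_type, so SimpleNamespace
# words and a bare hypothetical stand-in context are enough here. With no
# environment filter, any conflateable difference qualifies.
if __name__ == '__main__':
    from types import SimpleNamespace

    class _DemoContext:
        sequence_type = 'transcription'

    sock = SimpleNamespace(transcription=('s', 'a', 'k'))
    shock = SimpleNamespace(transcription=('ʃ', 'a', 'k'))
    print(is_minpair(sock, shock, _DemoContext(), [('s', 'ʃ')], None))  # True
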

def make_environment_re(environment_filter):
    """Build a regular expression string from an EnvironmentFilter, with
    '_' standing for the target position and '#' for a word boundary."""
    if environment_filter.lhs:
        re_lhs = ' '.join(['(' + '|'.join([seg for seg in position]) + ')'
                           for position in environment_filter.lhs])
        re_lhs = re_lhs.replace('#', '^')
    else:
        re_lhs = ''

    if environment_filter.rhs:
        re_rhs = ' '.join(['(' + '|'.join([seg for seg in position]) + ')'
                           for position in environment_filter.rhs])
        re_rhs = re_rhs.replace('#', '$')
    else:
        re_rhs = ''

    # Add a separating space unless the context is only a word boundary,
    # which contributes no segmental material of its own. (The original
    # rhs test used endswith('($'), which can never be true since a joined
    # rhs always ends with ')'; startswith is the intended check.)
    if re_lhs and not re_lhs.endswith('^)'):
        re_lhs += ' '
    if re_rhs and not re_rhs.startswith('($'):
        re_rhs = ' ' + re_rhs
    return re_lhs + '_' + re_rhs
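

# Illustrative sketch (not part of PCT): make_environment_re() only reads
# .lhs and .rhs, so a SimpleNamespace can stand in for an EnvironmentFilter.
# The hypothetical filter below encodes "word-initially, before /a/, /i/,
# or /u/".
if __name__ == '__main__':
    from types import SimpleNamespace

    env = SimpleNamespace(lhs=[('#',)], rhs=[('a', 'i', 'u')])
    print(make_environment_re(env))                                    # (^)_ (a|i|u)
    print(fits_environment(('p', 'a', 't'), ('b', 'a', 't'), 0, env))  # True
    print(fits_environment(('t', 'a', 'p'), ('t', 'a', 'b'), 2, env))  # False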


def minpair_fl(corpus_context, segment_pairs,
               relative_count_to_relevant_sounds=False,
               relative_count_to_whole_corpus=True,
               distinguish_homophones=False,
               environment_filter=None,
               prevent_normalization=False,
               stop_check=None, call_back=None):
    """Calculate the functional load of the contrast between two segments
    as a count of minimal pairs.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment_pairs : list of length-2 tuples of str
        The pairs of segments to be conflated.
    relative_count_to_relevant_sounds : bool, optional
        If True, divide the number of minimal pairs by the total number
        of words that contain either of the two segments.
    relative_count_to_whole_corpus : bool, optional
        If True, divide the number of minimal pairs by the total number
        of words in the corpus (regardless of whether those words contain
        the target sounds). Defaults to True.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.
    environment_filter : EnvironmentFilter
        Allows the user to restrict the neutralization process to
        segments in particular segmental contexts
    prevent_normalization : bool, optional
        Accepted for signature parity with deltah_fl; not used in this
        function.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    tuple(int or float, list)
        Tuple of:

        0. if `relative_count_to_relevant_sounds` == False and
           `relative_count_to_whole_corpus` == False, an int of the raw
           number of minimal pairs; if
           `relative_count_to_relevant_sounds` == True, a float of that
           count divided by the total number of words in the corpus that
           include either `s1` or `s2`; if
           `relative_count_to_whole_corpus` == True, a float of the raw
           count divided by the total number of words in the corpus; and
        1. list of minimal pairs.
    """
    if stop_check is not None and stop_check():
        return

    ## Count the words in the corpus (needed if relative_count_to_whole_corpus is True)
    num_words_in_corpus = len(corpus_context.corpus)

    ## Filter out words that have none of the target segments
    ## (for relative_count_to_relevant_sounds as well as improving runtime)
    contain_target_segment = []
    if call_back is not None:
        call_back('Finding words with the specified segments...')
        call_back(0, len(corpus_context))
        cur = 0
    all_target_segments = list(itertools.chain.from_iterable(segment_pairs))
    for w in corpus_context:
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        tier = getattr(w, corpus_context.sequence_type)
        if any(s in tier for s in all_target_segments):
            contain_target_segment.append(w)
    if stop_check is not None and stop_check():
        return

    ## Find minimal pairs
    minpairs = []
    if call_back is not None:
        call_back('Finding minimal pairs...')
        if len(contain_target_segment) >= 2:
            call_back(0, factorial(len(contain_target_segment)) /
                         (factorial(len(contain_target_segment) - 2) * 2))
        cur = 0
    for first, second in itertools.combinations(contain_target_segment, 2):
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        if is_minpair(first, second, corpus_context, segment_pairs, environment_filter):
            # Sort each pair by tier/transcription so homophones collapse consistently
            ordered_pair = sorted([(first, getattr(first, corpus_context.sequence_type)),
                                   (second, getattr(second, corpus_context.sequence_type))],
                                  key=lambda x: x[1])
            minpairs.append(tuple(ordered_pair))

    ## Generate output
    if not distinguish_homophones:
        actual_minpairs = {}
        for pair in minpairs:
            if stop_check is not None and stop_check():
                return
            key = (pair[0][1], pair[1][1])  # Keys are tuples of transcriptions
            if key not in actual_minpairs:
                actual_minpairs[key] = (pair[0][0], pair[1][0])  # Values are words
            else:
                pair_freq = pair[0][0].frequency + pair[1][0].frequency
                existing_freq = (actual_minpairs[key][0].frequency +
                                 actual_minpairs[key][1].frequency)
                if pair_freq > existing_freq:
                    actual_minpairs[key] = (pair[0][0], pair[1][0])
        result = sum((x[0].frequency + x[1].frequency) / 2
                     for x in actual_minpairs.values())
    else:
        result = sum((x[0][0].frequency + x[1][0].frequency) / 2
                     for x in minpairs)

    if relative_count_to_relevant_sounds and len(contain_target_segment) > 0:
        result /= sum(x.frequency for x in contain_target_segment)
    elif relative_count_to_whole_corpus:
        result = result / num_words_in_corpus

    return (result, minpairs)
def deltah_fl(corpus_context, segment_pairs, environment_filter=None,
              prevent_normalization=False, stop_check=None, call_back=None):
    """Calculate the functional load of the contrast between two segments
    as the decrease in corpus entropy caused by a merger.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment_pairs : list of length-2 tuples of str
        The pairs of segments to be conflated.
    environment_filter : EnvironmentFilter
        Allows the user to restrict the neutralization process to
        segments in particular segmental contexts
    prevent_normalization : bool, optional
        If False (the default), normalize the entropy difference by the
        pre-merger entropy.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    float
        The difference between a) the entropy of the choice among
        non-homophonous words in the corpus before a merger of `s1` and
        `s2` and b) the entropy of that choice after the merger.
    """
    if call_back is not None:
        call_back('Finding instances of segments...')
        call_back(0, len(corpus_context))
        cur = 0
    freq_sum = 0
    original_probs = defaultdict(float)
    all_target_segments = list(itertools.chain.from_iterable(segment_pairs))
    if environment_filter:
        filled_environment = EnvironmentFilter(tuple(all_target_segments),
                                               environment_filter.lhs,
                                               environment_filter.rhs)

    for w in corpus_context:
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 20 == 0:
                call_back(cur)
        f = w.frequency
        original_probs[getattr(w, corpus_context.sequence_type)] += f
        freq_sum += f
    original_probs = {k: v / freq_sum for k, v in original_probs.items()}

    if stop_check is not None and stop_check():
        return
    preneutr_h = entropy(original_probs.values())

    neutralized_probs = defaultdict(float)
    if call_back is not None:
        call_back('Neutralizing instances of segments...')
        call_back(0, len(list(original_probs.keys())))
        cur = 0
    for k, v in original_probs.items():
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        if not environment_filter or k.find(filled_environment):
            n = [neutralize_segment(seg, segment_pairs) for seg in k]
            neutralized_probs['.'.join(n)] += v
    postneutr_h = entropy(neutralized_probs.values())

    if stop_check is not None and stop_check():
        return
    result = preneutr_h - postneutr_h
    if result < 1e-10:
        result = 0.0
    if not prevent_normalization and preneutr_h > 0.0:
        result = result / preneutr_h
    return result


# Note: this function might more transparently be named average_minpair_fl:
# it averages minpair_fl() over the contrasts between one segment and every
# other segment in the inventory. Like minpair_fl(), it has two "relativizer"
# options: relative to words containing the relevant segments, or relative
# to all words in the corpus.
def relative_minpair_fl(corpus_context, segment,
                        relative_count_to_relevant_sounds=False,
                        relative_count_to_whole_corpus=True,
                        distinguish_homophones=False,
                        output_filename=None,
                        environment_filter=None,
                        prevent_normalization=False,
                        stop_check=None, call_back=None):
    """Calculate the average functional load of the contrasts between a
    segment and all other segments, as a count of minimal pairs.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment : str
        The target segment.
    relative_count_to_relevant_sounds : bool, optional
        If True, divide the number of minimal pairs by the total number
        of words that contain either of the two segments.
    relative_count_to_whole_corpus : bool, optional
        If True, divide the number of minimal pairs by the total number
        of words in the corpus (regardless of whether those words contain
        the target sounds). Defaults to True.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.
    environment_filter : EnvironmentFilter
        Allows the user to restrict the neutralization process to
        segments in particular segmental contexts
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    int or float
        The average, across the contrasts between `segment` and every
        other segment, of the minpair_fl() result: if
        `relative_count_to_relevant_sounds` == False and
        `relative_count_to_whole_corpus` == False, the raw number of
        minimal pairs; if `relative_count_to_relevant_sounds` == True,
        that count divided by the total number of words in the corpus
        that include either segment; if
        `relative_count_to_whole_corpus` == True, the raw count divided
        by the total number of words in the corpus.
    """
    all_segments = corpus_context.inventory
    segment_pairs = [(segment, other.symbol) for other in all_segments
                     if other.symbol != segment and other.symbol != '#']

    results = []
    to_output = []
    for sp in segment_pairs:
        res = minpair_fl(corpus_context, [sp],
                         relative_count_to_relevant_sounds=relative_count_to_relevant_sounds,
                         relative_count_to_whole_corpus=relative_count_to_whole_corpus,
                         distinguish_homophones=distinguish_homophones,
                         environment_filter=environment_filter,
                         prevent_normalization=prevent_normalization,
                         stop_check=stop_check, call_back=call_back)
        results.append(res[0])
        if output_filename is not None:
            to_output.append((sp, res[1]))
    if output_filename is not None:
        save_minimal_pairs(output_filename, to_output)
    return sum(results) / len(segment_pairs)
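

# Illustrative sketch (not part of PCT): relative_minpair_fl() additionally
# needs an inventory of segment-like objects with a .symbol attribute, so a
# hypothetical MockSegment is added to the toy context used above.
if __name__ == '__main__':
    class MockSegment:
        def __init__(self, symbol):
            self.symbol = symbol

    class MockWord:
        def __init__(self, transcription, frequency=1):
            self.transcription = tuple(transcription)
            self.frequency = frequency

    class MockContext:
        sequence_type = 'transcription'

        def __init__(self, words, inventory):
            self.corpus = words
            self.inventory = [MockSegment(s) for s in inventory]

        def __iter__(self):
            return iter(self.corpus)

        def __len__(self):
            return len(self.corpus)

    words = [MockWord('pat'), MockWord('bat'), MockWord('pit'), MockWord('bit')]
    ctx = MockContext(words, ['p', 'b', 't', 'a', 'i', '#'])
    # Average of minpair_fl over (p,b), (p,t), (p,a), (p,i); only p/b has
    # minimal pairs, so the result is 0.5 / 4:
    print(relative_minpair_fl(ctx, 'p'))  # 0.125
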
def relative_deltah_fl(corpus_context, segment,
                       environment_filter=None,
                       stop_check=None, call_back=None):
    """Calculate the average functional load of the contrasts between a
    segment and all other segments, as the decrease in corpus entropy
    caused by a merger.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment : str
        The target segment.
    environment_filter : EnvironmentFilter
        Allows the user to restrict the neutralization process to
        segments in particular segmental contexts
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    float
        The average, across all pairings with `segment`, of the
        difference between a) the entropy of the choice among
        non-homophonous words in the corpus before a merger and b) the
        entropy of that choice after the merger.
    """
    all_segments = corpus_context.inventory
    segment_pairs = [(segment, other.symbol) for other in all_segments
                     if other.symbol != segment and other.symbol != '#']

    results = []
    for sp in segment_pairs:
        results.append(deltah_fl(corpus_context, [sp],
                                 environment_filter=environment_filter,
                                 stop_check=stop_check, call_back=call_back))
    return sum(results) / len(segment_pairs)


def collapse_segpairs_fl(corpus_context, **kwargs):
    func_type = kwargs.get('func_type')
    segment_pairs = kwargs.get('segment_pairs')
    relative_count = kwargs.get('relative_count')
    distinguish_homophones = kwargs.get('distinguish_homophones')
    environment_filter = kwargs.get('environment_filter')
    if func_type == 'min_pairs':
        fl = minpair_fl(corpus_context, segment_pairs,
                        relative_count_to_relevant_sounds=relative_count,
                        distinguish_homophones=distinguish_homophones,
                        environment_filter=environment_filter)
    elif func_type == 'entropy':
        fl = deltah_fl(corpus_context, segment_pairs,
                       environment_filter=environment_filter)
    return fl


def individual_segpairs_fl(corpus_context, **kwargs):
    func_type = kwargs.get('func_type')
    segment_pairs = kwargs.get('segment_pairs')
    relative_count = kwargs.get('relative_count')
    distinguish_homophones = kwargs.get('distinguish_homophones')
    environment_filter = kwargs.get('environment_filter')

    results = []
    for pair in segment_pairs:
        if func_type == 'min_pairs':
            fl = minpair_fl(corpus_context, [pair],
                            relative_count_to_relevant_sounds=relative_count,
                            distinguish_homophones=distinguish_homophones,
                            environment_filter=environment_filter)
        elif func_type == 'entropy':
            fl = deltah_fl(corpus_context, [pair],
                           environment_filter=environment_filter)
        results.append(fl)
    return results


def entropy(probabilities):
    """Calculate the entropy of a choice from the provided probability
    distribution.

    Parameters
    ----------
    probabilities : list of floats
        Contains the probability of each item in the list.

    Returns
    -------
    float
        Entropy
    """
    return -sum(p * log(p, 2) if p > 0 else 0 for p in probabilities)


def neutralize_segment(segment, segment_pairs):
    try:
        s = segment.symbol
    except AttributeError:
        s = segment
    for sp in segment_pairs:
        if s in sp:
            return 'NEUTR:' + ''.join(str(x) for x in sp)
    return s


# Like minpair_fl(), this function offers both "relative count" options.
def all_pairwise_fls(corpus_context, relative_fl=False,
                     algorithm='minpair',
                     relative_count_to_relevant_sounds=False,
                     relative_count_to_whole_corpus=True,
                     distinguish_homophones=False,
                     environment_filter=None,
                     call_back=None, stop_check=None):
    """Calculate the functional load of the contrast between two segments
    as a count of minimal pairs. This version calculates the functional
    load for ALL pairs of segments in the inventory, which could be useful
    for visually mapping out phoneme inventories.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    relative_fl : bool
        If False, return the FL for all segment pairs. If True, return
        the relative (average) FL for each segment.
    algorithm : str {'minpair', 'deltah'}
        Algorithm to use for calculating functional load: "minpair" for
        minimal pair count or "deltah" for change in entropy.
    relative_count_to_relevant_sounds : bool, optional
        If True, divide the number of minimal pairs by the total number
        of words that contain either of the two segments.
    relative_count_to_whole_corpus : bool, optional
        If True, divide the number of minimal pairs by the total number
        of words in the corpus (regardless of whether those words contain
        the target sounds). Defaults to True.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.
    environment_filter : EnvironmentFilter
        Allows the user to restrict the neutralization process to
        segments in particular segmental contexts
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    list of tuple(tuple(str, str), float) OR list of tuple(str, float)
        Normally returns a list of all segment pairs and their respective
        functional load values, as length-2 tuples ordered by FL. If
        calculating relative FL (i.e., average FL for a segment), returns
        a list of (segment, average FL) tuples, ordered by FL.
    """
    fls = {}
    total_calculations = ((((len(corpus_context.inventory) - 1) ** 2)
                           - len(corpus_context.inventory) - 1) / 2) + 1
    ct = 1
    t = time.time()
    if '' in corpus_context.inventory:
        raise FuncLoadError('Calculation of functional load for all segment '
                            'pairs requires that all items in the corpus '
                            'have a non-null transcription.')
    for i, s1 in enumerate(corpus_context.inventory[:-1]):
        for s2 in corpus_context.inventory[i + 1:]:
            if s1 != '#' and s2 != '#':
                print('Performing FL calculation {} out of {} possible'.format(str(ct), str(total_calculations)))
                ct += 1
                print('Duration of last calculation: {}'.format(str(time.time() - t)))
                t = time.time()
                if type(s1) != str:
                    s1 = s1.symbol
                if type(s2) != str:
                    s2 = s2.symbol
                if algorithm == 'minpair':
                    fl = minpair_fl(corpus_context, [(s1, s2)],
                                    relative_count_to_relevant_sounds=relative_count_to_relevant_sounds,
                                    relative_count_to_whole_corpus=relative_count_to_whole_corpus,
                                    distinguish_homophones=distinguish_homophones,
                                    environment_filter=environment_filter)[0]
                elif algorithm == 'deltah':
                    fl = deltah_fl(corpus_context, [(s1, s2)],
                                   environment_filter=environment_filter)
                fls[(s1, s2)] = fl
    if not relative_fl:
        ordered_fls = sorted([(pair, fls[pair]) for pair in fls],
                             key=lambda p: p[1], reverse=True)
        return ordered_fls
    elif relative_fl:
        rel_fls = {}
        for s in corpus_context.inventory:
            if type(s) != str:
                s = s.symbol
            if s != '#':
                total = 0.0
                for pair in fls:
                    if s == pair[0] or s == pair[1]:
                        total += fls[pair]
                rel_fls[s] = total / (len(corpus_context.inventory) - 1)
        ordered_rel_fls = sorted([(s, rel_fls[s]) for s in rel_fls],
                                 key=lambda p: p[1], reverse=True)
        return ordered_rel_fls
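

# Illustrative sketch (not part of PCT): the entropy helpers and deltah_fl()
# run on the same kind of hypothetical toy context used above. A uniform
# two-way choice carries exactly one bit, and merging 'p' with 'b' relabels
# both members of the pair.
if __name__ == '__main__':
    print(entropy([0.5, 0.5]))                    # 1.0
    print(neutralize_segment('p', [('p', 'b')]))  # NEUTR:pb

    from types import SimpleNamespace

    class _DemoContext:
        sequence_type = 'transcription'

        def __init__(self, words):
            self._words = words

        def __iter__(self):
            return iter(self._words)

    words = [SimpleNamespace(transcription=('p', 'a', 't'), frequency=2),
             SimpleNamespace(transcription=('b', 'a', 't'), frequency=2),
             SimpleNamespace(transcription=('p', 'i', 't'), frequency=1),
             SimpleNamespace(transcription=('b', 'i', 't'), frequency=1)]
    # Merging p/b collapses pat~bat and pit~bit, losing one bit of entropy;
    # the result is then normalized by the pre-merger entropy (~1.918 bits):
    print(deltah_fl(_DemoContext(words), [('p', 'b')]))  # ~0.521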