import re
from collections import defaultdict
from math import *
import itertools
import queue
import copy
from math import factorial
import time

from corpustools.exceptions import FuncLoadError
from .io import save_minimal_pairs

def matches(first, second):
    """Determine if two neutralized transcriptions are a minimal pair or not.

    Parameters
    ----------
    first : tuple
        Tuple of the neutralized sequence, the spelling of the word,
        and the unneutralized sequence
    second : tuple
        Tuple of the neutralized sequence, the spelling of the word,
        and the unneutralized sequence

    Returns
    -------
    bool
        True if the neutralized sequences match, they both contain
        neutralized segments, and the spellings and original transcriptions
        are different; otherwise False
    """
    # Order matters only for short-circuiting: the cheap equality test on the
    # neutralized strings rejects the vast majority of candidate pairs first.
    return (first[0] == second[0] and first[1] != second[1]
        and 'NEUTR:' in first[0] and 'NEUTR:' in second[0]
        and first[2] != second[2])

def minpair_fl(corpus_context, segment_pairs, relative_count = True,
               distinguish_homophones = False,
               stop_check = None, call_back = None):
    """Calculate the functional load of the contrast between two segments
    from the minimal pairs that the contrast distinguishes.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus. It is iterated for words, queried
        with ``len()`` for progress reporting, and its ``sequence_type``
        attribute names the word attribute holding the transcription.
    segment_pairs : list of length-2 tuples of str
        The pairs of segments to be conflated.
    relative_count : bool, optional
        If True, divide the result by the summed ``frequency`` of all words
        that contain any of the segments in `segment_pairs`.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    tuple(int or float, list) or None
        None if `stop_check` requested termination. Otherwise a tuple of:

        0. the sum, over the counted minimal pairs, of the mean of the two
           words' ``frequency`` values (divided as described under
           `relative_count` when that flag is set); and
        1. the list of all minimal pairs found, each a tuple of two
           ``(word, transcription)`` tuples ordered by transcription. This
           list is not de-duplicated, even when `distinguish_homophones`
           is False.
    """
    if stop_check is not None and stop_check():
        return

    all_segments = list(itertools.chain.from_iterable(segment_pairs))

    # Collect (neutralized string, word, original transcription) for every
    # word that contains at least one of the segments of interest.
    neutralized = []
    cur = 0
    if call_back is not None:
        call_back('Finding and neutralizing instances of segments...')
        call_back(0, len(corpus_context))
    for w in corpus_context:
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        tier = getattr(w, corpus_context.sequence_type)
        if any(s in tier for s in all_segments):
            n = [neutralize_segment(seg, segment_pairs) for seg in tier]
            neutralized.append(('.'.join(n), w, tier))
    if stop_check is not None and stop_check():
        return

    minpairs = []
    cur = 0
    if call_back is not None:
        call_back('Counting minimal pairs...')
        # Number of unordered pairs, n choose 2. Computed directly so that
        # fewer than two candidate words yields 0 instead of attempting the
        # factorial of a negative number.
        n_words = len(neutralized)
        call_back(0, n_words * (n_words - 1) // 2)
    for first, second in itertools.combinations(neutralized, 2):
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        if not matches(first, second):
            continue
        # Order each pair by transcription so the same two transcriptions
        # always produce the same key below.
        ordered_pair = sorted([(first[1], first[2]), (second[1], second[2])],
                              key = lambda x: x[1])
        minpairs.append(tuple(ordered_pair))

    if not distinguish_homophones:
        # Keep one word pair per pair of transcriptions: the one whose
        # combined frequency is highest.
        actual_minpairs = {}
        for pair in minpairs:
            key = (pair[0][1], pair[1][1]) # Keys are tuples of tiers
            candidate = (pair[0][0], pair[1][0]) # Values are words
            if key not in actual_minpairs:
                actual_minpairs[key] = candidate
            else:
                existing = actual_minpairs[key]
                pair_freq = candidate[0].frequency + candidate[1].frequency
                existing_freq = existing[0].frequency + existing[1].frequency
                if pair_freq > existing_freq:
                    actual_minpairs[key] = candidate
        result = sum((x[0].frequency + x[1].frequency)/2
                     for x in actual_minpairs.values())
    else:
        result = sum((x[0][0].frequency + x[1][0].frequency)/2
                     for x in minpairs)

    if relative_count and neutralized:
        total_frequency = sum(x[1].frequency for x in neutralized)
        # A zero total means every contributing frequency is zero, so the
        # result is left undivided rather than raising ZeroDivisionError.
        if total_frequency:
            result /= total_frequency

    return (result, minpairs)
def deltah_fl(corpus_context, segment_pairs, stop_check = None, call_back = None):
    """Calculate the functional load of the contrast between two
    segments as the decrease in corpus entropy caused by a merger.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus. It is iterated for words, queried
        with ``len()`` for progress reporting, and its ``sequence_type``
        attribute names the word attribute holding the transcription.
    segment_pairs : list of length-2 tuples of str
        The pairs of segments to be conflated.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    float or None
        The difference between a) the entropy of the choice among
        non-homophonous words in the corpus before a merger of the
        segments and b) the entropy of that choice after the merger.
        Differences below 1e-10 are reported as 0.0, as is a corpus whose
        total frequency is zero. None is returned if `stop_check`
        requested termination.
    """
    if call_back is not None:
        call_back('Finding instances of segments...')
        call_back(0, len(corpus_context))
    cur = 0
    freq_sum = 0
    # Accumulate frequency per distinct transcription, so homophones share
    # a single entry.
    original_probs = defaultdict(float)
    for w in corpus_context:
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 20 == 0:
                call_back(cur)
        f = w.frequency
        original_probs[getattr(w, corpus_context.sequence_type)] += f
        freq_sum += f

    if stop_check is not None and stop_check():
        return

    # Without any frequency mass no probabilities can be formed; there is
    # no entropy to lose, and normalising would divide by zero.
    if not freq_sum:
        return 0.0

    original_probs = {k: v/freq_sum for k, v in original_probs.items()}
    preneutr_h = entropy(original_probs.values())

    neutralized_probs = defaultdict(float)
    if call_back is not None:
        call_back('Neutralizing instances of segments...')
        call_back(0, len(original_probs))
    cur = 0
    for k, v in original_probs.items():
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        # Transcriptions that become identical after the merger pool their
        # probability under one neutralized key.
        merged = '.'.join([neutralize_segment(s, segment_pairs) for s in k])
        neutralized_probs[merged] += v
    postneutr_h = entropy(neutralized_probs.values())

    if stop_check is not None and stop_check():
        return
    result = preneutr_h - postneutr_h
    # Clamp floating-point noise (including tiny negative values) to zero.
    if result < 1e-10:
        result = 0.0
    return result
def relative_minpair_fl(corpus_context, segment, relative_count = True,
                        distinguish_homophones = False,
                        output_filename = None,
                        stop_check = None, call_back = None):
    """Calculate the average functional load of the contrasts between a
    segment and all other segments, based on minimal pairs.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus; its ``inventory`` supplies the
        segments to contrast with `segment`.
    segment : str
        The target segment.
    relative_count : bool, optional
        Passed through to `minpair_fl` for every segment pair.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.
    output_filename : str, optional
        If given, the minimal pairs found for each segment pair are passed
        to `save_minimal_pairs` together with this filename.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    int or float or None
        The mean, over every pairing of `segment` with another inventory
        segment (excluding '#'), of the first element returned by
        `minpair_fl`. Returns 0.0 when there is no other segment to pair
        with, and None when `stop_check` terminated a calculation early.
    """
    all_segments = corpus_context.inventory
    segment_pairs = [(segment, other) for other in all_segments
                     if other != segment and other != '#']
    results = []
    to_output = []
    for sp in segment_pairs:
        res = minpair_fl(corpus_context, [sp],
                         relative_count = relative_count,
                         distinguish_homophones = distinguish_homophones,
                         stop_check = stop_check, call_back = call_back)
        # minpair_fl returns None when asked to stop; propagate that rather
        # than failing on the subscript below.
        if res is None:
            return None
        results.append(res[0])
        if output_filename is not None:
            to_output.append((sp, res[1]))
    if output_filename is not None:
        save_minimal_pairs(output_filename, to_output)
    # No contrasting segments: avoid dividing by zero.
    if not results:
        return 0.0
    return sum(results)/len(results)
def relative_deltah_fl(corpus_context, segment,
                       stop_check = None, call_back = None):
    """Calculate the average functional load of the contrasts between a
    segment and all other segments, as the decrease in corpus entropy
    caused by a merger.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus; its ``inventory`` supplies the
        segments to contrast with `segment`.
    segment : str
        The target segment.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    float or None
        The mean, over every pairing of `segment` with another inventory
        segment (excluding '#'), of the entropy decrease reported by
        `deltah_fl`. Returns 0.0 when there is no other segment to pair
        with, and None when `stop_check` terminated a calculation early.
    """
    all_segments = corpus_context.inventory
    segment_pairs = [(segment, other) for other in all_segments
                     if other != segment and other != '#']
    results = []
    for sp in segment_pairs:
        res = deltah_fl(corpus_context, [sp],
                        stop_check = stop_check, call_back = call_back)
        # deltah_fl returns None when asked to stop; propagate that rather
        # than failing when summing below.
        if res is None:
            return None
        results.append(res)
    # No contrasting segments: avoid dividing by zero.
    if not results:
        return 0.0
    return sum(results)/len(results)
def collapse_segpairs_fl(corpus_context, **kwargs):
    """Calculate one functional load value with all segment pairs conflated
    simultaneously.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    **kwargs
        ``func_type`` ('min_pairs' or 'entropy'), ``segment_pairs``,
        ``relative_count`` and ``distinguish_homophones``; the last two are
        only used for 'min_pairs'.

    Returns
    -------
    object or None
        The result of `minpair_fl` ('min_pairs') or `deltah_fl`
        ('entropy'); None for any other ``func_type``.
    """
    func_type = kwargs.get('func_type')
    segment_pairs = kwargs.get('segment_pairs')
    relative_count = kwargs.get('relative_count')
    distinguish_homophones = kwargs.get('distinguish_homophones')
    if func_type == 'min_pairs':
        return minpair_fl(corpus_context, segment_pairs,
                          relative_count, distinguish_homophones)
    if func_type == 'entropy':
        return deltah_fl(corpus_context, segment_pairs)
    # Unknown func_type: nothing is computed.
    return None


def individual_segpairs_fl(corpus_context, **kwargs):
    """Calculate functional load separately for each segment pair.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    **kwargs
        ``func_type`` ('min_pairs' or 'entropy'), ``segment_pairs``,
        ``relative_count`` and ``distinguish_homophones``; the last two are
        only used for 'min_pairs'.

    Returns
    -------
    list
        One entry per segment pair, in input order: the result of
        `minpair_fl` ('min_pairs') or `deltah_fl` ('entropy').

    Raises
    ------
    ValueError
        If there is at least one segment pair and ``func_type`` is neither
        'min_pairs' nor 'entropy'.
    """
    func_type = kwargs.get('func_type')
    segment_pairs = kwargs.get('segment_pairs')
    relative_count = kwargs.get('relative_count')
    distinguish_homophones = kwargs.get('distinguish_homophones')

    results = []
    for pair in segment_pairs:
        if func_type == 'min_pairs':
            fl = minpair_fl(corpus_context, [pair],
                            relative_count, distinguish_homophones)
        elif func_type == 'entropy':
            fl = deltah_fl(corpus_context, [pair])
        else:
            raise ValueError(
                "func_type must be 'min_pairs' or 'entropy', got {!r}".format(func_type))
        results.append(fl)
    return results


def entropy(probabilities):
    """Calculate the entropy of a choice from the provided probability
    distribution.

    Parameters
    ----------
    probabilities : iterable of floats
        Contains the probability of each item in the list.

    Returns
    -------
    float
        Entropy in bits (base-2 logarithm). Non-positive probabilities
        contribute nothing to the sum.
    """
    return -(sum(p*log(p, 2) if p > 0 else 0 for p in probabilities))


def neutralize_segment(segment, segment_pairs):
    """Replace a segment by a merged placeholder if it belongs to one of the
    segment pairs.

    Parameters
    ----------
    segment : str or object with a ``symbol`` attribute
        The segment to neutralize.
    segment_pairs : list of tuples of str
        The pairs of segments to be conflated.

    Returns
    -------
    str
        'NEUTR:' followed by the concatenated members of the first pair
        containing the segment's symbol; the symbol itself if no pair
        contains it (including when `segment_pairs` is empty).
    """
    # Resolve the symbol once, up front, so it is defined even when there
    # are no segment pairs to iterate over.
    try:
        s = segment.symbol
    except AttributeError:
        s = segment
    for sp in segment_pairs:
        if s in sp:
            return 'NEUTR:'+''.join(str(x) for x in sp)
    return s


def all_pairwise_fls(corpus_context, relative_fl = False, algorithm = 'minpair',
                     relative_count = True, distinguish_homophones = False):
    """Calculate the functional load of every pairwise contrast between
    segments of the inventory (the boundary symbol '#' is skipped).

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    relative_fl : bool
        If False, return the FL for all segment pairs. If True, return
        the relative (average) FL for each segment.
    algorithm : str {'minpair', 'deltah'}
        Algorithm to use for calculating functional load: "minpair" for
        minimal pair count or "deltah" for change in entropy.
    relative_count : bool, optional
        Passed through to `minpair_fl`; only used by the 'minpair'
        algorithm.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.

    Returns
    -------
    list of tuple(tuple(str, str), float) OR list of tuple(str, float)
        Normally returns a list of all segment pairs and their respective
        functional load values, as length-2 tuples ordered by descending
        FL. If calculating relative FL, returns a list of each segment and
        its relative (average) FL, also ordered by descending FL.

    Raises
    ------
    Exception
        If the inventory contains the empty string.
    ValueError
        If `algorithm` is neither 'minpair' nor 'deltah'.
    """
    inventory = corpus_context.inventory
    fls = {}
    # Algebraically equal to (n-1)*(n-2)/2 for an inventory of size n, i.e.
    # the number of unordered pairs among n-1 segments (presumably the
    # inventory minus '#'). Used for progress output only.
    total_calculations = ((((len(inventory)-1)**2)-len(inventory)-1)/2)+1
    ct = 1
    t = time.time()
    if '' in inventory:
        raise Exception('Warning: Calculation of functional load for all segment pairs requires that all items in corpus have a non-null transcription.')
    if algorithm not in ('minpair', 'deltah'):
        raise ValueError(
            "algorithm must be 'minpair' or 'deltah', got {!r}".format(algorithm))

    for i, s1 in enumerate(inventory[:-1]):
        for s2 in inventory[i+1:]:
            if s1 != '#' and s2 != '#':
                print('Performing FL calculation {} out of {} possible'.format(str(ct), str(total_calculations)))
                ct += 1
                print('Duration of last calculation: {}'.format(str(time.time() - t)))
                t = time.time()
                # Work with plain symbols so results are keyed by strings.
                if not isinstance(s1, str):
                    s1 = s1.symbol
                if not isinstance(s2, str):
                    s2 = s2.symbol
                if algorithm == 'minpair':
                    fl = minpair_fl(corpus_context, [(s1, s2)],
                                    relative_count=relative_count,
                                    distinguish_homophones=distinguish_homophones)[0]
                else:
                    fl = deltah_fl(corpus_context, [(s1, s2)])
                fls[(s1, s2)] = fl

    if not relative_fl:
        return sorted(fls.items(), key=lambda p: p[1], reverse=True)

    # Average each segment's FL over the rest of the inventory.
    denominator = len(inventory) - 1
    rel_fls = {}
    for s in inventory:
        if not isinstance(s, str):
            s = s.symbol
        if s != '#':
            total = 0.0
            for pair, value in fls.items():
                if s == pair[0] or s == pair[1]:
                    total += value
            # A single-segment inventory has nothing to average over.
            rel_fls[s] = total / denominator if denominator else 0.0
    return sorted(rel_fls.items(), key=lambda p: p[1], reverse=True)