import re
from collections import defaultdict
from math import *
import itertools
import queue
import copy
from math import factorial
import time

from .io import save_minimal_pairs

def matches(first, second):
    """
    Determine if two neutralized transcriptions are a minimal pair or not

    Parameters
    ----------
    first : tuple
        Tuple of the neutralized sequence, the spelling of the word,
        and the unneutralized sequence
    second : tuple
        Tuple of the neutralized sequence, the spelling of the word,
        and the unneutralized sequence

    Returns
    -------
    bool
        Returns True if the neutralized sequences match, they both contain
        neutralized segments, and the spellings and original transcriptions
        are different; otherwise returns False
    """
    # Index 0: neutralized sequence, 1: word/spelling, 2: original sequence.
    # Comparing the whole tuples (as opposed to their fields) can never
    # satisfy "equal and different" at once, so each field is checked
    # individually.
    return (first[0] == second[0] and first[1] != second[1]
            and 'NEUTR:' in first[0] and 'NEUTR:' in second[0]
            and first[2] != second[2])

def minpair_fl(corpus_context, segment_pairs,
               relative_count=True, distinguish_homophones=False,
               stop_check=None, call_back=None):
    """Calculate the functional load of the contrast between two segments
    as a count of minimal pairs.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment_pairs : list of length-2 tuples of str
        The pairs of segments to be conflated.
    relative_count : bool, optional
        If True, divide the result by the summed frequency of all words
        that contain any of the segments in `segment_pairs`.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    tuple(int or float, list) or None
        Tuple of: 0. the sum, over the minimal pairs found, of the mean
        frequency of the two words of each pair (divided by the summed
        frequency of the words containing the segments if
        relative_count==True); and 1. the list of minimal pairs, each a
        tuple of two (word, sequence) tuples.
        None is returned if `stop_check` requested early termination.
    """

    if stop_check is not None and stop_check():
        return
    all_segments = list(itertools.chain.from_iterable(segment_pairs))

    # Each entry: (neutralized sequence as a string, word, original sequence)
    neutralized = []
    if call_back is not None:
        call_back('Finding and neutralizing instances of segments...')
        call_back(0, len(corpus_context))
    cur = 0
    for w in corpus_context:
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        tier = getattr(w, corpus_context.sequence_type)
        if any(s in tier for s in all_segments):
            n = [neutralize_segment(seg, segment_pairs)
                 for seg in tier]
            neutralized.append(('.'.join(n), w, tier))
    if stop_check is not None and stop_check():
        return

    minpairs = []
    if call_back is not None:
        call_back('Counting minimal pairs...')
        # Number of unordered pairs, n choose 2.  Computed directly rather
        # than through factorials, which raise ValueError when fewer than
        # two words qualify and are needlessly slow for large corpora.
        n_words = len(neutralized)
        call_back(0, n_words * (n_words - 1) // 2)
    cur = 0
    for first, second in itertools.combinations(neutralized, 2):
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            if cur % 100 == 0:
                call_back(cur)
        if not matches(first, second):
            continue
        # Store (word, sequence) for each member, ordered by sequence so
        # the same contrast always yields the same key below.
        # NOTE(review): assumes sequence objects are mutually orderable.
        ordered_pair = sorted([(first[1], first[2]), (second[1], second[2])],
                              key=lambda x: x[1])
        minpairs.append(tuple(ordered_pair))

    if not distinguish_homophones:
        # Collapse pairs that share the same two sequences, keeping the
        # word pair with the highest combined frequency.
        actual_minpairs = {}
        for pair in minpairs:
            key = (pair[0][1], pair[1][1])  # Keys are tuples of tiers
            words = (pair[0][0], pair[1][0])  # Values are words
            if key not in actual_minpairs:
                actual_minpairs[key] = words
            else:
                pair_freq = words[0].frequency + words[1].frequency
                existing = actual_minpairs[key]
                existing_freq = existing[0].frequency + existing[1].frequency
                if pair_freq > existing_freq:
                    actual_minpairs[key] = words
        result = sum((x[0].frequency + x[1].frequency)/2
                     for x in actual_minpairs.values())
    else:
        result = sum((x[0][0].frequency + x[1][0].frequency)/2
                     for x in minpairs)

    if relative_count and len(neutralized) > 0:
        total_frequency = sum(x[1].frequency for x in neutralized)
        # Skip the normalization when every relevant word has zero
        # frequency; dividing would raise ZeroDivisionError.
        if total_frequency:
            result /= total_frequency
    return (result, minpairs)

def deltah_fl(corpus_context, segment_pairs,
              stop_check=None, call_back=None):
    """Calculate the functional load of the contrast between two
    segments as the decrease in corpus entropy caused by a merger.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment_pairs : list of length-2 tuples of str
        The pairs of segments to be conflated.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    float or None
        The entropy of the frequency distribution over distinct
        sequences before the merger minus the entropy after the merger;
        differences below 1e-10 are reported as 0.0.
        None is returned if `stop_check` requested early termination.
    """
    report = call_back is not None

    def _should_stop():
        return stop_check is not None and stop_check()

    if report:
        call_back('Finding instances of segments...')
        call_back(0, len(corpus_context))
    seen = 0
    total_freq = 0
    tier_freqs = defaultdict(float)
    for word in corpus_context:
        if _should_stop():
            return
        if report:
            seen += 1
            if seen % 20 == 0:
                call_back(seen)

        freq = word.frequency
        # Words sharing a sequence pool their frequency.
        tier_freqs[getattr(word, corpus_context.sequence_type)] += freq
        total_freq += freq

    # Turn pooled frequencies into probabilities.
    tier_probs = {tier: freq / total_freq
                  for tier, freq in tier_freqs.items()}

    if _should_stop():
        return
    entropy_before = entropy(tier_probs.values())

    merged_probs = defaultdict(float)
    if report:
        call_back('Neutralizing instances of segments...')
        call_back(0, len(tier_probs))
    seen = 0
    for tier, prob in tier_probs.items():
        if _should_stop():
            return
        if report:
            seen += 1
            if seen % 100 == 0:
                call_back(seen)
        # Sequences that become identical after the merger pool their
        # probability mass under one neutralized key.
        merged_key = '.'.join(neutralize_segment(seg, segment_pairs)
                              for seg in tier)
        merged_probs[merged_key] += prob
    entropy_after = entropy(merged_probs.values())

    if _should_stop():
        return
    delta = entropy_before - entropy_after
    # Treat negligible (or negative, from rounding) differences as zero.
    return 0.0 if delta < 1e-10 else delta

def relative_minpair_fl(corpus_context, segment,
                        relative_count=True, distinguish_homophones=False,
                        output_filename=None,
                        stop_check=None, call_back=None):
    """Calculate the average functional load of the contrasts between a
    segment and all other segments, as a count of minimal pairs.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment : str
        The target segment.
    relative_count : bool, optional
        Passed through to `minpair_fl` for every contrast.
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.
    output_filename : str, optional
        If given, the minimal pairs found for each contrast are passed to
        `save_minimal_pairs` together with this filename.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    int or float or None
        The mean of the `minpair_fl` values over all contrasts between
        `segment` and every other inventory member except '#'; 0.0 if
        there are no such contrasts. None is returned if `stop_check`
        requested early termination.
    """
    all_segments = corpus_context.inventory
    segment_pairs = [(segment, other) for other in all_segments
                     if other != segment and other != '#']
    results = []
    to_output = []
    for sp in segment_pairs:
        res = minpair_fl(corpus_context, [sp],
                         relative_count=relative_count,
                         distinguish_homophones=distinguish_homophones,
                         stop_check=stop_check, call_back=call_back)
        if res is None:
            # minpair_fl was asked to stop; propagate the cancellation.
            return None
        # minpair_fl returns (value, list of minimal pairs).
        results.append(res[0])

        if output_filename is not None:
            # NOTE(review): assumes save_minimal_pairs expects
            # (segment pair, minimal pairs) entries -- confirm against .io
            to_output.append((sp, res[1]))
    if output_filename is not None:
        save_minimal_pairs(output_filename, to_output)
    if not segment_pairs:
        # No contrasts to average over; avoid dividing by zero.
        return 0.0
    return sum(results)/len(segment_pairs)

def relative_deltah_fl(corpus_context, segment,
                       stop_check=None, call_back=None):
    """Calculate the average functional load of the contrasts between a
    segment and all other segments, as the decrease in corpus entropy
    caused by a merger.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    segment : str
        The target segment.
    stop_check : callable, optional
        Optional function to check whether to gracefully terminate early
    call_back : callable, optional
        Optional function to supply progress information during the function

    Returns
    -------
    float or None
        The mean of the `deltah_fl` values over all contrasts between
        `segment` and every other inventory member except '#'; 0.0 if
        there are no such contrasts. None is returned if `stop_check`
        requested early termination.
    """
    all_segments = corpus_context.inventory
    segment_pairs = [(segment, other) for other in all_segments
                     if other != segment and other != '#']
    if not segment_pairs:
        # No contrasts to average over; avoid dividing by zero.
        return 0.0
    results = []
    for sp in segment_pairs:
        res = deltah_fl(corpus_context, [sp],
                        stop_check=stop_check, call_back=call_back)
        if res is None:
            # deltah_fl was asked to stop; propagate the cancellation
            # instead of failing when summing a None.
            return None
        results.append(res)
    return sum(results)/len(segment_pairs)

def collapse_segpairs_fl(corpus_context, **kwargs):
    """Calculate one functional load value with all segment pairs
    conflated at the same time.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    **kwargs
        func_type : 'min_pairs' dispatches to `minpair_fl`, 'entropy'
        to `deltah_fl`.
        segment_pairs : the pairs of segments to be conflated.
        relative_count, distinguish_homophones : forwarded positionally
        to `minpair_fl`; a missing key is forwarded as None.

    Returns
    -------
    object or None
        Whatever the selected function returns; None if `func_type` is
        neither 'min_pairs' nor 'entropy'.
    """
    func_type = kwargs.get('func_type')
    segment_pairs = kwargs.get('segment_pairs')
    relative_count = kwargs.get('relative_count')
    distinguish_homophones = kwargs.get('distinguish_homophones')
    if func_type == 'min_pairs':
        return minpair_fl(corpus_context, segment_pairs,
                          relative_count, distinguish_homophones)
    if func_type == 'entropy':
        return deltah_fl(corpus_context, segment_pairs)
    # Unrecognized func_type: nothing is computed.
    return None

def individual_segpairs_fl(corpus_context, **kwargs):
    """Calculate a separate functional load value for each segment pair.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    **kwargs
        func_type : 'min_pairs' dispatches to `minpair_fl`, 'entropy'
        to `deltah_fl`.
        segment_pairs : iterable of segment pairs, each evaluated alone.
        relative_count, distinguish_homophones : forwarded positionally
        to `minpair_fl`; a missing key is forwarded as None.

    Returns
    -------
    list
        One entry per segment pair, in input order, holding whatever the
        selected function returned for that pair.

    Raises
    ------
    ValueError
        If there is at least one segment pair and `func_type` is neither
        'min_pairs' nor 'entropy'.
    """
    func_type = kwargs.get('func_type')
    segment_pairs = kwargs.get('segment_pairs')
    relative_count = kwargs.get('relative_count')
    distinguish_homophones = kwargs.get('distinguish_homophones')

    results = []
    for pair in segment_pairs:
        if func_type == 'min_pairs':
            fl = minpair_fl(corpus_context, [pair],
                            relative_count, distinguish_homophones)
        elif func_type == 'entropy':
            fl = deltah_fl(corpus_context, [pair])
        else:
            # Fail with a clear message instead of an unbound local.
            raise ValueError(
                'Unknown func_type: {!r}'.format(func_type))
        results.append(fl)
    return results

def entropy(probabilities):
    """Calculate the entropy, in bits, of a choice from the provided
    probability distribution.

    Parameters
    ----------
    probabilities : iterable of floats
        Contains the probability of each item.

    Returns
    -------
    float
        Entropy; entries that are not greater than zero contribute
        nothing to the total.
    """
    def _weighted_log(prob):
        # The logarithm is undefined at or below zero, so such entries
        # add a plain 0 to the sum.
        if prob > 0:
            return prob * log(prob, 2)
        return 0

    return -sum(_weighted_log(prob) for prob in probabilities)

def neutralize_segment(segment, segment_pairs):
    """Replace a segment by a merger label if it belongs to one of the
    segment pairs.

    Parameters
    ----------
    segment : str or object with a `symbol` attribute
        The segment to neutralize.
    segment_pairs : list of tuples
        The pairs of segments to be conflated.

    Returns
    -------
    str
        'NEUTR:' followed by the concatenated members of the first pair
        containing the segment's symbol; otherwise the symbol itself
        (or `segment` unchanged if it has no `symbol` attribute).
    """
    # Resolved once, outside the loop, so the fall-through return is
    # defined even when segment_pairs is empty.
    s = getattr(segment, 'symbol', segment)
    for sp in segment_pairs:
        if s in sp:
            return 'NEUTR:'+''.join(str(x) for x in sp)
    return s

def all_pairwise_fls(corpus_context, relative_fl=False,
                     algorithm='minpair',
                     relative_count=True, distinguish_homophones=False):
    """Calculate the functional load of every pair of segments in the
    inventory, or the relative (average) functional load of each segment.

    Parameters
    ----------
    corpus_context : CorpusContext
        Context manager for a corpus
    relative_fl : bool
        If False, return the FL for all segment pairs. If True, return
        the relative (average) FL for each segment.
    algorithm : str {'minpair', 'deltah'}
        Algorithm to use for calculating functional load: "minpair" for
        minimal pair count or "deltah" for change in entropy.
    relative_count : bool, optional
        Passed through to `minpair_fl` (only used by "minpair").
    distinguish_homophones : bool, optional
        If False, then you'll count sock~shock (sock=clothing) and
        sock~shock (sock=punch) as just one minimal pair; but if True,
        you'll overcount alternative spellings of the same word, e.g.
        axel~actual and axle~actual. False is the value used by Wedel et al.

    Returns
    -------
    list of tuple(tuple(str, str), float)
    OR
    list of tuple(str, float)
        Normally returns a list of all segment pairs and their respective
        functional load values, as length-2 tuples ordered by decreasing
        FL. If calculating relative FL, returns a list of each segment
        and its relative (average) FL, ordered by decreasing FL.

    Raises
    ------
    Exception
        If the inventory contains the empty string.
    ValueError
        If a calculation is attempted with an unknown `algorithm`.
    """
    fls = {}
    inventory = corpus_context.inventory
    # Equals (n-1)(n-2)/2 for an inventory of n items, i.e. the number of
    # pairs when exactly one item (the '#' boundary) is skipped.  Only
    # used in the progress messages.
    total_calculations = ((((len(inventory)-1)**2)-len(inventory)-1)/2)+1
    ct = 1
    t = time.time()
    if '' in inventory:
        raise Exception('Warning: Calculation of functional load for all segment pairs requires that all items in corpus have a non-null transcription.')
    for i, s1 in enumerate(inventory[:-1]):
        for s2 in inventory[i+1:]:
            if s1 != '#' and s2 != '#':
                print('Performing FL calculation {} out of {} possible'.format(str(ct), str(total_calculations)))
                ct += 1
                print('Duration of last calculation: {}'.format(str(time.time() - t)))
                t = time.time()
                # Work with plain symbols so result keys are strings.
                sym1 = s1 if isinstance(s1, str) else s1.symbol
                sym2 = s2 if isinstance(s2, str) else s2.symbol
                if algorithm == 'minpair':
                    # minpair_fl returns (value, minimal pairs); only the
                    # value is needed here.
                    fl = minpair_fl(corpus_context, [(sym1, sym2)],
                                    relative_count=relative_count,
                                    distinguish_homophones=distinguish_homophones)[0]
                elif algorithm == 'deltah':
                    fl = deltah_fl(corpus_context, [(sym1, sym2)])
                else:
                    raise ValueError(
                        'Unknown algorithm: {!r}'.format(algorithm))
                fls[(sym1, sym2)] = fl
    if not relative_fl:
        # Highest functional load first.
        return sorted(fls.items(), key=lambda p: p[1], reverse=True)

    rel_fls = {}
    for s in inventory:
        if not isinstance(s, str):
            s = s.symbol
        if s != '#':
            total = 0.0
            for pair in fls:
                if s == pair[0] or s == pair[1]:
                    total += fls[pair]
            rel_fls[s] = total / (len(inventory) - 1)
    return sorted(rel_fls.items(), key=lambda p: p[1], reverse=True)