Source code for corpustools.contextmanagers

import math
import collections
import copy
import operator

from corpustools.corpus.classes.lexicon import Word
from corpustools.exceptions import PCTError, PCTPythonError, PCTContextError

def ensure_context(context):
    if not isinstance(context, BaseCorpusContext):
        raise PCTContextError('A corpus context manager is required here; '
                              'please see the API documentation for more details.')
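# Usage sketch: analysis functions can call ensure_context() to fail early
# when handed a bare Corpus instead of a context manager. The function name
# `my_analysis` below is hypothetical, for illustration only.
#
#     def my_analysis(ctx):
#         ensure_context(ctx)
#         ...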

class BaseCorpusContext(object):
    """
    Abstract corpus context class that all other contexts inherit from.

    Parameters
    ----------
    corpus : Corpus
        Corpus to form the context from
    sequence_type : str
        Sequence type to evaluate algorithms on (e.g., 'transcription')
    type_or_token : str
        The type of frequency to use for calculations ('type' or 'token')
    attribute : Attribute, optional
        Attribute to save results to for calculations involving all words
        in the corpus
    frequency_threshold : float, optional
        If specified, ignore words below this token frequency
    """

    def __init__(self, corpus, sequence_type, type_or_token,
                 attribute=None, frequency_threshold=0):
        self.sequence_type = sequence_type
        self.type_or_token = type_or_token
        self.corpus = corpus
        self.name = self.corpus.name
        self.attribute = attribute
        self._freq_base = {}
        self.length = None
        self.frequency_threshold = frequency_threshold

    @property
    def inventory(self):
        return self.corpus.inventory

    @property
    def specifier(self):
        return self.corpus.specifier

    def __enter__(self):
        if self.attribute is not None:
            self.corpus.add_attribute(self.attribute, initialize_defaults=False)
        return self

    def __len__(self):
        if self.length is not None:
            return self.length
        counter = 0
        for w in self:
            counter += 1
        self.length = counter
        return self.length

    def get_frequency_base(self, gramsize=1, halve_edges=False, probability=False):
        """
        Generate (and cache) frequencies for each segment in the corpus.

        Parameters
        ----------
        gramsize : integer
            Size of n-gram to use for getting frequency, defaults to 1 (unigram)
        halve_edges : boolean
            If True, word boundary symbols ('#') will only be counted once
            per word, rather than twice, defaults to False
        probability : boolean
            If True, frequency counts will be normalized by total frequency,
            defaults to False

        Returns
        -------
        dict
            Keys are segments (or sequences of segments) and values are
            their frequency in the corpus
        """
        if gramsize not in self._freq_base:
            freq_base = collections.defaultdict(float)
            for word in self:
                tier = getattr(word, self.sequence_type)
                if self.sequence_type.lower() == 'spelling':
                    seq = ['#'] + [x for x in tier] + ['#']
                else:
                    seq = tier.with_word_boundaries()
                grams = zip(*[seq[i:] for i in range(gramsize)])
                for x in grams:
                    if len(x) == 1:
                        x = x[0]
                    freq_base[x] += word.frequency
            freq_base['total'] = sum(value for value in freq_base.values())
            self._freq_base[gramsize] = freq_base

        freq_base = self._freq_base[gramsize]
        return_dict = {k: v for k, v in freq_base.items()}
        if halve_edges and '#' in return_dict:
            return_dict['#'] = (return_dict['#'] / 2) + 1
            if not probability:
                return_dict['total'] -= return_dict['#'] - 2
        if probability:
            return_dict = {k: v / freq_base['total'] for k, v in return_dict.items()}
        return return_dict
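
    # Usage sketch (hypothetical names): unigram counts and probabilities
    # over canonical transcriptions. Assumes `some_corpus` is a loaded
    # Corpus object; CanonicalVariantContext is defined later in this module.
    #
    #     with CanonicalVariantContext(some_corpus, 'transcription', 'type') as c:
    #         counts = c.get_frequency_base(gramsize=1)
    #         probs = c.get_frequency_base(gramsize=1, probability=True)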

    def get_phone_probs(self, gramsize=1, probability=True,
                        preserve_position=True, log_count=True):
        """
        Generate (and cache) phonotactic probabilities for segments in the corpus.

        Parameters
        ----------
        gramsize : integer
            Size of n-gram to use for getting frequency, defaults to 1 (unigram)
        probability : boolean
            If True, frequency counts will be normalized by total frequency,
            defaults to True
        preserve_position : boolean
            If True, segments in different positions in the transcription
            will not be collapsed, defaults to True
        log_count : boolean
            If True, token frequencies will be logarithmically transformed
            prior to being summed, defaults to True

        Returns
        -------
        dict
            Keys are segments (or sequences of segments) and values are
            their phonotactic probability in the corpus
        """
        if (gramsize, preserve_position, log_count) not in self._freq_base:
            freq_base = collections.defaultdict(float)
            totals = collections.defaultdict(float)
            for word in self:
                freq = word.frequency
                if self.type_or_token != 'type' and log_count:
                    freq = math.log(freq)
                grams = zip(*[getattr(word, self.sequence_type)[i:]
                              for i in range(gramsize)])
                for i, x in enumerate(grams):
                    #if len(x) == 1:
                    #    x = x[0]
                    if preserve_position:
                        x = (x, i)
                        totals[i] += freq
                    freq_base[x] += freq
            if not preserve_position:
                freq_base['total'] = sum(value for value in freq_base.values())
            else:
                freq_base['total'] = totals
            self._freq_base[(gramsize, preserve_position, log_count)] = freq_base

        freq_base = self._freq_base[(gramsize, preserve_position, log_count)]
        return_dict = {k: v for k, v in freq_base.items()}
        if probability and not preserve_position:
            return_dict = {k: v / freq_base['total'] for k, v in return_dict.items()}
        elif probability:
            return_dict = {k: v / freq_base['total'][k[1]]
                           for k, v in return_dict.items() if k != 'total'}
        return return_dict
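
    # Usage sketch (hypothetical names): positional biphone probabilities.
    # With preserve_position=True, keys are ((seg1, seg2), position) tuples
    # and values are normalized within each position; `some_corpus` is
    # illustrative.
    #
    #     with CanonicalVariantContext(some_corpus, 'transcription', 'token') as c:
    #         biphone_probs = c.get_phone_probs(gramsize=2)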

    def __exit__(self, exc_type, exc, exc_tb):
        if exc_type is None:
            return True
        if self.attribute is not None:
            self.corpus.remove_attribute(self.attribute)
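
    # Note: on a clean exit, any attribute added in __enter__ is kept on
    # the corpus so that computed results survive the `with` block; if an
    # exception occurred, the attribute is removed and the exception
    # propagates to the caller.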

class CanonicalVariantContext(BaseCorpusContext):
    """
    Corpus context that uses canonical forms for transcriptions and tiers.

    See the documentation of `BaseCorpusContext` for additional information.
    """

    def __exit__(self, exc_type, exc, exc_tb):
        return BaseCorpusContext.__exit__(self, exc_type, exc, exc_tb)

    def __iter__(self):
        for word in self.corpus:
            if self.type_or_token == 'token' and word.frequency == 0:
                continue
            if self.frequency_threshold > 0 and word.frequency < self.frequency_threshold:
                continue
            w = copy.copy(word)
            if math.isnan(word.frequency):
                w.frequency = 0
            elif self.type_or_token == 'type':
                w.frequency = 1
            w.original = word
            yield w
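
# Usage sketch (hypothetical names): iterating over canonical forms. Under
# 'type' counting each yielded word has frequency 1, and the original Word
# is kept on `w.original`; `some_corpus` is illustrative.
#
#     with CanonicalVariantContext(some_corpus, 'transcription', 'type') as c:
#         for w in c:
#             print(w.spelling, getattr(w, c.sequence_type), w.frequency)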

class MostFrequentVariantContext(BaseCorpusContext):
    """
    Corpus context that uses the most frequent pronunciation variants
    for transcriptions and tiers.

    See the documentation of `BaseCorpusContext` for additional information.
    """

    def __enter__(self):
        self = BaseCorpusContext.__enter__(self)
        if not self.corpus.has_wordtokens:
            raise PCTError('The corpus specified does not have variants.')
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        return BaseCorpusContext.__exit__(self, exc_type, exc, exc_tb)

    def __iter__(self):
        for word in self.corpus:
            if self.type_or_token == 'token' and word.frequency == 0:
                continue
            if self.frequency_threshold > 0 and word.frequency < self.frequency_threshold:
                continue
            v = word.variants(self.sequence_type)
            w = copy.copy(word)
            if math.isnan(word.frequency):
                w.frequency = 0
            if len(v.keys()) > 0:
                # Sort variants from most to least frequent
                v_sorted = sorted(v.items(), key=operator.itemgetter(1), reverse=True)
                if len(v_sorted) == 1:
                    # There is only one variant
                    setattr(w, self.sequence_type, v_sorted[0][0])
                elif v_sorted[0][1] != v_sorted[1][1]:
                    # There is a single most frequent variant
                    setattr(w, self.sequence_type, v_sorted[0][0])
                else:
                    # Several variants are tied for the highest frequency
                    highest_freq = v_sorted[0][1]
                    v_candidates = list()
                    for vv in v_sorted:
                        if vv[1] != highest_freq:
                            break
                        else:
                            v_candidates.append(vv[0])
                    if getattr(w, self.sequence_type) in v_candidates:
                        # Use the canonical variant if it is one of the most frequent
                        pass
                    else:
                        v_longest1 = max(v_candidates, key=len)
                        v_candidates.reverse()
                        v_longest2 = max(v_candidates, key=len)
                        if v_longest1 == v_longest2:
                            # Use the longest variant if a unique one exists
                            setattr(w, self.sequence_type, v_longest1)
                        else:
                            # Otherwise use the longest variant that sorts
                            # first alphabetically
                            v_candidates = [vv for vv in v_candidates
                                            if len(vv) == len(v_longest1)]
                            v_candidates = sorted(v_candidates)
                            setattr(w, self.sequence_type, v_candidates[0])
            if self.type_or_token == 'type':
                w.frequency = 1
            w.original = word
            yield w
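
# Tie-breaking sketch with invented data: given variants counted as
# {'A B': 3, 'A B C': 3, 'D': 1}, the two forms tied at 3 are candidates;
# the canonical form wins if it is among them, otherwise the longest
# candidate ('A B C') is chosen, and a length tie falls back to the
# candidate that sorts first alphabetically.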

class SeparatedTokensVariantContext(BaseCorpusContext):
    """
    Corpus context that treats pronunciation variants as separate types
    for transcriptions and tiers.

    See the documentation of `BaseCorpusContext` for additional information.
    """

    def __enter__(self):
        self = BaseCorpusContext.__enter__(self)
        if not self.corpus.has_wordtokens:
            raise PCTError('The corpus specified does not have variants.')
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        return BaseCorpusContext.__exit__(self, exc_type, exc, exc_tb)

    def __iter__(self):
        for word in self.corpus:
            if math.isnan(word.frequency):
                continue
            if self.type_or_token == 'token' and word.frequency == 0:
                continue
            if self.frequency_threshold > 0 and word.frequency < self.frequency_threshold:
                continue
            variants = word.variants(self.sequence_type)
            for v in variants:
                # Create a new word from each variant
                kwargs = {}
                if self.sequence_type == 'spelling':
                    kwargs['spelling'] = v
                    kwargs['transcription'] = word.transcription
                    kwargs['frequency'] = variants[v]
                elif self.sequence_type == 'transcription':
                    kwargs['spelling'] = word.spelling
                    kwargs['transcription'] = v
                    kwargs['frequency'] = variants[v]
                else:
                    # Variants on another tier: keep the canonical spelling
                    # and transcription, set the tier value directly
                    kwargs['spelling'] = word.spelling
                    kwargs['transcription'] = word.transcription
                    kwargs['frequency'] = variants[v]
                    kwargs[self.sequence_type] = v
                if self.type_or_token == 'type':
                    kwargs['frequency'] = 1
                w = Word(**kwargs)
                yield w
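
# Usage sketch (hypothetical names): each pronunciation variant surfaces as
# its own Word with the variant's token count as its frequency, so a word
# with three attested variants yields three separate items.
#
#     with SeparatedTokensVariantContext(some_corpus, 'transcription', 'token') as c:
#         for w in c:
#             print(w.spelling, w.transcription, w.frequency)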

class WeightedVariantContext(BaseCorpusContext):
    """
    Corpus context that weights the frequency of pronunciation variants
    by the number of variants or the token frequency for transcriptions
    and tiers.

    See the documentation of `BaseCorpusContext` for additional information.
    """

    def __enter__(self):
        self = BaseCorpusContext.__enter__(self)
        if not self.corpus.has_wordtokens:
            raise PCTError('The corpus specified does not have variants.')
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        return BaseCorpusContext.__exit__(self, exc_type, exc, exc_tb)

    def __iter__(self):
        for word in self.corpus:
            if math.isnan(word.frequency):
                continue
            if self.type_or_token == 'token' and word.frequency == 0:
                continue
            if self.frequency_threshold > 0 and word.frequency < self.frequency_threshold:
                continue
            variants = word.variants(self.sequence_type)
            num_of_variants = len(variants)
            total_variants = sum(variants.values())
            for v in variants:
                # Create a new word from each variant, weighting its
                # frequency by the total frequency of all variants
                kwargs = {}
                if self.sequence_type == 'spelling':
                    kwargs['spelling'] = v
                    kwargs['transcription'] = word.transcription
                    kwargs['frequency'] = variants[v] / total_variants
                elif self.sequence_type == 'transcription':
                    kwargs['spelling'] = word.spelling
                    kwargs['transcription'] = v
                    kwargs['frequency'] = variants[v] / total_variants
                else:
                    kwargs['spelling'] = word.spelling
                    kwargs['transcription'] = word.transcription
                    kwargs['frequency'] = variants[v] / total_variants
                    kwargs[self.sequence_type] = v
                if self.type_or_token == 'type':
                    kwargs['frequency'] = 1 / num_of_variants
                w = Word(**kwargs)
                yield w
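
# Usage sketch (hypothetical names): the same iteration as above, but each
# variant's frequency is its share of the word's total variant count under
# 'token' counting, or 1/(number of variants) under 'type' counting.
#
#     with WeightedVariantContext(some_corpus, 'transcription', 'token') as c:
#         weights = [(w.transcription, w.frequency) for w in c]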