import os
import re
import sys
# Hesitation/backchannel tokens; Buckeye words found here are recoded
# with category 'UH' when the words files are read (see read_words).
FILLERS = {'uh', 'um', 'okay', 'yes', 'yeah', 'oh', 'heh', 'yknow',
           'um-huh', 'uh-uh', 'uh-huh', 'uh-hum', 'mm-hmm'}
from corpustools.corpus.classes import SpontaneousSpeechCorpus
from .helper import DiscourseData,data_to_discourse, AnnotationType, Annotation, BaseAnnotation, find_wav_path
from corpustools.corpus.io.binary import load_binary
def phone_match(one, two):
    """Return True if *one* equals *two* or occurs as a substring of *two*."""
    return one == two or one in two
def inspect_discourse_multiple_files(word_path, dialect):
    """
    Generate a list of AnnotationTypes for a specified dialect

    Parameters
    ----------
    word_path : str
        Full path to text file
    dialect : str
        Either 'buckeye' or 'timit'

    Returns
    -------
    list of AnnotationTypes
        Autodetected AnnotationTypes for the dialect

    Raises
    ------
    NotImplementedError
        If `dialect` is not one of 'buckeye' or 'timit'
    """
    if dialect == 'buckeye':
        # Buckeye words carry a citation transcription, a token-level
        # surface transcription and a part-of-speech category.
        annotation_types = [AnnotationType('spelling', 'surface_transcription', None, anchor = True),
                            AnnotationType('transcription', None, 'spelling', base = True, token = False),
                            AnnotationType('surface_transcription', None, 'spelling', base = True, token = True),
                            AnnotationType('category', None, 'spelling', base = False, token = True)]
    elif dialect == 'timit':
        # TIMIT provides a single time-aligned transcription per word.
        annotation_types = [AnnotationType('spelling', 'transcription', None, anchor = True),
                            AnnotationType('transcription', None, 'spelling', base = True, token = True)]
    else:
        # Raise an instance with a message rather than the bare class.
        raise NotImplementedError("dialect must be 'buckeye' or 'timit'")
    return annotation_types
def multiple_files_to_data(word_path, phone_path, dialect, annotation_types = None,
                    call_back = None, stop_check = None):
    """
    Parse a words file and its companion phones file into a DiscourseData
    object, aligning each word with the phone annotations it spans.

    Parameters
    ----------
    word_path : str
        Full path to the words file
    phone_path : str
        Full path to the phones file
    dialect : str
        One of 'buckeye' or 'timit'
    annotation_types : list of AnnotationType, optional
        Parsing specification; autodetected from the dialect when omitted
    call_back : callable, optional
        Function to supply progress information during parsing
    stop_check : callable, optional
        Function returning True when parsing should be aborted

    Returns
    -------
    DiscourseData or None
        Parsed discourse data, or None if stop_check requested an abort
    """
    if annotation_types is None:
        annotation_types = inspect_discourse_multiple_files(word_path, dialect)
    # Clear any state left over from a previous parse on the same types.
    for a in annotation_types:
        a.reset()
    # Discourse name is the words file's base name without extension.
    name = os.path.splitext(os.path.split(word_path)[1])[0]
    if call_back is not None:
        call_back('Reading files...')
        call_back(0,0)
    words = read_words(word_path, dialect)
    phones = read_phones(phone_path, dialect)
    data = DiscourseData(name, annotation_types)
    if call_back is not None:
        call_back('Parsing files...')
        call_back(0,len(words))
    cur = 0
    for i, w in enumerate(words):
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            cur += 1
            # Only report every 20th word to limit call_back overhead.
            if cur % 20 == 0:
                call_back(cur)
        annotations = {}
        word = Annotation()
        word.label = w['spelling']
        beg = w['begin']
        end = w['end']
        if dialect == 'timit':
            # Destructively pop phones off the shared list until one ends
            # exactly at this word's end time; phones beginning before the
            # word are discarded. Relies on words/phones having identical
            # time units (see read_words/read_phones).
            found_all = False
            found = []
            while not found_all:
                p = phones.pop(0)
                if p.begin < beg:
                    continue
                found.append(p)
                if p.end == end:
                    found_all = True
            n = 'transcription'
            # Record this word's span as index offsets into the growing
            # base (phone) level.
            level_count = data.level_length(n)
            word.references.append(n)
            word.begins.append(level_count)
            word.ends.append(level_count + len(found))
            annotations[n] = found
        elif dialect == 'buckeye':
            if w['transcription'] is None:
                # Untranscribed entries (pause/noise markers) get an empty
                # span on each base level.
                for n in data.base_levels:
                    level_count = data.level_length(n)
                    word.references.append(n)
                    word.begins.append(level_count)
                    word.ends.append(level_count)
            else:
                for n in data.base_levels:
                    if data[n].token:
                        # Token-level (surface) transcription: match against
                        # the time-aligned phones, popping them destructively.
                        expected = w[n]
                        found = []
                        while len(found) < len(expected):
                            cur_phone = phones.pop(0)
                            if phone_match(cur_phone.label,expected[len(found)]) \
                                and cur_phone.end >= beg and cur_phone.begin <= end:
                                found.append(cur_phone)
                            # Running out of phones before the last word means
                            # the two files are out of sync; surface the file
                            # name and word before failing.
                            if not len(phones) and i < len(words)-1:
                                print(name)
                                print(w)
                                raise(Exception)
                    else:
                        # Type-level (citation) transcription has no timing;
                        # wrap the raw symbols directly.
                        found = [BaseAnnotation(x) for x in w[n]]
                    level_count = data.level_length(n)
                    word.references.append(n)
                    word.begins.append(level_count)
                    word.ends.append(level_count+len(found))
                    annotations[n] = found
        for at in annotation_types:
            if at.ignored:
                continue
            if at.base:
                continue
            if at.anchor:
                continue
            value = w[at.name]
            if at.delimited:
                # NOTE(review): 'ti' and 'parse_transcription' are not defined
                # anywhere in this module, so this branch would raise NameError
                # if ever reached -- confirm the intended source of the value.
                value = [Annotation(x) for x in parse_transcription(ti.mark)]
            if at.token:
                word.token[at.name] = value
            else:
                word.additional[at.name] = value
        annotations[data.word_levels[0]] = [word]
        data.add_annotations(**annotations)
    return data
def load_directory_multiple_files(corpus_name, path, dialect,
                                annotation_types = None,
                                feature_system_path = None,
                                stop_check = None, call_back = None):
    """
    Loads a directory of corpus standard files (separated into words files
    and phones files)

    Parameters
    ----------
    corpus_name : str
        Name of corpus
    path : str
        Path to directory of text files
    dialect : str
        One of 'buckeye' or 'timit'
    annotation_types : list of AnnotationType, optional
        List of AnnotationType specifying how to parse the glosses.
        Auto-generated based on dialect.
    feature_system_path : str, optional
        File path of FeatureMatrix binary to specify segments
    stop_check : callable or None
        Optional function to check whether to gracefully terminate early
    call_back : callable or None
        Optional function to supply progress information during the loading

    Returns
    -------
    SpontaneousSpeechCorpus
        Corpus containing Discourses corresponding to the text files
    """
    if call_back is not None:
        call_back('Finding files...')
        call_back(0, 0)
    file_tuples = []
    for root, subdirs, files in os.walk(path):
        for filename in files:
            if stop_check is not None and stop_check():
                return
            # Words files end in '.words' (Buckeye) or '.wrd' (TIMIT).
            if not filename.lower().endswith(('.words', '.wrd')):
                continue
            file_tuples.append((root, filename))
    if call_back is not None:
        call_back('Parsing files...')
        call_back(0,len(file_tuples))
    corpus = SpontaneousSpeechCorpus(corpus_name, path)
    for i, t in enumerate(file_tuples):
        if stop_check is not None and stop_check():
            return
        if call_back is not None:
            call_back('Parsing file {} of {}...'.format(i+1, len(file_tuples)))
            call_back(i)
        root, filename = t
        name, ext = os.path.splitext(filename)
        # BUG FIX: compare the extension case-insensitively, matching the
        # case-insensitive discovery test above (e.g. 'S01.WORDS').
        if ext.lower() == '.words':
            phone_ext = '.phones'
        else:
            phone_ext = '.phn'
        word_path = os.path.join(root, filename)
        phone_path = os.path.splitext(word_path)[0] + phone_ext
        d = load_discourse_multiple_files(name, word_path, phone_path,
                                        dialect, annotation_types,
                                        corpus.lexicon, None,
                                        stop_check, None)
        corpus.add_discourse(d)
    if feature_system_path is not None:
        feature_matrix = load_binary(feature_system_path)
        corpus.lexicon.set_feature_matrix(feature_matrix)
    return corpus
def load_discourse_multiple_files(corpus_name, word_path, phone_path, dialect,
                                annotation_types = None,
                                lexicon = None,
                                feature_system_path = None,
                                stop_check = None, call_back = None):
    """
    Load a discourse from paired words and phones text files

    Parameters
    ----------
    corpus_name : str
        Informative identifier to refer to corpus
    word_path : str
        Full path to words text file
    phone_path : str
        Full path to phones text file
    dialect : str
        One of 'buckeye' or 'timit'
    annotation_types : list of AnnotationType, optional
        List of AnnotationType specifying how to parse the glosses.
        Auto-generated based on dialect.
    lexicon : Corpus, optional
        Corpus to store Discourse word information
    feature_system_path : str, optional
        Full path to pickled FeatureMatrix to use with the Corpus
    stop_check : callable or None
        Optional function to check whether to gracefully terminate early
    call_back : callable or None
        Optional function to supply progress information during the loading

    Returns
    -------
    Discourse
        Discourse object generated from the text files
    """
    data = multiple_files_to_data(word_path, phone_path, dialect,
                                annotation_types,
                                call_back, stop_check)
    data.name = corpus_name
    # Attach an accompanying .wav file when one sits beside the words file.
    data.wav_path = find_wav_path(word_path)
    discourse = data_to_discourse(data, lexicon)
    if feature_system_path is not None:
        feature_matrix = load_binary(feature_system_path)
        discourse.lexicon.set_feature_matrix(feature_matrix)
    return discourse
def read_phones(path, dialect, sr = None):
    """
    Parse a phones file into a list of time-stamped BaseAnnotations.

    Parameters
    ----------
    path : str
        Full path to the phones file
    dialect : str
        One of 'buckeye' or 'timit'
    sr : int or float, optional
        Sample rate used to convert TIMIT sample counts to seconds;
        defaults to 16000 for TIMIT. Ignored for Buckeye.

    Returns
    -------
    list of BaseAnnotation
        One annotation per phone, with begin/end times

    Raises
    ------
    NotImplementedError
        If `dialect` is not one of 'buckeye' or 'timit'
    """
    output = []
    with open(path,'r') as file_handle:
        if dialect == 'timit':
            if sr is None:
                sr = 16000  # TIMIT timestamps are sample counts at 16 kHz
            for line in file_handle:
                l = line.strip().split(' ')
                start = float(l[0])
                end = float(l[1])
                label = l[2]
                # Convert sample counts to seconds.
                start /= sr
                end /= sr
                # BUG FIX: original passed the undefined name 'begin' here,
                # raising NameError for every TIMIT phones file.
                output.append(BaseAnnotation(label, start, end))
        elif dialect == 'buckeye':
            # Raw strings so '\r', '\s', '\d' are regex escapes, not
            # (deprecated) string escapes.
            header_pattern = re.compile(r"#\r{0,1}\n")
            line_pattern = re.compile(r"\s+\d{3}\s+")
            label_pattern = re.compile(r" {0,1};| {0,1}\+")
            # Everything after the '#' header line is phone data.
            f = header_pattern.split(file_handle.read())[1]
            flist = f.splitlines()
            begin = 0.0
            for l in flist:
                line = line_pattern.split(l.strip())
                end = float(line[0])
                # Strip ';'/'+' annotations; intern labels since they repeat
                # heavily across the corpus.
                label = sys.intern(label_pattern.split(line[1])[0])
                # Buckeye only stores end times; each phone begins where the
                # previous one ended.
                output.append(BaseAnnotation(label, begin, end))
                begin = end
        else:
            raise NotImplementedError("dialect must be 'buckeye' or 'timit'")
    return output
def read_words(path, dialect, sr = None):
    """
    Parse a words file into a list of word dictionaries.

    Parameters
    ----------
    path : str
        Full path to the words file
    dialect : str
        One of 'buckeye' or 'timit'
    sr : int or float, optional
        Sample rate used to convert TIMIT sample counts to seconds;
        defaults to 16000 for TIMIT (matching read_phones). Ignored for
        Buckeye.

    Returns
    -------
    list of dict
        TIMIT entries have keys 'spelling', 'begin', 'end'; Buckeye entries
        additionally have 'transcription', 'surface_transcription' and
        'category'

    Raises
    ------
    NotImplementedError
        If `dialect` is not one of 'buckeye' or 'timit'
    """
    output = []
    with open(path,'r') as file_handle:
        if dialect == 'timit':
            # BUG FIX: default to TIMIT's 16 kHz like read_phones does;
            # otherwise word times stay in samples while phone times are in
            # seconds, and the word/phone alignment in
            # multiple_files_to_data can never match.
            if sr is None:
                sr = 16000
            for line in file_handle:
                l = line.strip().split(' ')
                start = float(l[0]) / sr
                end = float(l[1]) / sr
                word = l[2]
                output.append({'spelling':word, 'begin':start, 'end':end})
        elif dialect == 'buckeye':
            # Everything after the '#' header line is word data.
            f = re.split(r"#\r{0,1}\n", file_handle.read())[1]
            line_pattern = re.compile(r"; | \d{3} ")
            begin = 0.0
            flist = f.splitlines()
            for l in flist:
                line = line_pattern.split(l.strip())
                end = float(line[0])
                word = sys.intern(line[1])
                # '<' and '{' entries are pause/noise markers with no
                # transcription fields.
                if word[0] != "<" and word[0] != "{":
                    citation = line[2].split(' ')
                    phonetic = line[3].split(' ')
                    category = line[4]
                else:
                    citation = None
                    phonetic = None
                    category = None
                # Recode hesitation/filler words uniformly.
                if word in FILLERS:
                    category = 'UH'
                line = {'spelling':word,'begin':begin,'end':end,
                        'transcription':citation,'surface_transcription':phonetic,
                        'category':category}
                output.append(line)
                # Buckeye only stores end times; each word begins where the
                # previous one ended.
                begin = end
        else:
            raise NotImplementedError("dialect must be 'buckeye' or 'timit'")
    return output