Source code for ekorpus.controllers.recordings
"""The :mod:`recordings` handles all recordings management opoerations.
"""
import os
import cgi
import pylons
from ekorpus.lib.base import *
from ekorpus.models.form import *
import ekorpus.controllers.sounds as sounds
from ekorpus.textgrid import TextGrid
[docs]class RecordingsController(AuthorizedController):
[docs] def parse_time(self, start):
prts = start.split(':',2)
if len(prts)==2:
return float(prts[0])*60 + float(prts[1])
if len(prts)==3:
return (float(prts[0])*60 + float(prts[1]))*60 + float(prts[2])
return float(start)
def _store_segments(self, f, recording):
line_no = 0
type = unicode(f.readline(),'utf8')
line_no += 1
type=type.strip()
if "ooTextFile" in type:
tgd=TextGrid(f)
lines = tgd[0] #tier 0 #FIXME
segmentation = model.Segmentation("phoneme", '')
for line in lines:
segmentation.segments.append(
model.Segment(line[0],line[1]*1000,line[2]*1000))
recording.segmentations.append(segmentation)
return None
if type.startswith('type:'):
type=type[5:]
type=type.strip()
else:
type="sentence"
lines = []
l = ''
previous = 0.0
try:
try:
for line in f:
line_no += 1
l = unicode(line,'utf8').strip()
if not l: continue
start, text = l.split(None,1)
if start and text:
tm = self.parse_time(start)
if tm < previous:
return _("Line time less than on previous line. Line %r: %r") % (line_no,l)
previous = tm
lines.append(( tm*1000,text) )
except:
return _("Error on line %r: %r") % (line_no,l)
finally:
f.close()
segmentation = model.Segmentation(type, '')
for i in range(len(lines)-1):
segmentation.segments.append(
model.Segment(lines[i][1],lines[i][0],lines[i+1][0]))
recording.segmentations.append(segmentation)
return None
[docs] def index(self, rid, sid):
"""List all segments for the segmentation *sid*."""
c.recordings = model.Recording.select()
c.recording_id=int(rid)
c.segmentation_id=int(sid)
return render('/tests/segmenttree.mak')
[docs] def list(self):
"""List all recordings."""
c.recordings = model.Recording.select()
return render('/recordings/list.mak')
# user id
[docs] def add(self,id):
"""Create a new recording for a speaker *id* and show the editing form."""
if id:
c.user=model.User.get_by(id=id)
c.recording=model.Recording()
c.recording.id=0
c.recording.user_id=0
c.recording.comment=''
c.recording.rec_date=''
c.recording.file='dummy'
return render('/recordings/edit.mak')
add.roles='admin'
[docs] def edit(self,id):
"""Generate an editing form for the recording *id*."""
c.recording=model.Recording.get_by(id=id)
c.user=model.User.get_by(id=c.recording.user_id)
return render('/recordings/edit.mak')
edit.roles='admin'
[docs] def save(self,id):
"""Save the data for the recording *id*."""
p = request.params
c.recording=model.Recording.get_by(id=id)
if not c.recording:
c.recording=model.Recording()
c.recording.user_id=p['user_id']
c.recording.comment=p['comment']
if 'segments' in p:
segments = p['segments']
if isinstance(segments,cgi.FieldStorage):
rsp = self._store_segments(segments.file,c.recording)
segments.file.close()
if rsp:
return rsp
if not c.recording.id:
model.ctx.current.flush()
if 'file' in p:
#wave = request.POST['file']
wave = p['file']
if isinstance(wave,cgi.FieldStorage):
sounds.save(wave, c.recording)
model.ctx.current.flush()
h.redirect_to(action='list')
save.roles='admin'
[docs] def delete(self,id):
"""Delete all the data related to the recording *id*."""
recording=model.Recording.get_by(id=id)
if recording:
recording.delete()
model.ctx.current.flush()
h.redirect_to(action='list')
delete.roles='admin'
[docs] def segmentation(self,id):
"""Show the data for a segmentation with *id*."""
c.segmentation=model.Segmentation.get_by(id=id)
return render('/recordings/segmentation.mak')