eKorpus

Source code for ekorpus.controllers.recordings

"""The :mod:`recordings` module handles all recording management operations.
"""
import os
import cgi
import pylons
from ekorpus.lib.base import *
from ekorpus.models.form import *
import ekorpus.controllers.sounds as sounds
from ekorpus.textgrid import TextGrid

class RecordingsController(AuthorizedController):
    """Controller for listing, creating, editing and deleting recordings.

    The ``add``, ``edit``, ``save`` and ``delete`` actions carry a ``roles``
    attribute set to ``'admin'``.  How that attribute is enforced is not
    visible in this module -- presumably the base controller checks it.
    """

    def parse_time(self, start):
        """Convert the time string *start* to seconds.

        *start* may be ``ss``, ``mm:ss`` or ``hh:mm:ss``.  Every component is
        parsed with :func:`float`, so fractional values are accepted.

        Returns the time in seconds as a float.  Raises :exc:`ValueError`
        when a component is not a valid number.
        """
        parts = start.split(':', 2)
        if len(parts) == 2:
            minutes, seconds = parts
            return float(minutes) * 60 + float(seconds)
        if len(parts) == 3:
            hours, minutes, seconds = parts
            return (float(hours) * 60 + float(minutes)) * 60 + float(seconds)
        return float(start)

    def _store_segments(self, f, recording):
        """Parse the segmentation file *f* and attach it to *recording*.

        The format is chosen from the first line of *f*:

        * if it contains ``ooTextFile``, the remainder is handed to
          :class:`TextGrid` and tier 0 is stored as a ``"phoneme"``
          segmentation;
        * otherwise the file is read as plain text with one
          ``<time> <text>`` pair per line, *time* being anything
          :meth:`parse_time` accepts.  A first line of the form
          ``type:<name>`` names the segmentation; any other first line is
          consumed without being stored and the type is ``"sentence"``.

        In the plain-text format each segment runs from the time on its own
        line to the time on the following line, so the last line only closes
        the preceding segment.  All times are multiplied by 1000 before being
        stored (presumably seconds to milliseconds).

        Returns ``None`` on success, or a translated error message when a
        line is malformed or the times are not in non-decreasing order.
        """
        header = unicode(f.readline(), 'utf8').strip()
        line_no = 1

        if "ooTextFile" in header:
            # *f* is not closed on this path; save() closes the upload itself.
            tgd = TextGrid(f)
            tier = tgd[0]  # tier 0
            # FIXME: marker carried over from the original without detail --
            # presumably about the hard-coded tier / "phoneme" type.
            segmentation = model.Segmentation("phoneme", '')
            for entry in tier:
                # NOTE(review): entries look like (text, start, end) -- confirm
                # against TextGrid.
                segmentation.segments.append(
                    model.Segment(entry[0], entry[1] * 1000, entry[2] * 1000))
            recording.segmentations.append(segmentation)
            return None

        # NOTE(review): a first line without the "type:" prefix is dropped,
        # even if it looks like a data line -- confirm this is the intended
        # file format.
        if header.startswith('type:'):
            seg_type = header[5:].strip()
        else:
            seg_type = "sentence"

        lines = []
        current = ''
        previous = 0.0
        # Nested try/except inside try/finally is kept on purpose: it also
        # works on interpreters that lack the combined statement.
        try:
            try:
                for raw in f:
                    line_no += 1
                    # Remember the raw text first, so that a decoding failure
                    # is reported against this line and not the previous one.
                    current = raw.strip()
                    current = unicode(raw, 'utf8').strip()
                    if not current:
                        continue
                    start, text = current.split(None, 1)
                    if start and text:
                        tm = self.parse_time(start)
                        if tm < previous:
                            return _("Line time less than on previous line. Line %r: %r") % (line_no, current)
                        previous = tm
                        lines.append((tm * 1000, text))
            except ValueError:
                # Covers invalid UTF-8 (UnicodeDecodeError is a ValueError),
                # a line without a text column, and a malformed time.
                return _("Error on line %r: %r") % (line_no, current)
        finally:
            f.close()

        segmentation = model.Segmentation(seg_type, '')
        # Pair every line with its successor: (text, own time, next time).
        for cur, nxt in zip(lines, lines[1:]):
            segmentation.segments.append(model.Segment(cur[1], cur[0], nxt[0]))
        recording.segmentations.append(segmentation)
        return None

    def index(self, rid, sid):
        """List all segments for the segmentation *sid*.

        *rid* and *sid* are converted with :func:`int` and exposed to the
        template together with all recordings.
        """
        c.recordings = model.Recording.select()
        c.recording_id = int(rid)
        c.segmentation_id = int(sid)
        return render('/tests/segmenttree.mak')

    def list(self):
        """List all recordings."""
        c.recordings = model.Recording.select()
        return render('/recordings/list.mak')

    # user id
    def add(self, id):
        """Create a new recording for a speaker *id* and show the editing form.

        The recording handed to the template is a blank placeholder
        (``id`` and ``user_id`` 0, empty comment and date, file ``'dummy'``).
        """
        # NOTE(review): only the user lookup is taken to be conditional; the
        # nesting was reconstructed from whitespace-collapsed source.
        if id:
            c.user = model.User.get_by(id=id)
        c.recording = model.Recording()
        c.recording.id = 0
        c.recording.user_id = 0
        c.recording.comment = ''
        c.recording.rec_date = ''
        c.recording.file = 'dummy'
        return render('/recordings/edit.mak')
    add.roles = 'admin'

    def edit(self, id):
        """Generate an editing form for the recording *id*.

        An unknown *id* redirects to the recordings list instead of failing
        on the missing record.
        """
        c.recording = model.Recording.get_by(id=id)
        if not c.recording:
            # Same tolerance for unknown ids as delete().
            return h.redirect_to(action='list')
        c.user = model.User.get_by(id=c.recording.user_id)
        return render('/recordings/edit.mak')
    edit.roles = 'admin'

    def save(self, id):
        """Save the data for the recording *id*.

        Reads ``user_id`` and ``comment`` from the request parameters and,
        when present as file uploads, ``segments`` (a segmentation file, see
        :meth:`_store_segments`) and ``file`` (passed to ``sounds.save``).
        A recording that does not exist yet is created.

        Returns the error message from :meth:`_store_segments` if the
        segmentation file is rejected; otherwise redirects to the list.
        """
        p = request.params
        c.recording = model.Recording.get_by(id=id)
        if not c.recording:
            c.recording = model.Recording()
        # NOTE(review): the two assignments below are taken to apply to both
        # new and existing recordings; the nesting was reconstructed from
        # whitespace-collapsed source.
        c.recording.user_id = p['user_id']
        c.recording.comment = p['comment']

        if 'segments' in p:
            segments = p['segments']
            # Only a real upload is a FieldStorage.
            if isinstance(segments, cgi.FieldStorage):
                rsp = self._store_segments(segments.file, c.recording)
                segments.file.close()
                if rsp:
                    return rsp

        # Flush a new recording before the sound file is stored -- presumably
        # so that it has an id by then.
        if not c.recording.id:
            model.ctx.current.flush()

        if 'file' in p:
            wave = p['file']
            if isinstance(wave, cgi.FieldStorage):
                sounds.save(wave, c.recording)

        model.ctx.current.flush()
        h.redirect_to(action='list')
    save.roles = 'admin'

    def delete(self, id):
        """Delete all the data related to the recording *id*.

        An unknown *id* is ignored; either way the list is shown afterwards.
        """
        recording = model.Recording.get_by(id=id)
        if recording:
            recording.delete()
            model.ctx.current.flush()
        h.redirect_to(action='list')
    delete.roles = 'admin'

    def segmentation(self, id):
        """Show the data for a segmentation with *id*."""
        c.segmentation = model.Segmentation.get_by(id=id)
        return render('/recordings/segmentation.mak')