eKorpus

Source code for ekorpus.controllers.tests

"""The :mod:`tests` contains various tests management methods."""
import os
import cgi
from ekorpus.lib.base import *
from ekorpus.models.form import *
from sqlalchemy import select,outerjoin,func,and_,or_
from ekorpus.models.schema import session_table, test_table, \
     response_table, responsekey_table, user_table, testuser_table

class TestsController(AuthorizedController):
    """Controller for managing, taking and scoring tests.

    A test can be uploaded as a plain-text description file.  Every
    non-blank line has the form ``kind: value``::

        description: text
        instruction: text
        language: code
        choices: name, choice=code, choice=code, ...
        choice: name, code=text
        item: choices, text

    ``choices`` and ``choice`` lines define (or extend) a named selection;
    an ``item`` line refers to a previously defined selection by name.
    """

    def _get_selection(self, selections, name, lang):
        """Return the selection called *name*, creating it on first use."""
        if name not in selections:
            selections[name] = model.Selection(name, name[:8], lang)
        return selections[name]

    def _add_choice(self, selection, text, code):
        """Append a choice to *selection*, numbered after existing ones."""
        selection.choices.append(
            model.Choice(len(selection.choices) + 1,
                         text.strip(), code.strip()))

    def _responses_by_item(self, sess):
        """Map item id to the last response recorded for it in *sess*.

        Later entries of ``sess.responses`` overwrite earlier ones for the
        same item.  Returns an empty dict when *sess* is false.
        """
        responses = {}
        if sess:
            for r in sess.responses:
                responses[r.item_id] = r
        return responses

    def _store_test(self, f, test):
        """Populate *test* from the description file *f*.

        Parsing stops at the first line that raises an error (for example
        a line without ``:``, or a choice without ``=``); the offending
        line is printed and everything parsed so far is kept.  *f* is
        always closed.
        """
        selections = {}
        item_no = 0
        # Pre-bind so the error handler can print even if reading *f*
        # fails before the first line is assigned.
        line = ''
        try:
            for line in f:
                if not line.strip():
                    continue
                kind, val = line.split(':', 1)
                if not val:
                    continue
                kind = kind.strip()
                val = val.strip()
                if kind == 'description':
                    test.comment = val
                elif kind == 'instruction':
                    test.instruction = val
                elif kind == 'language':
                    test.lang = val
                elif kind == 'choices':
                    # choices: name, choice=code, choice=code, ...
                    parts = val.split(',')
                    name = parts[0].strip()
                    if not name:
                        print("Choices without name: %s" % line)
                        continue
                    selection = self._get_selection(
                        selections, name, test.lang)
                    for choice in parts[1:]:
                        text, code = choice.split('=', 1)
                        self._add_choice(selection, text, code)
                elif kind == 'choice':
                    # choice: name, code=text
                    name, choice = val.split(',', 1)
                    name = name.strip()
                    if not name:
                        print("Choice without name: %s" % line)
                        continue
                    selection = self._get_selection(
                        selections, name, test.lang)
                    code, text = choice.split('=', 1)
                    self._add_choice(selection, text, code)
                elif kind == 'item':
                    # item: choices, text
                    choices, text = val.split(',', 1)
                    if text:
                        item_no = item_no + 1
                        choices = choices.strip()
                        # Items naming an undefined selection are skipped,
                        # but still consume an item number.
                        if choices in selections:
                            item = model.Item(item_no, text.strip(), choices)
                            selections[choices].items.append(item)
                            test.items.append(item)
                else:
                    # NOTE(review): assumed to report an unknown line kind;
                    # confirm against the original indentation.
                    print("Unknown: %s" % line)
        except Exception:
            print("Error: %s" % line)
        finally:
            f.close()

    def list(self):
        """List tests available to a user.

        Superusers see every test.  Other users see tests in the current
        language that are assigned to them or to the user with id 1
        (presumably a shared/public account -- TODO confirm).
        """
        uid = session['user_id']
        lang = h.get_lang()[0]
        test = test_table.select()
        if 'superuser' not in session:
            test.append_whereclause(
                and_(
                    test_table.c.id == testuser_table.c.test_id,
                    or_(testuser_table.c.user_id == uid,
                        testuser_table.c.user_id == 1),
                    test_table.c.lang == lang))
        test = test.alias('test')
        sess = session_table.select(
            session_table.c.user_id == uid).alias('session')
        q = outerjoin(test, sess)
        q = select(['test.id', 'test.comment', 'session.score'],
                   from_obj=[q],
                   order_by=['test.comment'],
                   engine=model.ctx.current.get_bind(None))
        c.tests = q.execute().fetchall()
        return render('/tests/list.mak')
    list.roles = 'user'

    def edit(self, id):
        """Generate edit form for a test *id*.

        When no such test exists, a blank test is offered instead.
        """
        c.test = model.Test.get_by(id=id)
        if not c.test:
            c.test = model.Test()
            c.test.id = ''
            c.test.comment = ''
            c.test.instruction = ''
            c.test.text = ''
        c.selections = model.Selection.select()
        # All users, with t.test_id set for those assigned to this test.
        sel = testuser_table.select(testuser_table.c.test_id == id).alias('t')
        sel = outerjoin(user_table, sel)
        sel = select(
            [user_table.c.id, user_table.c.name, 't.test_id'],
            from_obj=[sel],
            order_by=[user_table.c.name],
            engine=model.ctx.current.get_bind(None))
        c.users = sel.execute().fetchall()
        return render('/tests/edit.mak')
    edit.roles = 'admin'

    def save(self, id):
        """Save data for a test *id*.

        Creates the test when it does not exist yet.  All changes run in
        one transaction that is rolled back on any error.

        Raises ``ValueError`` when no ``selection_id`` is submitted.
        """
        exists = True
        tx = model.ctx.current.create_transaction()
        try:
            c.test = model.Test.get_by(id=id)
            p = request.params
            if not c.test:
                exists = False
                c.test = model.Test()
            # NOTE(review): uploads are assumed to be accepted for new and
            # existing tests alike; confirm against the original nesting.
            if 'questions' in p:
                questions = p['questions']
                if isinstance(questions, cgi.FieldStorage):
                    self._store_test(questions.file, c.test)
            # Explicit form fields override values read from the upload.
            if ('comment' in p) and p['comment']:
                c.test.comment = p['comment']
            if ('instruction' in p) and p['instruction']:
                c.test.instruction = p['instruction']
            if ('text' in p) and p['text']:
                c.test.text = p['text']
            if ('selection_id' in p) and p['selection_id']:
                c.test.selection_id = int(p['selection_id'])
            else:
                # A bare ``raise`` here had no active exception to re-raise.
                raise ValueError('selection_id is required')
            if exists:
                # Replace the test's user assignments with the submitted set.
                test_id = int(id)
                con = model.ctx.current.connection(c.test.mapper)
                con.execute(testuser_table.delete(
                    testuser_table.c.test_id == test_id))
                users = p.getall('user_id[]')
                if users:
                    for user in users:
                        con.execute(
                            testuser_table.insert(values=(test_id, user)))
            model.ctx.current.flush()
        except:
            # Roll back on anything, then let the error propagate.
            tx.rollback()
            raise
        tx.commit()
        h.redirect_to(action='list')
    save.roles = 'admin'

    def build(self, id):
        """Generate a form for building a test interactively."""
        c.recordings = model.Recording.select()
        c.test = model.Test.get_by(id=id)
        if not c.test:
            c.test = model.Test()
            c.test.id = ''
            c.test.comment = ''
        return render('/tests/build.mak')
    build.roles = 'admin'

    # fragment
    def addsegment(self, id):
        """Add an item to test *id* and render the item list fragment.

        The request parameter ``id`` is stored as both the first and last
        id of the new item.
        """
        test = model.Test.get_by(id=id)
        sid = request.params.get('id', None)
        if sid:
            item = model.Item(1, "Test question", test.selection_id)
            item.first_id = int(sid)
            item.last_id = int(sid)
            test.items.append(item)
            model.ctx.current.flush()
        c.test = test
        return render('/tests/itemlist.mak')
    addsegment.roles = 'admin'

    # fragment
    def removesegment(self, id):
        """Remove an item from test *id* and render the item list fragment.

        The request parameter ``id`` is the item id, optionally prefixed
        with ``i_``.
        """
        test = model.Test.get_by(id=id)
        sid = request.params.get('id', None)
        if sid:
            if sid.startswith('i_'):
                sid = sid[2:]
            sid = int(sid)
            for s in test.items:
                if s.id == sid:
                    s.delete()
                    # Safe to mutate: the loop ends right after removal.
                    test.items.remove(s)
                    break
            model.ctx.current.flush()
        c.test = test
        return render('/tests/itemlist.mak')
    removesegment.roles = 'admin'

    # fragment
    def order(self, id):
        """Save the item order.

        Item ids are prefixed with `i_` or `s_`.  Items with `s_` are new
        items.  Entries without an id part after the prefix are ignored.
        """
        test = model.Test.get_by(id=id)
        items = test.items
        ids = request.params.getall('test[]')
        n = 0
        for elem in ids:
            # partition() tolerates entries with no underscore at all.
            prefix, sep, raw_id = elem.partition('_')
            if not raw_id:
                continue
            item_id = int(raw_id)
            if prefix == 'i':
                for item in items:
                    if item.id == item_id:
                        n = n + 1
                        item.nr = n
                        break
            elif prefix == 's':
                n = n + 1
                item = model.Item(n, test.text, test.selection_id)
                item.first_id = item_id
                item.last_id = item.first_id
                items.append(item)
        model.ctx.current.flush()
        c.test = test
        items.sort(key=lambda item: item.nr)
        return render('/tests/itemlist.mak')
    order.roles = 'admin'

    def delete(self, id):
        """Delete a test."""
        test = model.Test.get_by(id=id)
        if test:
            test.delete()
            model.ctx.current.flush()
        h.redirect_to(action='list')
    delete.roles = 'admin'

    def test(self, id):
        """Generate test *id* for the current user's session."""
        c.test = model.Test.get_by(id=id)
        sess = model.Session.get_by(test_id=id,
                                    user_id=session.get('user_id'))
        c.responses = self._responses_by_item(sess)
        return render('/tests/test.mak')
    test.roles = 'user'

    def show(self, id, uid):
        """Show the test *id* results for the user *uid*"""
        c.test = model.Test.get_by(id=id)
        sess = model.Session.get_by(test_id=id, user_id=uid)
        c.responses = self._responses_by_item(sess)
        return render('/tests/test.mak')
    show.roles = 'admin'

    def legend(self, id):
        """Render the legend page; *id* is not used."""
        return render('/tests/legend.mak')
    legend.roles = 'user'

    def results(self, id):
        """Save test session results for the current user.

        Request parameters whose name is an integer are read as
        ``item_id -> code``.  A response is appended for a new item or when
        the code differs from the last one recorded; other parameters are
        ignored.  The score is then recomputed.
        """
        uid = session.get('user_id')
        sess = model.Session.get_by(test_id=id, user_id=uid)
        if not sess:
            sess = model.Session()
            sess.user_id = uid
            sess.test_id = id
        # Responses are a list, so index them by item id before comparing.
        latest = self._responses_by_item(sess)
        new_responses = []
        for k, v in request.params.items():
            try:
                item_id = int(k)
            except (TypeError, ValueError):
                # Not an item field (e.g. a submit button).
                continue
            old = latest.get(item_id)
            if old is None or old.code != v:
                new_responses.append(model.Response(item_id, v))
        for r in new_responses:
            sess.responses.append(r)
        # Fallback score: the number of recorded responses.
        sess.score = '#:' + str(len(sess.responses))
        model.ctx.current.flush()
        # calculate score
        #   select sum(value)
        #   from response r, responsekey k
        #   where session_id=4 and r.item_id=k.item_id and r.code=k.code ;
        # NOTE(review): unlike the SQL sketched above, the query below has
        # no session filter; ``session_id`` is only passed to execute().
        # Confirm whether a response_table session condition is missing.
        if sess.responses:
            s = select(
                [func.sum(responsekey_table.c.value).label('score')],
                and_(
                    response_table.c.item_id == responsekey_table.c.item_id,
                    response_table.c.code == responsekey_table.c.code),
                engine=model.ctx.current.get_bind(None))
            row = s.execute(session_id=sess.id).fetchone()
            if row and row[0]:
                sess.score = _('score') + ': ' + str(row[0])
                model.ctx.current.flush()
        h.redirect_to(action='list')
    results.roles = 'user'