"""The :mod:`tests` contains various tests management methods."""
import os
import cgi
from ekorpus.lib.base import *
from ekorpus.models.form import *
from sqlalchemy import select,outerjoin,func,and_,or_
from ekorpus.models.schema import session_table, test_table, \
response_table, responsekey_table, user_table, testuser_table
[docs]class TestsController(AuthorizedController):
"""
Test descriptin file format
description: text
instruction: text
language: code
choices: name, choice=code,choice=code, ...
choices: name, choice=code,choice=code, ...
item: choices, text
item: choices, text
"""
def _store_test(self, f, test):
selections = {}
item_no = 0
line_no = 0
try:
for line in f:
line_no += 1
if not line.strip():
continue
kind, val = line.split(':',1)
if not val: continue
kind = kind.strip()
val = val.strip()
if kind=='description':
test.comment=val
elif kind=='instruction':
test.instruction=val
elif kind=='language':
test.lang=val
#choices: name,choice=code,choice=code, ...
elif kind=='choices':
parts = val.split(',')
name = parts[0].strip()
if not name:
print "Choices without name:", line
continue
if name in selections:
selection = selections[name]
else:
selection = model.Selection(name, name[:8], test.lang)
selections[name]=selection
for choice in parts[1:]:
text, code = choice.split('=',1)
selection.choices.append(model.Choice(len(selection.choices)+1, text.strip(), code.strip()))
#choice: name, code=text
elif kind=='choice':
name,choice = val.split(',',1)
name = name.strip()
if not name:
print "Choice without name:", line
continue
if name in selections:
selection = selections[name]
else:
selection = model.Selection(name, name[:8], test.lang)
selections[name]=selection
code, text = choice.split('=',1)
selection.choices.append(model.Choice(len(selection.choices)+1, text.strip(), code.strip()))
elif kind=='item':
choices, text = val.split(',',1)
if text:
item_no = item_no+1
choices = choices.strip()
if choices in selections:
item = model.Item(item_no, text.strip(), choices)
selections[choices].items.append(item)
test.items.append(item)
else:
print "Unknown:", line
except:
print "Error:", line
finally:
f.close()
[docs] def list(self):
"""List tests available to a user
"""
uid = session['user_id']
lang = h.get_lang()[0]
test=test_table.select()
if not ('superuser' in session):
test.append_whereclause(
and_(
test_table.c.id==testuser_table.c.test_id,
or_(testuser_table.c.user_id==uid,testuser_table.c.user_id==1),
test_table.c.lang==lang)
)
test = test.alias('test')
sess=session_table.select(session_table.c.user_id==uid).alias('session')
q = outerjoin(test,sess)
q = select(['test.id','test.comment','session.score'],from_obj=[q], order_by=['test.comment'],
engine=model.ctx.current.get_bind(None))
c.tests = q.execute().fetchall()
return render('/tests/list.mak')
list.roles = 'user'
[docs] def edit(self,id):
"""Generate edit form for a test *id*."""
c.test=model.Test.get_by(id=id)
if not c.test:
c.test=model.Test()
c.test.id=''
c.test.comment=''
c.test.instruction=''
c.test.text=''
c.selections=model.Selection.select()
sel=testuser_table.select(testuser_table.c.test_id==id).alias('t')
sel=outerjoin(user_table,sel)
sel=select(
[user_table.c.id,user_table.c.name,'t.test_id'],
from_obj=[sel],
order_by=[user_table.c.name],
engine=model.ctx.current.get_bind(None))
c.users = sel.execute().fetchall()
return render('/tests/edit.mak')
edit.roles = 'admin'
[docs] def save(self,id):
"""Save data for a test *id*."""
exists = True
tx = model.ctx.current.create_transaction()
try:
c.test=model.Test.get_by(id=id)
p = request.params
if not c.test:
exists = False
c.test=model.Test()
if 'questions' in p:
questions = p['questions']
if isinstance(questions,cgi.FieldStorage):
self._store_test(questions.file, c.test)
if ('comment' in p) and p['comment']:
c.test.comment=p['comment']
if ('instruction' in p) and p['instruction']:
c.test.instruction=p['instruction']
if ('text' in p) and p['text']:
c.test.text=p['text']
if ('selection_id' in p) and p['selection_id']:
sel=int(p['selection_id'])
c.test.selection_id=sel
else:
raise
if exists:
con = model.ctx.current.connection(c.test.mapper)
con.execute(testuser_table.delete(
testuser_table.c.test_id==int(id)))
users = p.getall('user_id[]')
if users:
for user in users:
con.execute(testuser_table.insert(values=(id, user)))
model.ctx.current.flush()
except:
tx.rollback()
raise
tx.commit()
h.redirect_to(action='list')
save.roles = 'admin'
[docs] def build(self,id):
"""Generate a form for building a test interactively."""
c.recordings = model.Recording.select()
c.test=model.Test.get_by(id=id)
if not c.test:
c.test=model.Test()
c.test.id=''
c.test.comment=''
return render('/tests/build.mak')
build.roles = 'admin'
#fragment
[docs] def addsegment(self,id):
"""Add an item to test."""
test=model.Test.get_by(id=id)
sid=request.params.get('id',None)
if sid:
item = model.Item(1, "Test question", test.selection_id)
item.first_id=int(sid)
item.last_id=int(sid)
test.items.append(item)
model.ctx.current.flush()
c.test=test
return render('/tests/itemlist.mak')
addsegment.roles = 'admin'
#fragment
[docs] def removesegment(self,id):
"""Remove an item from the test."""
test=model.Test.get_by(id=id)
sid=request.params.get('id',None)
if sid:
if sid.startswith('i_'):
sid=sid[2:]
sid = int(sid)
for s in test.items:
if s.id==sid:
s.delete()
test.items.remove(s)
break;
model.ctx.current.flush()
c.test=test
return render('/tests/itemlist.mak')
removesegment.roles = 'admin'
#fragment
[docs] def order(self,id):
"""Save the item order. Item ids are prefixed with `i_` or `s_`.
Items with `s_` are new items.
"""
test=model.Test.get_by(id=id)
items=test.items
ids=request.params.getall('test[]')
n=0
for elem in ids:
nv = elem.split('_',1)
if not nv[1]:
continue
item_id = int(nv[1])
if nv[0]=='i':
for item in items:
if item.id==item_id:
n=n+1
item.nr=n
break
elif nv[0]=='s':
n=n+1
item = model.Item(n, test.text, test.selection_id)
item.first_id=item_id
item.last_id=item.first_id
items.append(item)
model.ctx.current.flush()
c.test=test
items.sort(cmp=lambda x,y: cmp(x.nr, y.nr))
return render('/tests/itemlist.mak')
order.roles = 'admin'
[docs] def delete(self,id):
"""Delete a test."""
test=model.Test.get_by(id=id)
if test:
test.delete()
model.ctx.current.flush()
h.redirect_to(action='list')
delete.roles = 'admin'
[docs] def test(self, id):
"""Generate test for a session
"""
c.test=model.Test.get_by(id=id)
responses={}
sess=model.Session.get_by(test_id=id,user_id=session.get('user_id'))
if sess:
#filter responses, they are ordered by response.id
for r in sess.responses:
responses[r.item_id]=r #UNSORTED
c.responses=responses
return render('/tests/test.mak')
test.roles = 'user'
[docs] def show(self, id, uid):
"""Show the test *id* results for the user *uid*"""
c.test=model.Test.get_by(id=id)
sess=model.Session.get_by(test_id=id,user_id=uid)
responses={}
if sess:
#filter responses, they are ordered by response.id
for r in sess.responses:
responses[r.item_id]=r
c.responses=responses
return render('/tests/test.mak')
show.roles = 'admin'
[docs] def legend(self, id):
return render('/tests/legend.mak')
legend.roles = 'user'
[docs] def results(self,id):
"""Save test session results
"""
uid = session.get('user_id')
sess = model.Session.get_by(test_id=id,user_id=uid)
if not sess:
sess=model.Session()
sess.user_id = uid
sess.test_id = id
p = request.params
rs = sess.responses
newResponses = []
for k,v in p.iteritems():
try:
kint = int(k)
if kint in rs:
old=rs[kint]
if old.code!=v: #modified code
newResponses.append(model.Response(kint,v))
else: #new response
newResponses.append(model.Response(kint,v))
except TypeError:
continue
for r in newResponses:
sess.responses.append(r)
sess.score='#:'+str(len(sess.responses))
model.ctx.current.flush()
# calculate score
# select sum(value)
# from response r, responsekey k
# where session_id=4 and r.item_id=k.item_id and r.code=k.code ;
if len(sess.responses):
s = select(
[func.sum(responsekey_table.c.value).label('score')],
and_(
response_table.c.item_id==responsekey_table.c.item_id,
response_table.c.code==responsekey_table.c.code),
engine=model.ctx.current.get_bind(None))
row = s.execute(session_id=sess.id).fetchone()
if row and row[0]:
sess.score = _('score')+': '+str(row[0])
model.ctx.current.flush()
h.redirect_to(action='list')
results.roles = 'user'