"""
The :mod:`admin` module handles basic user management functions like creating new user accounts,
deleting user accounts or modifying user data.
"""
import logging
import pylons
from ekorpus.lib.base import *
from ekorpus.models.form import *
from sqlalchemy import select, join, outerjoin, func, and_, text
from ekorpus.models.schema import session_table, test_table, \
recording_table, choice_table, user_table, groups_table, usergroups_table
from pylons.i18n import get_lang, set_lang
log = logging.getLogger(__name__)
class AdminController(AuthorizedController):
    """Controller implementing all user management actions.

    Several actions get a ``roles`` attribute assigned right after their
    definition ('admin' or 'user').
    NOTE(review): the attribute is presumably evaluated by
    ``AuthorizedController`` to authorize the request -- confirm there.
    """

    # Language used when the session carries no 'lg' entry or when no
    # selection list exists for the session language.
    default_language = 'et'

    class State(object):
        """Validation state object passed to ``@validate``.

        Exposes ``_`` which delegates to ``pylons.i18n._``.
        """

        def _(self, msg):
            return pylons.i18n._(msg)

    # ------------------------------------------------------------------
    # private helpers (names starting with '_' are not meant as actions)
    # ------------------------------------------------------------------

    def _session_user_id(self):
        """Return the ``user_id`` stored in the session, or ``None`` when
        it is missing or empty."""
        if 'user_id' in session and session['user_id']:
            return session['user_id']
        return None

    def _redirect_back(self, *default_args, **default_kwargs):
        """Redirect to the ``r`` request parameter when present, otherwise
        to the default target described by the given arguments (passed
        unchanged to ``h.redirect_to``).
        """
        p = request.params
        if 'r' in p:
            # NOTE(review): 'r' comes straight from the request and is not
            # validated, so this is an open redirect. Restrict it to
            # same-site targets once the accepted formats of 'r' are known.
            h.redirect_to(str(p['r']))
        else:
            h.redirect_to(*default_args, **default_kwargs)

    def _selection(self, code, lg):
        """Return the ``Selection`` for ``code`` in language ``lg``, falling
        back to ``default_language`` when there is none for ``lg``."""
        selection = model.Selection.get_by(code=code, lang=lg)
        if not selection:
            selection = model.Selection.get_by(
                code=code, lang=self.default_language)
        return selection

    def _password_matches(self, user, password):
        """Tell whether ``password`` is accepted for ``user``.

        An account without a stored password accepts any password.
        """
        return (not user.password) or user.password == password

    # ------------------------------------------------------------------
    # actions
    # ------------------------------------------------------------------

    def index(self):
        """Render the user's home page.

        ``c.user`` is populated only when the session holds a ``user_id``.
        """
        if 'user_id' in session:
            c.user = model.User.get_by(id=int(session['user_id']))
        return render('/admin/home.mak')

    def signout(self):
        """Invalidate the session and redirect to the index action."""
        session.invalidate()
        session.save()
        h.redirect_to(action='index')

    def signin(self):
        """Sign the user in by verifying the submitted credentials.

        On success the user's name, id and roles are stored in the session
        (plus the ``superuser`` flag for the 'admin' role) and the request
        is redirected to ``r`` or to 'tests'.

        Without a valid login, a matching ``regcode`` parameter switches
        the session into registration mode and shows the empty edit form;
        otherwise the sign-in page is rendered.
        """
        p = request.params
        user = None
        if 'name' in p and 'password' in p:
            user = self._check_password(p['name'], p['password'])

        if user:
            session['user'] = user.name
            session['user_id'] = user.id
            session['roles'] = user.roles
            if user.roles == 'admin':
                session['superuser'] = '1'
            elif 'superuser' in session:
                # drop a flag left over from a previous admin login
                del session['superuser']
            session.save()
            return self._redirect_back('tests')

        # .get(): a missing 'regcode' setting must not turn every failed
        # login into a KeyError; None never equals a submitted value.
        regcode = pylons.config.get('regcode')
        if 'regcode' in p and p['regcode'] == regcode:
            session['register'] = '1'
            session.save()
            return self.edit(None)
        return render('/admin/signin.mak')

    def lang(self, lg):
        """Change UI language.

        Only redirects (to ``r`` or to the index action); per the original
        author's note, ``lg`` is already set in BaseController.
        """
        self._redirect_back(action='index')

    def list(self):
        """List all users together with their aggregated test sessions
        and recordings."""
        tests = select(
            [
                session_table.c.user_id,
                func.array_accum(session_table.c.test_id).label('tests'),
                func.array_accum(session_table.c.score).label('scores')
            ],
            group_by=[session_table.c.user_id]).alias('tests')
        records = select(
            [
                recording_table.c.user_id,
                func.array_accum(recording_table.c.id).label('recordings'),
                func.array_accum(recording_table.c.comment).label('comments')
            ],
            group_by=[recording_table.c.user_id],
            engine=model.ctx.current.get_bind(None)).alias('records')
        # outer joins keep users that have neither sessions nor recordings
        j = outerjoin(user_table, tests, user_table.c.id==tests.c.user_id).\
            outerjoin(records, user_table.c.id==records.c.user_id)
        c.users = j.select(order_by=[user_table.c.name]).execute().fetchall()
        return render('/admin/list.mak')
    list.roles = 'admin'

    def sessions(self):
        """List all test sessions, one row per test with the names, ids
        and scores of the participating users accumulated into arrays."""
        t = text("""
            select
            id, comment,
            array_accum(uname) as names,
            array_accum(uid) as ids,
            array_accum(score) as scores
            from (
            select t.id, t.comment, u.name as uname, u.id as uid, s.score as score
            from "user" u join session s on u.id=s.user_id join test t on t.id=s.test_id
            order by 1,2,3
            ) as s
            group by 1,2 order by 2""", engine=model.ctx.current.get_bind(None))
        res = t.execute()
        c.colnames = res.keys
        c.sessions = res.fetchall()
        return render('/admin/sessions.mak')
    sessions.roles = 'admin'

    def profile(self, id):
        """Show a user profile.

        Only a superuser may view the profile selected by ``id``; for
        everybody else ``id`` is replaced by the session's own user id.
        Redirects to the index action when no user is signed in or the
        profile does not exist.
        """
        log.debug('Profile ID:%s', id)
        if 'superuser' not in session:
            id = self._session_user_id()
            if not id:
                return h.redirect_to(action='index')
        log.debug('Profile UID:%s', id)
        c.profile = model.Profile.get_by(id=id)
        if not c.profile:
            return h.redirect_to(action='index')
        # NOTE(review): profile 1 is used as the "average" reference
        # profile -- confirm that this id is reserved for it.
        c.average = model.Profile.get_by(id=1)
        return render('/admin/profile.mak')
    profile.roles='user'

    def edit_mydata(self):
        """Show the edit form for the signed-in user's own data.

        Wrapper that calls ``edit`` directly to skip its 'admin'
        authorization.
        """
        user_id = self._session_user_id()
        if not user_id:
            return h.redirect_to(action='index')
        return self.edit(user_id)

    def edit(self,id):
        """Display user data for editing.

        When no user with ``id`` exists, a blank user is presented
        instead. For superusers the list of all groups is loaded as well,
        with ``user_id`` filled in for the groups the user belongs to.
        """
        c.user = model.User.get_by(id=id)
        if not c.user:
            c.user = model.User()
            c.user.id = ''
            c.user.name = ''
            c.user.gender = '0'
            c.user.age = ''
            c.user.experience = ''

        if 'lg' in session:
            lg = session['lg']
        else:
            lg = self.default_language
        c.nationality = self._selection("nati", lg)
        c.education = self._selection("educ", lg)
        c.language = self._selection("lang", lg)

        if 'superuser' in session:
            sel=usergroups_table.select(usergroups_table.c.user_id==id).alias('t')
            sel=outerjoin(groups_table,sel)
            sel=select(
                [groups_table.c.id,groups_table.c.text,'t.user_id'],
                from_obj=[sel],
                order_by=[groups_table.c.text],
                engine=model.ctx.current.get_bind(None))
            c.groups = sel.execute().fetchall()
        return render('/admin/edit.mak')
    edit.roles = 'admin'

    # save carries no ``roles`` attribute: it performs its own
    # authorization below, because it also serves self-registration and
    # users editing their own data.
    @validate(schema=RegistrationForm(), form='edit', state=State(), auto_error_formatter=std_formatter)
    def save(self,id):
        """Save user data.

        Three callers are distinguished through the session:

        * superuser -- may save any user ``id``, and may also set name,
          roles, the ``ignore`` flag and group memberships;
        * registration (``register`` flag) -- always creates a new user
          and signs it in afterwards;
        * ordinary user -- ``id`` is forced to the session's own user id.
        """
        is_super = 'superuser' in session
        register = 'register' in session
        user = None
        if register:
            # the registration permission is valid for a single save only
            del session['register']
            id = None
            session.save()
        if not (is_super or register):
            id = self._session_user_id()
            if not id:
                return h.redirect_to(action='index')

        p = self.form_result
        if not register:
            user = model.User.get_by(id=id)
        if not user:
            user = model.User()
        log.debug("Saving user: %s", id or "")

        user.password = p['password']
        user.gender = p['gender']
        user.age = p['age']
        user.education = p['education']
        user.nationality = p['nationality']
        user.language = p['language']
        user.language2 = p['language2']
        user.experience = p['experience']
        if is_super or register:
            user.name = p['name']
        if is_super:
            if 'roles' in p:
                user.roles = p['roles']
            else:
                user.roles = None
            if 'ignore' in p:
                user.ignore = 1
            else:
                user.ignore = 0

        if not user.id:
            # flush so that a newly created user receives its id
            model.ctx.current.flush()
            id = user.id

        if is_super:
            # Group memberships are an administrative setting: accepting
            # 'groups_id[]' from other callers would let any user put
            # themselves into arbitrary groups.
            con = model.ctx.current.connection(user.mapper)
            groups = request.params.getall('groups_id[]')
            # NOTE(review): with no group submitted the existing
            # memberships are kept, so a user cannot be removed from all
            # groups through this form.
            if groups:
                con.execute(usergroups_table.delete(
                    usergroups_table.c.user_id==user.id))
                for group in groups:
                    con.execute(usergroups_table.insert(
                        values=(user.id, group)))
        model.ctx.current.flush()

        if is_super:
            return h.redirect_to(action='list')
        if register:
            # signs the new user in with the name/password just submitted
            return self.signin()
        return h.redirect_to(action='index')

    def delete(self,id):
        """Delete the user account ``id`` (if it exists) and redirect to
        the user list."""
        user = model.User.get_by(id=id)
        if user:
            log.debug('Deleting user: %s',id)
            user.delete()
            model.ctx.current.flush()
        h.redirect_to(action='list')
    delete.roles = 'admin'

    def _check_own(self, name, password):
        """Return the user called ``name`` when ``password`` is accepted
        for it, otherwise ``None``. Logs the user that was looked up."""
        if not name:
            return None
        user = model.User.get_by(name=name)
        log.debug('USER: %s',user)
        if user and self._password_matches(user, password):
            return user
        return None

    def _check_password(self, name, password):
        """Return the user called ``name`` when ``password`` is accepted
        for it, otherwise ``None``."""
        if not name:
            return None
        user = model.User.get_by(name=name)
        if not user:
            return None
        if self._password_matches(user, password):
            return user
        # never write the submitted password to the log
        log.debug('Invalid password for user: %s', name)
        return None