eKorpus

Source code for ekorpus.controllers.admin

"""
The :mod:`admin` module handles basic user management functions like creating new user accounts,
deleting user accounts or modifying user data.
"""
import logging
import pylons

from ekorpus.lib.base import *
from ekorpus.models.form import *
from sqlalchemy import select, join, outerjoin, func, and_, text
from ekorpus.models.schema import session_table, test_table, \
    recording_table, choice_table, user_table, groups_table, usergroups_table
from pylons.i18n import get_lang, set_lang


log = logging.getLogger(__name__)

class AdminController(AuthorizedController):
    """This class implements all user management functions.

    Several actions are tagged with a ``roles`` attribute right after their
    definition (for example ``list.roles = 'admin'``).  The code that reads
    this attribute is not part of this class; presumably it is evaluated by
    :class:`AuthorizedController` -- confirm there.
    """

    # Language used when the session has no ``lg`` entry, and the fallback
    # language for selection lists that lack an entry for the session language.
    default_language = 'et'
    #ui_languages = [default_language,'en','fi']

    class State(object):
        """State object handed to the form validator of :meth:`save`.

        Exposes ``_`` so that messages can be translated through
        ``pylons.i18n``.
        """

        def _(self, msg):
            return pylons.i18n._(msg)

    def index(self):
        """Generates users homepage.

        When the session holds a ``user_id`` the matching user is loaded
        into ``c.user`` before the template is rendered.
        """
        if 'user_id' in session:
            user_id = int(session['user_id'])
            c.user = model.User.get_by(id=user_id)
        return render('/admin/home.mak')

    def signout(self):
        """Signout.

        Invalidates and saves the session, then redirects to ``index``.
        """
        session.invalidate()
        session.save()
        h.redirect_to(action='index')

    def signin(self):
        """Signs the user into system by verifying user credentials.

        Reads ``name`` and ``password`` from the request parameters.  On
        success the user's name, id and roles are stored in the session and
        the request is redirected to the ``r`` parameter if present, otherwise
        to ``tests``.

        When no user was authenticated, a ``regcode`` parameter equal to the
        configured ``regcode`` starts a registration: the session is flagged
        with ``register`` and the empty edit form is returned.  In every other
        case the sign-in page is rendered.
        """
        user = None
        p = request.params
        if 'name' in p and 'password' in p:
            user = self._check_password(p['name'], p['password'])
            if user:
                session['user'] = user.name
                session['user_id'] = user.id
                session['roles'] = user.roles
                if user.roles == 'admin':
                    session['superuser'] = '1'
                else:
                    # Drop a superuser flag left over from a previous sign-in.
                    if 'superuser' in session:
                        del session['superuser']
                #lg = user.language
                #if not (lg in self.ui_languages):
                #    lg=self.default_language
                #set_lang(lg)
                #session['lg'] = lg
                session.save()
                # NOTE(review): ``r`` comes straight from the request, so the
                # redirect target is caller-controlled -- consider restricting
                # it to local paths.
                if 'r' in p:
                    h.redirect_to(str(p['r']))
                else:
                    h.redirect_to('tests')
        if not user:
            regcode = pylons.config['regcode']
            if 'regcode' in p and p['regcode'] == regcode:
                session['register'] = '1'
                session.save()
                return self.edit(None)
            else:
                return render('/admin/signin.mak')

    def lang(self, lg):
        """Change UI language.

        ``lg`` itself is not used here (see the inline comment); the action
        only redirects to the ``r`` request parameter if present, otherwise
        to ``index``.
        """
        p = request.params
        # lg is already set in BaseController
        if 'r' in p:
            h.redirect_to(str(p['r']))
        else:
            h.redirect_to(action='index')

    def list(self):
        """List all users.

        Outer-joins the user table with two per-user aggregates (test
        sessions with their scores, recordings with their comments), stores
        the rows ordered by user name in ``c.users`` and renders the list
        template.
        """
        # Per-user arrays of test ids and scores.
        tests = select(
            [
                session_table.c.user_id,
                func.array_accum(session_table.c.test_id).label('tests'),
                func.array_accum(session_table.c.score).label('scores'),
            ],
            group_by=[session_table.c.user_id]).alias('tests')
        # Per-user arrays of recording ids and comments.
        records = select(
            [
                recording_table.c.user_id,
                func.array_accum(recording_table.c.id).label('recordings'),
                func.array_accum(recording_table.c.comment).label('comments'),
            ],
            group_by=[recording_table.c.user_id],
            engine=model.ctx.current.get_bind(None)).alias('records')
        j = outerjoin(
            user_table, tests, user_table.c.id == tests.c.user_id
        ).outerjoin(
            records, user_table.c.id == records.c.user_id
        )
        c.users = j.select(order_by=[user_table.c.name]).execute().fetchall()
        return render('/admin/list.mak')
    list.roles = 'admin'

    def sessions(self):
        """List all test sessions

        Runs a raw SQL query that groups, per test, the names, ids and scores
        of the users who took it.  The column names go to ``c.colnames`` and
        the rows to ``c.sessions``.
        """
        t = text("""
            select id, comment,
                array_accum(uname) as names,
                array_accum(uid) as ids,
                array_accum(score) as scores
            from (
                select t.id, t.comment, u.name as uname, u.id as uid,
                    s.score as score
                from "user" u
                    join session s on u.id=s.user_id
                    join test t on t.id=s.test_id
                order by 1,2,3
            ) as s
            group by 1,2
            order by 2""", engine=model.ctx.current.get_bind(None))
        res = t.execute()
        c.colnames = res.keys
        c.sessions = res.fetchall()
        return render('/admin/sessions.mak')
    sessions.roles = 'admin'

    def profile(self, id):
        """Show user profile.

        Without the ``superuser`` session flag the requested ``id`` is
        ignored and the signed-in user's own id is used instead; a session
        without ``user_id`` is redirected to ``index``.  A missing profile
        also redirects to ``index``.
        """
        log.debug('Profile ID:%s', id)
        if 'superuser' not in session:
            if not (('user_id' in session) and session['user_id']):
                return h.redirect_to(action='index')
            id = session['user_id']
        log.debug('Profile UID:%s', id)
        c.profile = model.Profile.get_by(id=id)
        if not c.profile:
            return h.redirect_to(action='index')
        # NOTE(review): profile id 1 is loaded as ``c.average``; presumably
        # that row holds the averaged reference profile -- confirm.
        c.average = model.Profile.get_by(id=1)
        return render('/admin/profile.mak')
    profile.roles = 'user'

    #wrapper to skip authorization
    def edit_mydata(self):
        """Display the signed-in user's own data for editing.

        Calls :meth:`edit` directly with the session's ``user_id``; redirects
        to ``index`` when the session has none.
        """
        if not (('user_id' in session) and session['user_id']):
            return h.redirect_to(action='index')
        return self.edit(session['user_id'])

    def _localized_selection(self, code, lg):
        """Return the selection list ``code`` for language ``lg``.

        Falls back to :attr:`default_language` when there is no entry for
        ``lg``.
        """
        return (model.Selection.get_by(code=code, lang=lg)
                or model.Selection.get_by(code=code,
                                          lang=self.default_language))

    def edit(self, id):
        """Display user data for editing.

        Loads the user ``id`` into ``c.user``; when no such user exists a
        blank :class:`model.User` with empty form defaults is used instead.
        The nationality, education and language selection lists are loaded
        in the session language.  For superusers ``c.groups`` additionally
        lists every group together with the user's membership.
        """
        c.user = model.User.get_by(id=id)
        if not c.user:
            # No such user: present an empty form.
            blank = model.User()
            blank.id = ''
            blank.name = ''
            blank.gender = '0'
            blank.age = ''
            blank.experience = ''
            c.user = blank

        if 'lg' in session:
            lg = session['lg']
        else:
            lg = self.default_language
        c.nationality = self._localized_selection("nati", lg)
        c.education = self._localized_selection("educ", lg)
        c.language = self._localized_selection("lang", lg)

        if 'superuser' in session:
            # All groups, outer-joined with this user's memberships so that
            # ``t.user_id`` is only set for groups the user belongs to.
            sel = usergroups_table.select(
                usergroups_table.c.user_id == id).alias('t')
            sel = outerjoin(groups_table, sel)
            sel = select(
                [groups_table.c.id, groups_table.c.text, 't.user_id'],
                from_obj=[sel],
                order_by=[groups_table.c.text],
                engine=model.ctx.current.get_bind(None))
            c.groups = sel.execute().fetchall()
        return render('/admin/edit.mak')
    edit.roles = 'admin'

    @validate(schema=RegistrationForm(), form='edit', state=State(),
              auto_error_formatter=std_formatter)
    def save(self, id):
        """Save user data.

        Which record is written depends on the session:

        * ``superuser`` -- the user ``id`` (a new user if it does not exist);
          name, roles and the ``ignore`` flag may be set as well.
        * ``register`` -- always a new user; the flag is consumed and the
          new user is signed in afterwards via :meth:`signin`.
        * otherwise -- the signed-in user's own record; ``id`` is ignored and
          a session without ``user_id`` is redirected to ``index``.
        """
        #check authorization
        is_super = 'superuser' in session
        is_register = 'register' in session
        user = None
        if is_register:
            # The registration flag is single-use.
            del session['register']
            id = None
            session.save()
        if not (is_super or is_register):
            if not (('user_id' in session) and session['user_id']):
                return h.redirect_to(action='index')
            id = session['user_id']

        #p = request.params
        p = self.form_result
        if not is_register:
            user = model.User.get_by(id=id)
        if not user:
            user = model.User()
        log.debug("Saving user: %s", id or "")

        user.password = p['password']
        user.gender = p['gender']
        user.age = p['age']
        user.education = p['education']
        user.nationality = p['nationality']
        user.language = p['language']
        user.language2 = p['language2']
        user.experience = p['experience']
        if is_super or is_register:
            user.name = p['name']
        if is_super:
            if 'roles' in p:
                user.roles = p['roles']
            else:
                user.roles = None
            if 'ignore' in p:
                user.ignore = 1
            else:
                user.ignore = 0

        if not user.id:
            # Flush first so that a new user has an id for the group rows.
            model.ctx.current.flush()
            id = user.id
        con = model.ctx.current.connection(user.mapper)
        # NOTE(review): group membership is taken from the raw request and is
        # not restricted to superusers; an empty selection leaves existing
        # memberships untouched.  Confirm both are intended.
        groups = request.params.getall('groups_id[]')
        if groups:
            con.execute(usergroups_table.delete(
                usergroups_table.c.user_id == int(id)))
            for group in groups:
                con.execute(usergroups_table.insert(values=(id, group)))
        model.ctx.current.flush()

        if is_super:
            return h.redirect_to(action='list')
        if is_register:
            return self.signin()
        return h.redirect_to(action='index')
    #save.roles = 'user'

    def delete(self, id):
        """Delete user account.

        Deletes the user ``id`` if it exists, then redirects to ``list``.
        """
        user = model.User.get_by(id=id)
        if user:
            log.debug('Deleting user: %s', id)
            user.delete()
            model.ctx.current.flush()
        h.redirect_to(action='list')
    delete.roles = 'admin'

    def _check_own(self, name, password):
        """Return the user ``name`` if ``password`` matches, else ``None``.

        Same rules as :meth:`_check_password`, but logs the looked-up user
        instead of the failed attempt.
        """
        if not name:
            return None
        user = model.User.get_by(name=name)
        log.debug('USER: %s', user)
        if not user:
            return None
        # An account without a stored password accepts any password.
        if not user.password or user.password == password:
            return user
        return None

    def _check_password(self, name, password):
        """Return the user ``name`` if ``password`` matches, else ``None``.

        An account without a stored password accepts any password.  A failed
        attempt is logged with the account name only; the submitted password
        is never written to the log.
        """
        if not name:
            return None
        user = model.User.get_by(name=name)
        if not user:
            return None
        # NOTE(review): passwords are compared as stored, i.e. in plain text.
        if not user.password or user.password == password:
            return user
        log.debug('Invalid password for user: %s', name)
        return None