Source code for ciodashboard.views.dashboard

"""View callables to manage dashboards."""

from __future__ import annotations
from os.path import join, exists
from shutil import rmtree
from json import dumps

from sqlalchemy import desc
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import FlushError

from pyramid.view import view_config
from pyramid.request import Request
from pyramid.response import Response
from pyramid.httpexceptions import HTTPFound, HTTPForbidden, HTTPNotFound

from chrysalio.lib.log import log_info
from chrysalio.lib.utils import make_id
from chrysalio.lib.form import get_action, Form
from chrysalio.lib.filter import Filter
from chrysalio.lib.paging import PAGE_SIZES, Paging
from chrysalio.lib.attachment import attachment_url, attachment_update
from chrysalio.lib.tabset import Tabset
from chrysalio.lib.i18n import translate_field
from chrysalio.models.populate import web2db, db2web
from chrysalio.views import BaseView
from chrysalio.includes.cache import cache_namespace
from chrysalio.includes.themes import theme_static_prefix
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from ..lib.i18n import _
from ..lib.utils import CIODASHBOARD_NS, CACHE_REGION_GLOBAL, flag_image
from ..lib.utils import route_file_view
from ..relaxng import RELAXNG_CIODASHBOARD
from ..models.populate import xml2db
from ..models.dbdashboard import DASHBOARD_ACCESSES
from ..models.dbdashboard import DBDashboard, DBDashboardGauge
from ..models.dbdashboard import DBDashboardUser, DBDashboardGroup

DISPLAY_REFRESH = '900'


# =============================================================================
[docs] class DashboardView(BaseView): """Class to manage dashboard views. :type request: pyramid.request.Request :param request: Current request. """ # ------------------------------------------------------------------------- def __init__(self, request: Request): """Constructor method.""" super().__init__(request) if 'modules_off' not in self._request.registry or \ 'ciodashboard' in self._request.registry['modules_off']: raise HTTPForbidden( comment=_('The module "CioDashboard" is not activated.')) # -------------------------------------------------------------------------
[docs] @view_config( route_name='dashboard_index', renderer='ciodashboard:Templates/dashboard_index.pt', permission='dashboard-view') @view_config(route_name='dashboard_index', renderer='json', xhr=True) def index(self) -> dict | Response: """List all authorized dashboards.""" # Ajax i_creator = self._request.has_permission('dashboard-create') if self._request.is_xhr: if i_creator: self._import_dashboards() return {} # Action action, items = self._index_action(i_creator) if action[:4] == 'exp!': response = self._dashboards2response(items) if response: return response elif action[:4] == 'dpl!': return HTTPFound( self._request.route_path( 'dashboard_display', dashboard_ids='|'.join(items))) # Filter paging_id = 'dashboard_index' pfilter = Filter( self._request, paging_id, ( ('i18n_label', _('Label'), False, ''), ('dashboard_id', _('Identifier'), False, ''), ( 'access', _('Access'), False, [('', ' ')] + list(DASHBOARD_ACCESSES.items()))), remove=action[:4] == 'crm!' and action[4:] or None) # Favorites user_id = self._request.session['user']['user_id'] if 'ciodashboard' not in self._request.session: self._request.session['ciodashboard'] = {} if self._request.POST: self._request.session['ciodashboard']['favorite_only'] = bool( self._request.POST.get('favorite')) favorites = [ k[0] for k in self._request.dbsession.query( DBDashboardUser.dashboard_id).filter_by( user_id=user_id, favorite=True) ] # Paging defaults = Paging.params(self._request, paging_id, '+dashboard_id') defaults['favorite'] = favorites and \ self._request.session['ciodashboard'].get('favorite_only', True) dbquery = pfilter.sql( self._request.dbsession.query(DBDashboard), 'dsh_dashboards') if not i_creator: dbquery = dbquery.filter( DBDashboard.dashboard_id.in_(self._my_dashboard_ids())) if defaults['favorite']: dbquery = dbquery.filter(DBDashboard.dashboard_id.in_(favorites)) oby = getattr(DBDashboard, defaults['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if defaults['sort'][0] == '-' else oby) paging = Paging(self._request, paging_id, dbquery, defaults) paging.set_current_ids('dashboard_id') if not defaults['favorite'] and pfilter.is_empty() \ and not self._request.has_permission('dashboard-edit') \ and len(paging) == 1: return HTTPFound( self._request.route_path( 'dashboard_display', dashboard_ids=paging[0].dashboard_id)) # Form & completed action form = Form(self._request, defaults=defaults) form.forget('filter_value') gauges_refresh = action[:4] == 'rfh!' if action and action[3] == '!': action = '' # Breadcrumbs & documentation self._request.breadcrumbs(_('Dashboards'), 1) self._request.documentation = '/dashboard/index' # Refresh self._request.response.headerlist.append(('Refresh', DISPLAY_REFRESH)) return { # yapf: disable 'action': action, 'items': items, 'form': form, 'pfilter': pfilter, 'paging': paging, 'favorites': favorites, 'i_creator': i_creator, 'i_editor': self._request.has_permission('dashboard-edit'), 'gauges': self._gauges(paging, refresh=gauges_refresh), 'flag_image': flag_image, 'PAGE_SIZES': PAGE_SIZES, 'attachment_url': attachment_url, 'has_attachments': bool( self._request.registry.settings.get('attachments')), 'download_max_size': self._request.registry[ 'settings']['download-max-size'] }
# -------------------------------------------------------------------------
[docs] @view_config( route_name='dashboard_index_filter', renderer='json', xhr=True) def index_filter(self) -> dict: """Return a dictionary to autocomplete a filter field.""" return Filter.sql_autocomplete(self._request, DBDashboard)
# -------------------------------------------------------------------------
[docs] @view_config( route_name='dashboard_display', renderer='ciodashboard:Templates/dashboard_display.pt', permission='dashboard-view') def display(self) -> dict: """Display one or several dashboards.""" # Action action = get_action(self._request)[0] current = None # Load dashboards dashboards = [] gauges = {} files = {} pictures = {} i_maintainer = {} ciodashboard = self._request.registry['modules']['ciodashboard'] for dashboard_id in self._request.matchdict['dashboard_ids'].split( '|'): dashboard = ciodashboard.dashboard(self._request, dashboard_id) if dashboard is None: continue dashboards.append(dashboard) i_maintainer[dashboard.uid] = self._request.has_permission( 'dashboard-maintain') if action[:4] == 'upd!' and action[4:] == dashboard_id \ and i_maintainer[dashboard_id]: current = dashboard_id dashboard.update_metafields(self._request) gauges[dashboard.uid] = dashboard.gauges(self._request, True) else: gauges[dashboard.uid] = dashboard.gauges( self._request, action == 'rfh!') files[dashboard.uid] = dashboard.file_ul(self._request) pictures[dashboard.uid] = dashboard.cioset_picture(self._request) # Form form = Form(self._request) form.forget('') # Breadcrumbs & documentation if 'dashboard_view' in self._request.breadcrumbs.current_route_name(): self._request.breadcrumbs.pop() self._request.breadcrumbs(_('Display of dashboards')) self._request.documentation = '/dashboard/display' # Refresh self._request.response.headerlist.append(('Refresh', DISPLAY_REFRESH)) fields = self._request.registry['metafields'] \ if 'ciowarehouse' in self._request.registry['modules'] else \ self._request.registry['fields'] return { 'form': form, 'dashboards': dashboards, 'gauges': gauges, 'files': files, 'pictures': pictures, 'metafields': fields, 'i_maintainer': i_maintainer, 'current': current, 'join': join, 'translate_field': translate_field, 'route_file_view': route_file_view }
# -------------------------------------------------------------------------
[docs] @view_config( route_name='dashboard_view', renderer='ciodashboard:Templates/dashboard_view.pt', permission='dashboard-view') def view(self) -> dict | Response: """Show dashboard configuration.""" # Dashboard i_creator = self._request.has_permission('dashboard-create') dbdashboard = self._get_dbdashboard(i_creator) picture = self._request.registry.settings.get('attachments') and ( attachment_url( self._request, dbdashboard.attachments_dir, dbdashboard.attachments_key, dbdashboard.picture) or '{0}/ciodashboard/images/dashboard_picture.png'.format( theme_static_prefix(self._request))) # Action action = get_action(self._request)[0] if action == 'exp!': response = self._dashboards2response( (str(dbdashboard.dashboard_id), )) if response: return response # User paging user_paging, defaults, user_filter = self._user_paging( action, dbdashboard) # Form form = Form(self._request, defaults=defaults) form.forget('filter_value') # Breadcrumbs & documentation label = dbdashboard.label(self._request) self._request.breadcrumbs( _('Dashboard "${l}"', {'l': label}), replace=self._request.route_path( 'dashboard_edit', dashboard_id=dbdashboard.dashboard_id)) self._request.documentation = '/dashboard/view' return { # yapf: disable 'form': form, 'label': label, 'user_filter': user_filter, 'user_paging': user_paging, 'tabset': Tabset( self._request, 'tabDashboard', dbdashboard.settings_tabs(self._request)), 'dbdashboard': dbdashboard, 'picture': picture, 'navigator': Paging.navigator( self._request, 'dashboard_index', dbdashboard.dashboard_id, self._request.route_path( 'dashboard_view', dashboard_id='_ID_')), 'i_editor': self._request.has_permission('dashboard-edit'), 'i_creator': i_creator }
# -------------------------------------------------------------------------
[docs] @view_config( route_name='dashboard_create', renderer='ciodashboard:Templates/dashboard_edit.pt', permission='dashboard-edit') @view_config( route_name='dashboard_edit', renderer='ciodashboard:Templates/dashboard_edit.pt', permission='dashboard-edit') @view_config( route_name='dashboard_edit', renderer='json', xhr=True, permission='dashboard-edit') def edit(self) -> dict: """Create or edit a dashboard.""" # Rights i_creator = self._request.has_permission('dashboard-create') dbdashboard = self._get_dbdashboard(i_creator) \ if 'dashboard_id' in self._request.matchdict else None if dbdashboard is None \ and not self._request.has_permission('dashboard-create'): raise HTTPForbidden() # Ajax if self._request.is_xhr: if dbdashboard is not None: dbdashboard.attachments_key, dbdashboard.picture = \ attachment_update( self._request, dbdashboard.attachments_dir, dbdashboard.attachments_key, self._request.POST['picture'], replace=dbdashboard.picture, prefix=str(dbdashboard.dashboard_id)[:12]) log_info( self._request, 'dashboard_update_picture', dbdashboard.dashboard_id) return {} # User paging action = get_action(self._request)[0] user_paging, defaults, user_filter = self._user_paging(action) # Form groups = { k.group_id: (k.label(self._request), k.description(self._request)) for k in self._request.dbsession.query(DBGroup) } form = Form( self._request, *DBDashboard.settings_schema( self._request, defaults, groups, dbdashboard), obj=dbdashboard, force_defaults=dbdashboard is not None) form.forget('filter_value') # Action if action == 'pct!' and dbdashboard is not None: dbdashboard.attachments_key, dbdashboard.picture = \ attachment_update( self._request, dbdashboard.attachments_dir, dbdashboard.attachments_key, self._request.POST['picture'], replace=dbdashboard.picture, prefix=str(dbdashboard.dashboard_id)[:12]) log_info( self._request, 'dashboard_update_picture', dbdashboard.dashboard_id) elif action == 'sav!' and form.validate(): dbdashboard = self._save(dbdashboard, groups, form.values) if dbdashboard is not None: if 'dashboard_id' not in self._request.matchdict: self._request.breadcrumbs.pop() log_info( self._request, 'dashboard_id' in self._request.matchdict and 'dashboard_edit' or 'dashboard_create', dbdashboard.dashboard_id) return HTTPFound( self._request.route_path( 'dashboard_view', dashboard_id=dbdashboard.dashboard_id)) if form.has_error(): self._request.session.flash(_('Correct errors.'), 'alert') # Picture if dbdashboard and self._request.registry.settings.get('attachments'): picture = attachment_url( self._request, dbdashboard.attachments_dir, dbdashboard.attachments_key, dbdashboard.picture) or \ '{0}/ciodashboard/images/dashboard_picture.png'.format( theme_static_prefix(self._request)) else: picture = None # Breadcrumbs & documentation label = dbdashboard and dbdashboard.label(self._request) if not dbdashboard: self._request.breadcrumbs(_('Dashboard Creation')) else: self._request.breadcrumbs( _('Dashboard "${l}" Edition', {'l': label}), replace=self._request.route_path( 'dashboard_view', dashboard_id=dbdashboard.dashboard_id)) self._request.documentation = '/dashboard/edit' return { # yapf: disable 'form': form, 'dbdashboard': dbdashboard or DBDashboard, 'action': action, 'picture': picture, 'label': label, 'user_filter': user_filter, 'user_paging': user_paging, 'groups': groups, 'tabset': Tabset( self._request, 'tabDashboard', DBDashboard.settings_tabs(self._request)) }
# ------------------------------------------------------------------------- def _get_dbdashboard(self, i_creator: bool) -> DBDashboard: """Return the SqlAlchemy object of the selected dashboard or raise an HTTPNotFound exception. :param bool i_creator: ``True`` if the user can create a warehouse. :rtype: .models.dbdashboard.DBDashboard """ dbdashboard = self._request.dbsession.query(DBDashboard).filter_by( dashboard_id=self._request.matchdict['dashboard_id']).first() if dbdashboard is None: raise HTTPNotFound() if not i_creator and \ dbdashboard.dashboard_id not in self._my_dashboard_ids(): raise HTTPForbidden() return dbdashboard # ------------------------------------------------------------------------- def _import_dashboards(self): """Import dashboards.""" # Get current IDs dashboard_ids = { k[0] for k in self._request.dbsession.query(DBDashboard.dashboard_id) } # Update database root_tag = f"{{{ RELAXNG_CIODASHBOARD['namespace']}}}"\ f"{RELAXNG_CIODASHBOARD['root']}" web2db( self._request, xml2db, 'dashboard', relaxngs={root_tag: RELAXNG_CIODASHBOARD['file']}) # Get new IDs dashboard_ids = { k[0] for k in self._request.dbsession.query(DBDashboard.dashboard_id) } - dashboard_ids if dashboard_ids: log_info( self._request, 'dashboard_import', ' '.join(dashboard_ids)) # ------------------------------------------------------------------------- def _delete_dashboards(self, dashboard_ids: list): """Delete dashboards. :param list dashboard_ids: List of dashboard IDs to delete. """ deleted = [] attachments = self._request.registry.settings.get('attachments') for dbdashboard in self._request.dbsession.query(DBDashboard).filter( DBDashboard.dashboard_id.in_(dashboard_ids)): # Remove attachments if attachments and dbdashboard.attachments_key: attachment = join( attachments, dbdashboard.attachments_dir, dbdashboard.attachments_key) if exists(attachment): rmtree(attachment) # Remove from database deleted.append(dbdashboard.dashboard_id) self._request.dbsession.delete(dbdashboard) if deleted: log_info(self._request, 'dashboard_delete', ' '.join(deleted)) # ------------------------------------------------------------------------- def _dashboards2response(self, dashboard_ids: tuple | list) -> Response: """Export dashboards as an XML file embedded in a Pyramid response. :param list dashboard_ids: List of dashboard IDs to export. :rtype: :class:`pyramid.response.Response` or ``''`` """ dbitems = tuple( self._request.dbsession.query(DBDashboard).filter( DBDashboard.dashboard_id.in_(dashboard_ids)).order_by( 'dashboard_id')) if not dbitems: return '' filename = '{0}.{1}.xml'.format( len(dbitems) == 1 and dbitems[0].dashboard_id or make_id(self._request.registry['settings']['title'], 'token'), DBDashboard.suffix) log_info( self._request, 'dashboard_export', ' '.join([k.dashboard_id for k in dbitems])) return db2web(self._request, dbitems, filename, RELAXNG_CIODASHBOARD) # ------------------------------------------------------------------------- def _toggle_favorite(self, dashboard_id: str): """Toggle the favorite state of the dashboard. :param str dashboard_id: ID of the dashboard. """ dbsession = self._request.dbsession user_id = self._request.session['user']['user_id'] dbdashboard_user = dbsession.query(DBDashboardUser).filter_by( dashboard_id=dashboard_id, user_id=user_id).first() if dbdashboard_user is None: dbsession.add( DBDashboardUser( dashboard_id=dashboard_id, user_id=user_id, favorite=True)) else: dbdashboard_user.favorite = not dbdashboard_user.favorite # ------------------------------------------------------------------------- def _index_action(self, i_creator: bool) -> tuple[str, list]: """Execute actions for index view. :param str i_creator: ``True`` if the user can create a dashboard. :rtype: tuple :return: A tuple such as ``(action, items)``. """ action, items = get_action(self._request) if action == 'imp!' and i_creator: self._import_dashboards() elif action[:4] == 'del!' and i_creator: self._delete_dashboards(items) elif action[:4] == 'fav!': self._toggle_favorite(action[4:]) return action, items # ------------------------------------------------------------------------- def _my_dashboard_ids(self) -> set: """Return a list of authorized dashboard IDs. :rtype: set """ user_id = self._request.session['user']['user_id'] dashboard_ids = { k[0] for k in self._request.dbsession.query( DBDashboard.dashboard_id).filter_by(access='free') } dashboard_ids |= { k[0] for k in self._request.dbsession.query( DBDashboardUser.dashboard_id).filter_by(user_id=user_id) } dashboard_ids |= { k[0] for k in self._request.dbsession.query( DBDashboardGroup.dashboard_id).filter( DBDashboardGroup.group_id.in_( self._request.session['user']['groups'])) } return dashboard_ids # ------------------------------------------------------------------------- def _gauges(self, paging: Paging, refresh: bool = False) -> dict: """Return a dictionary whose keys are the dashboard identifiers and values a list such as ``((gauge1_label, gauge1_value),...)``. :type paging: chrysalio.lib.paging.Paging :param paging: Current paginf of dashboards :param bool refresh: (default=False) If ``True`` force refreshment. :rtype: dict """ gauges = {} ciodashboard = self._request.registry['modules']['ciodashboard'] for dbdashboard in paging: dashboard = ciodashboard.dashboard( self._request, dbdashboard.dashboard_id) gauges[dashboard.uid] = dashboard.gauges( self._request, refresh=refresh) return gauges # ------------------------------------------------------------------------- def _save( self, dbdashboard: DBDashboard | None, groups: dict, values: dict) -> DBDashboard | None: """Save a dashboard. :type dbdashboard: .models.dbdashboard.DBDashboard :param dbdashboard: Dashboard to save. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :param dict values: Form values. :rtype: :class:`~.models.dbdashboard.DBDashboard` instance or ``None`` """ creation = dbdashboard is None dbdashboard = dbdashboard or DBDashboard() # Update dashboard record = { k: values[k] for k in values if k[:4] not in ('usr:', 'rgh:', 'fav:', 'grp:', 'grgh', 'gge:') } if not creation: record['dashboard_id'] = dbdashboard.dashboard_id error = dbdashboard.record_format(record) if error: self._request.session.flash(error, 'alert') return None record.update( { k: None for k in values if not values[k] and hasattr(DBDashboard, k) }) for field in record: if getattr(dbdashboard, field) != record[field]: setattr(dbdashboard, field, record[field]) # Save if creation: try: self._request.dbsession.add(dbdashboard) self._request.dbsession.flush() except (IntegrityError, FlushError): self._request.session.flash( _('This dashboard already exists.'), 'alert') return None # Update gauges self._gauges_update(dbdashboard, values) # Update users self._users_update(dbdashboard) # Update groups self._groups_update(dbdashboard, groups, values) # Clear cache namespace = cache_namespace(CIODASHBOARD_NS, dbdashboard.dashboard_id) self._request.registry['cache_global'].initialize( namespace, CACHE_REGION_GLOBAL) self._request.registry['cache_user'].clear( self._request, namespace=namespace) return dbdashboard # ------------------------------------------------------------------------- def _gauges_update(self, dbdashboard: DBDashboard, values: dict): """Update the list of gauges. :type dbdashboard: .models.dbwarhouse.DBDashboard :param dbdashboard: SQLAlchemy object for the current dashboard. :param dict values: Form values. """ gauges: dict = {} for item in values: if item[:4] != 'gge:': continue if item[4:5] not in gauges: gauges[item[4:5]] = {} gauges[item[4:5]][item[6:]] = values[item] self._request.dbsession.query(DBDashboardGauge).filter_by( dashboard_id=dbdashboard.dashboard_id).delete() for gauge_id, item in enumerate(sorted(gauges)): gauge = gauges[item] labels = { k[6:]: gauge[k] for k in gauge if k[:6] == 'label_' and gauge[k] } if gauge['metafield'] and gauge['count'] and labels: dbdashboard.gauges.append( DBDashboardGauge( gauge_id=gauge_id, i18n_label=dumps(labels), metafield=gauge['metafield'], count=gauge['count'])) # ------------------------------------------------------------------------- def _users_update(self, dbdashboard: DBDashboard): """Update the list of dashboard users. :type dbdashboard: .models.dbwarhouse.DBDashboard :param dbdashboard: SQLAlchemy object for the current dashboard. """ values: dict = {} for value in self._request.POST: if value[:4] in ('set:', 'usr:', 'fav:'): user_id = int(value[4:]) value = value[:3] else: continue if user_id not in values: values[user_id] = [] if value != 'set': values[user_id].append(value) for user_id, value in values.items(): dbdashboard_user = self._request.dbsession.query( DBDashboardUser).filter_by( dashboard_id=dbdashboard.dashboard_id, user_id=user_id).first() if 'usr' not in value and 'fav' not in value: if dbdashboard_user is not None: self._request.dbsession.delete(dbdashboard_user) continue if dbdashboard_user is None: dbdashboard_user = DBDashboardUser( dashboard_id=dbdashboard.dashboard_id, user_id=user_id) self._request.dbsession.add(dbdashboard_user) dbdashboard_user.favorite = 'fav' in value # ------------------------------------------------------------------------- def _groups_update( self, dbdashboard: DBDashboard, groups: dict, values: dict): """Update the list of groups. :type dbdashboard: .models.dbwarhouse.DBDashboard :param dbdashboard: SQLAlchemy object for the current dashboard. :param dict groups: A dictionary such as ``{group_id: (label, description),...}``. :param dict values: Form values. """ dashboard_groups = {k.group_id: k for k in dbdashboard.groups} for group_id in sorted(groups): value = values[f'grp:{group_id}'] if not value: if group_id in dashboard_groups: self._request.dbsession.delete( self._request.dbsession.query( DBDashboardGroup).filter_by( dashboard_id=dbdashboard.dashboard_id, group_id=group_id).first()) continue dbdashboard_group = dashboard_groups.get(group_id) if dbdashboard_group is None: dbdashboard_group = DBDashboardGroup( dashboard_id=dbdashboard.dashboard_id, group_id=group_id) self._request.dbsession.add(dbdashboard_group) # ------------------------------------------------------------------------- def _user_paging( self, action: str, dbdashboard: DBDashboard | None = None ) -> tuple[Paging, dict, Filter]: """Return a paging object for users. :param str action: Current action. :type dbdashboard: .models.dbwarhouse.DBDashboard :param dbdashboard: (optional) If not ``None``, users are only users of the dashboard. :rtype: tuple :return: A tuple such as ``(user_paging, defaults, user_filter)``. """ # Filter paging_id = 'dashboard_users' ufilter = Filter( self._request, paging_id, ( ('login', _('Login'), False, None), ('last_name', _('Last name'), False, None), ('email', _('Email'), False, None), ( 'status', _('Status'), False, [('', ' ')] + list(DBUser.status_labels.items()))), (('status', '=', 'active'), ), remove=action[:4] == 'crm!' and action[4:] or None) # Database query defaults = Paging.params( self._request, paging_id, '+last_name', default_display='list') dbquery = ufilter.sql( self._request.dbsession.query( DBUser.user_id, DBUser.login, DBUser.first_name, DBUser.last_name, DBUser.honorific, DBUser.email, DBUser.email_hidden, DBUser.status, DBUser.last_login, DBUser.attachments_key, DBUser.picture), 'users') if dbdashboard is not None: dbquery = dbquery.filter( DBUser.user_id.in_([k.user_id for k in dbdashboard.users])) oby = getattr(DBUser, defaults['sort'][1:]) dbquery = dbquery.order_by( desc(oby) if defaults['sort'][0] == '-' else oby) return Paging(self._request, paging_id, dbquery, defaults), \ dict(defaults), ufilter