Source code for bluepymm.run_combos.calculate_scores

"""Python Model Management"""

"""
Copyright (c) 2018, EPFL/Blue Brain Project

 This file is part of BluePyMM <https://github.com/BlueBrain/BluePyMM>

 This library is free software; you can redistribute it and/or modify it under
 the terms of the GNU Lesser General Public License version 3.0 as published
 by the Free Software Foundation.

 This library is distributed in the hope that it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 details.

 You should have received a copy of the GNU Lesser General Public License
 along with this library; if not, write to the Free Software Foundation, Inc.,
 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""


# pylint: disable=C0325, W0223
# pylama: ignore=E402

import sys
import os
import json
import ipyparallel
import sqlite3
import traceback
import pandas

from bluepymm import tools


[docs] def run_emodel_morph_isolated(input_args): """Run e-model morphology combination in isolated environment. Args: input_args: tuple - uid: unique identifier of the e-model morphology combination - emodel: e-model name - emodel_dir: directory containing e-model files - emodel_params: dict that maps e-model parameters to their values - morph_path: path to morphology - apical_point_isec: integer value of the apical point isection - extra_values_error: boolean to raise an exception upon a missing key Returns: Dict with keys 'exception', 'extra_values', 'scores', 'uid'. """ ( uid, emodel, emodel_dir, emodel_params, morph_path, apical_point_isec, extra_values_error ) = input_args return_dict = {'uid': uid, 'exception': None} pool = tools.NestedPool(1, maxtasksperchild=1) try: return_dict['scores'], return_dict['extra_values'] = pool.apply( run_emodel_morph, (emodel, emodel_dir, emodel_params, morph_path, apical_point_isec, extra_values_error )) except Exception: return_dict['scores'] = None return_dict['extra_values'] = None return_dict['exception'] = "".join(traceback.format_exception( *sys.exc_info())) pool.terminate() pool.join() del pool return return_dict
[docs] def read_apical_point(morph_dir, morph_name): """Read apical point from apical point json file""" json_filename = os.path.join(morph_dir, 'apical_points_isec.json') with open(json_filename) as json_file: apic_points = json.load(json_file) # Get apic_point isec from dict, if not found return None if morph_name in apic_points: return int(apic_points[morph_name]) else: return None
[docs] def run_emodel_morph( emodel, emodel_dir, emodel_params, morph_path, apical_point_isec, extra_values_error=True): """Run e-model morphology combination. Args: emodel: e-model name emodel_dir: directory containing e-model files emodel_params: dict that maps e-model parameters to their values morph_path: path to morphology apical_point_isec: integer value of the apical point isection extra_values_error: boolean to raise an exception upon a missing key Returns: tuple: - dict that maps features to scores - dict with extra values: 'holding_current' and 'threshold_current' """ try: sys.stdout = open('/dev/null', 'w') print('Running e-model %s on morphology %s in %s' % (emodel, morph_path, emodel_dir)) setup = tools.load_module('setup', emodel_dir) print("Changing path to %s" % emodel_dir) with tools.cd(emodel_dir): if hasattr(setup, 'multieval'): prefix = 'mm' altmorph = [[prefix, morph_path, apical_point_isec]] evaluator = setup.evaluator.create(etype='%s' % emodel, altmorph=altmorph) evaluator = evaluator.evaluators[0] # only one evaluator responses = evaluator.run_protocols( evaluator.fitness_protocols.values(), emodel_params) scores = evaluator.fitness_calculator.calculate_scores( responses) extra_values = {} for response_key, extra_values_key in [ ('%s.bpo_holding_current' % prefix, 'holding_current'), ('%s.bpo_threshold_current' % prefix, 'threshold_current')]: if response_key in responses: extra_values[extra_values_key] = responses[ response_key] else: if extra_values_error: raise ValueError( "Key %s not found in responses: %s" % (response_key, str(responses))) else: extra_values[extra_values_key] = None else: evaluator = setup.evaluator.create(etype='%s' % emodel) evaluator.cell_model.morphology.morphology_path = morph_path responses = evaluator.run_protocols( evaluator.fitness_protocols.values(), emodel_params) scores = evaluator.fitness_calculator.calculate_scores( responses) extra_values = {} extra_values['holding_current'] = \ responses.get('bpo_holding_current', None) extra_values['threshold_current'] = \ responses.get('bpo_threshold_current', None) return scores, extra_values except Exception: # Make sure exception and backtrace are thrown back to parent process raise Exception( "".join(traceback.format_exception(*sys.exc_info())))
[docs] def create_arg_list(scores_db_filename, emodel_dirs, final_dict, extra_values_error=False, use_apical_points=True): """Create list of argument tuples to be used as an input for run_emodel_morph. Args: scores_db_filename: path to .sqlite database emodel_dirs: a dict mapping e-models to the directories with e-model input files final_dict: a dict mapping e-models to dicts with e-model parameters extra_values_error: boolean to raise an exception upon a missing key use_apical_points: boolean to use apical points or not Raises: ValueError, if one of the database entries contains has value None for the key 'emodel'. """ arg_list = [] with sqlite3.connect(scores_db_filename) as scores_db: scores_db.row_factory = sqlite3.Row one_row = scores_db.execute('SELECT * FROM scores LIMIT 1').fetchone() apical_points_isec = {} setup = tools.load_module('setup', emodel_dirs[one_row['emodel']]) if hasattr(setup, 'multieval') and use_apical_points: apical_points_isec = tools.load_json( os.path.join(one_row['morph_dir'], "apical_points_isec.json") ) scores_cursor = scores_db.execute('SELECT * FROM scores') for row in scores_cursor.fetchall(): index = row['index'] morph_name = row['morph_name'] morph_ext = row['morph_ext'] if morph_ext is None: morph_ext = '.asc' apical_point_isec = None if morph_name in apical_points_isec: apical_point_isec = int(apical_points_isec[morph_name]) morph_filename = morph_name + morph_ext morph_path = os.path.abspath(os.path.join(row['morph_dir'], morph_filename)) if row['to_run'] == 1: emodel = row['emodel'] original_emodel = row['original_emodel'] if emodel is None: raise ValueError( "scores db row %s for morph %s, etype %s, mtype %s, " "layer %s doesn't have an e-model assigned to it" % (index, morph_name, row['etype'], row['mtype'], row['layer'])) args = (index, emodel, os.path.abspath(emodel_dirs[emodel]), final_dict[original_emodel]['params'], morph_path, apical_point_isec, extra_values_error) arg_list.append(args) print('Found %d rows in score database to run' % len(arg_list)) return arg_list
[docs] def save_scores(scores_db_filename, uid, scores, extra_values, exception, float_representation='.17g'): """Update a specific entry in a given database with scores and related parameters. Args: scores_db_filename: path to .sqlite database uid: unique identifier of database entry scores: scores dict to be added to entry as a json string extra_values: dict to be added to entry as a json string exception: description of exception that may have happened during score calculation float_representation: use for json encoding. Default is '.17g'. Returns: ValueError if entry has already been updated. """ json.encoder.FLOAT_REPR = lambda x: format(x, float_representation) with sqlite3.connect(scores_db_filename) as scores_db: # make sure we don't update a row that was already executed scores_cursor = scores_db.execute( 'SELECT `index` FROM scores WHERE `index`=? AND to_run=?', (uid, False)) if scores_cursor.fetchone() is None: # update row with calculated scores and related values scores_db.execute('UPDATE scores SET scores=?, extra_values=?, ' 'exception=?, to_run=? WHERE `index`=?', (json.dumps(scores), json.dumps(extra_values), exception, False, uid)) else: raise ValueError('save_scores: trying to update scores in a row ' 'that was already executed: %d' % uid)
[docs] def expand_scores_to_score_values_table(scores_sqlite_filename): """Read scores from sqlite table, expand to dataframe, and store in new table 'score_values'. Each column of the new table corresponds to a single score. Args: scores_sqlite_filename: path to sqlite database with keys 'scores' and 'to_run' Raises: Exception, if the scores table contains at least one entry where the value of 'to_run' is True. """ with sqlite3.connect(scores_sqlite_filename) as conn: scores = pandas.read_sql('SELECT * FROM scores', conn) tools.check_all_combos_have_run(scores, 'scores') score_values = scores['scores'].apply( lambda json_str: pandas.Series (json.loads(json_str)) if json_str else pandas.Series()) with sqlite3.connect(scores_sqlite_filename) as conn: score_values.to_sql('score_values', conn, if_exists='replace', index=False)
[docs] def calculate_scores(final_dict, emodel_dirs, scores_db_filename, use_ipyp=False, ipyp_profile=None, timeout=10, use_apical_points=True, n_processes=None): """Calculate scores of e-model morphology combinations and update the database accordingly. Args: scores_db_filename: path to .sqlite database with e-model morphology combinations final_dict: a dict mapping e-models to dicts with e-model parameters emodel_dirs: a dict mapping e-models to the directories with e-model input files use_ipyp: bool indicating whether ipyparallel is used. Default is False. ipyp_profile: path to ipyparallel profile. Default is None. use_apical_points: boolean to use apical points or not n_processes: the integer number of processes. If `None`, all processes are going to be used. """ print('Creating argument list for parallelisation') arg_list = create_arg_list(scores_db_filename, emodel_dirs, final_dict, use_apical_points=use_apical_points) print('Parallelising score evaluation of %d me-combos' % len(arg_list)) if use_ipyp: # use ipyparallel client = ipyparallel.Client(profile=ipyp_profile, timeout=timeout) lview = client.load_balanced_view(targets=n_processes) results = lview.imap(run_emodel_morph_isolated, arg_list, ordered=False) else: # use multiprocessing pool = tools.NestedPool(processes=n_processes) results = pool.imap_unordered(run_emodel_morph_isolated, arg_list) # every time a result comes in, save the score in the database for uids_received, result in enumerate(results, start=1): uid = result['uid'] scores = result['scores'] extra_values = result['extra_values'] exception = result['exception'] save_scores(scores_db_filename, uid, scores, extra_values, exception) print('Saved scores for uid %s (%d out of %d) %s' % (uid, uids_received, len(arg_list), 'with exception' if exception else '')) sys.stdout.flush() if not use_ipyp: pool.terminate() pool.join() print('Converting score json strings to scores values ...') expand_scores_to_score_values_table(scores_db_filename)