
pyhrf.parallel module

exception pyhrf.parallel.RemoteException

Bases: exceptions.Exception

pyhrf.parallel.dump_func(func, fn)
pyhrf.parallel.merge_default_kwargs(func, kwargs)
pyhrf.parallel.prepare_treatment_jobs(treatment, tmp_local_dir, local_result_path, local_user, local_host, remote_host, remote_user, remote_path, label_for_cluster)

Prepare soma-workflow jobs to perform one treatment (i.e., one subject).

Parameters:
  • treatment (FMRITreatment) – the treatment defining the analysis
  • tmp_local_dir (str) – a path where to store the temporary config file before sending it to the remote host
  • local_result_path (str) – path where to store the final result
  • local_user (str) – the user on the local host who accepts SSH connections from the remote cluster
  • local_host (str) – local host (used to send back the result)
  • remote_host (str) – remote machine where the treatment will be run
  • remote_user (str) – user login on the remote machine.
  • remote_path (str) – path on the remote machine where to store ROI data and analysis results
  • label_for_cluster (str) – label prefix to name job in soma-workflow
Returns:

  • a tuple (job_split, jobs, dependencies, mainGroup)
  • job_split (Job) – job handling splitting of input data into ROI data
  • jobs (list of Job) – all jobs except the splitting job: ROI analyses, result merge, scp of the result back to the local host, and data cleaning
  • dependencies (list of job pairs) – define the pipeline structure
  • mainGroup (Group) – top-level object gathering all jobs for this treatment.

pyhrf.parallel.remote_map(func, largs=None, lkwargs=None, mode='serial')

Execute a function in parallel on a list of arguments.

Parameters:
  • func (function) – function to apply to each item. This function must be importable on the remote side
  • largs (list of tuple) – each item in the list is a tuple containing all positional argument values of the function
  • lkwargs (list of dict) – each item in the list is a dict mapping all named arguments of the function to their values
  • mode (str) –

    indicates how execution is distributed. Choices are:

    • "serial": single-threaded loop on the local machine.
    • "local": use joblib to run tasks in parallel.
      The number of simultaneous jobs is defined by ['parallel-local']['nb_procs'] in ~/.pyhrf/config.cfg.
    • "remote_cluster": use soma-workflow to run tasks in parallel.
      The connection setup has to be defined in the configuration section ['parallel-cluster'] of ~/.pyhrf/config.cfg.
    • "local_with_dumps": for testing purposes only, run each task serially as
      a subprocess.
Returns:

a list of results

Raises:

RemoteException if any remote task has failed

Example:

    >>> from pyhrf.parallel import remote_map
    >>> def foo(a, b=2): return a + b
    >>> remote_map(foo, [(2,),(3,)], [{'b':5}, {'b':7}])
    [7, 10]
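
The pairing of largs and lkwargs can be pictured with a plain-Python stand-in for the serial mode. This is only a sketch, not pyhrf's implementation: serial_map_sketch is a hypothetical name, and padding a missing list with empty arguments is an assumption about how the None defaults behave.

```python
def serial_map_sketch(func, largs=None, lkwargs=None):
    """Hypothetical stand-in for remote_map(..., mode='serial')."""
    if largs is None and lkwargs is None:
        return []
    # Assumption: a missing list is padded with empty arguments.
    if largs is None:
        largs = [()] * len(lkwargs)
    if lkwargs is None:
        lkwargs = [{}] * len(largs)
    if len(largs) != len(lkwargs):
        raise ValueError("largs and lkwargs must have the same length")
    # Call number i receives the i-th tuple and the i-th dict.
    return [func(*args, **kwargs) for args, kwargs in zip(largs, lkwargs)]


def foo(a, b=2):
    return a + b


results = serial_map_sketch(foo, [(2,), (3,)], [{"b": 5}, {"b": 7}])
print(results)  # [7, 10]
```

The i-th result therefore comes from the i-th entry of both lists, so the two lists need the same length when both are given.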

pyhrf.parallel.remote_map_marshal(func, largs=None, lkwargs=None, mode='local')
pyhrf.parallel.run_soma_workflow(treatments, exec_cmd, tmp_local_dirs, server_id, remote_host, remote_user, remote_pathes, local_result_pathes, label_for_cluster, wait_ending=False)

Dispatch treatments using soma-workflow.

Parameters:
  • treatments – a dict mapping a treatment name to a treatment object
  • exec_cmd – the command to run on each ROI data
  • tmp_local_dirs – a dict mapping a treatment name to a local tmp dir (used to store a temporary configuration file)
  • server_id – the server ID as expected by WorkflowController
  • remote_host – the remote machine where treatments are run in parallel
  • remote_user – the login used on remote_host
  • remote_pathes – a dict mapping a treatment name to an existing remote dir which will be used to store ROI data and result files
  • local_result_pathes – a dict mapping a treatment name to a local path where final results will be stored (the remote host sends them there by scp)
  • label_for_cluster – the base name used to label workflows and sub jobs
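
Since several arguments are dicts keyed by treatment name, it may help to check that they all cover the same names before dispatching. The helper below is a hypothetical sketch and not part of pyhrf; the paths and the placeholder treatment values are invented for illustration.

```python
def check_treatment_keys(treatments, **per_treatment_dicts):
    """Return {argument name: sorted treatment names missing from it}."""
    expected = set(treatments)
    problems = {}
    for arg_name, mapping in per_treatment_dicts.items():
        missing = expected - set(mapping)
        if missing:
            problems[arg_name] = sorted(missing)
    return problems


# Placeholder values; real ones would be treatment objects and existing paths.
treatments = {"subject_01": object(), "subject_02": object()}
tmp_local_dirs = {"subject_01": "/tmp/s01", "subject_02": "/tmp/s02"}
remote_pathes = {"subject_01": "/remote/s01", "subject_02": "/remote/s02"}
local_result_pathes = {"subject_01": "/results/s01"}  # subject_02 left out on purpose

problems = check_treatment_keys(
    treatments,
    tmp_local_dirs=tmp_local_dirs,
    remote_pathes=remote_pathes,
    local_result_pathes=local_result_pathes,
)
print(problems)  # {'local_result_pathes': ['subject_02']}
```

An empty result means every per-treatment dict has an entry for each treatment name.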
pyhrf.parallel.save_treatment(t, f)