fsl.utils.fslsub
This module submits jobs to a computing cluster using FSL’s fsl_sub command line tool. It is assumed that the computing cluster is managed by SGE.
Example usage, building a short pipeline:
    from fsl.utils.fslsub import submit

    # submits bet to veryshort queue unless <mask_filename> already exists
    bet_job = submit('bet <input_filename> -m',
                     queue='veryshort.q',
                     output='<mask_filename>')

    # submits another job
    other_job = submit('some other pre-processing step', queue='short.q')

    # submits cuda job, which should only start after both preparatory jobs
    # have finished. This works whether bet_job and other_job are single
    # job ids (i.e., strings) or sequences of multiple job ids
    cuda_job = submit('expensive job',
                      wait_for=(bet_job, other_job),
                      queue='cuda.q')
Module contents:

    submit – Submits a given command to the cluster
    info – Gets information on a given job id
    output – Returns the output of the given job
    func_to_cmd – Defines the command needed to run the function from the command line
    hold – Waits until all jobs have finished
class fsl.utils.fslsub.SubmitParams(minutes: Optional[float] = None, queue: Optional[str] = None, architecture: Optional[str] = None, priority: Optional[int] = None, email: Optional[str] = None, wait_for: Union[str, None, Collection[str]] = None, job_name: Optional[str] = None, ram: Optional[int] = None, logdir: Optional[str] = None, mail_options: Optional[str] = None, flags: bool = False, multi_threaded: Optional[Tuple[str, str]] = None, verbose: bool = False, env: Optional[dict] = None)

    Bases: object
Represents the fsl_sub parameters
Any command line script can be submitted with these parameters by calling the SubmitParams object:
    submit = SubmitParams(minutes=1, logdir='log', wait_for=['108023', '108019'])
    submit('echo finished')
This will run “echo finished” with a maximum runtime of 1 minute after the jobs with IDs 108023 and 108019 are finished. It is the equivalent of
    fsl_sub -T 1 -l log -j 108023,108019 "echo finished"
For Python scripts that submit themselves to the cluster, it can be useful to give the user some control over at least some of the submission parameters. This can be done using:
    import argparse
    parser = argparse.ArgumentParser("my script doing awesome stuff")
    parser.add_argument("input_file")
    parser.add_argument("output_file")
    SubmitParams.add_to_parser(parser, include=('wait_for', 'logdir'))
    args = parser.parse_args()

    submitter = SubmitParams.from_args(args).update(minutes=10)
    from fsl import wrappers
    wrappers.bet(input_file, output_file, fslsub=submitter)
This submits a BET job using the -j and -l flags set by the user and a maximum time of 10 minutes.
Attributes:

    minutes: Optional[float] = None
    queue: Optional[str] = None
    architecture: Optional[str] = None
    priority: Optional[int] = None
    email: Optional[str] = None
    wait_for: Union[str, None, Collection[str]] = None
    job_name: Optional[str] = None
    ram: Optional[int] = None
    logdir: Optional[str] = None
    mail_options: Optional[str] = None
    flags: bool = False
    multi_threaded: Optional[Tuple[str, str]] = None
    verbose: bool = False
    env: Optional[dict] = None

    cmd_line_flags = {'-M': 'email', '-N': 'job_name', '-R': 'ram', '-T': 'minutes', '-a': 'architecture', '-l': 'logdir', '-m': 'mail_options', '-p': 'priority', '-q': 'queue'}
__post_init__()

    If not set explicitly by the user, don't alter the environment in which the script will be submitted.

as_flags()

    Creates flags for submission using fsl_sub. All parameters changed from their default value (typically None) will be included in the flags.

    Returns: tuple with the flags
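As an illustration, the flag construction described above can be sketched in plain Python. This is a hedged sketch, not the library's implementation: the mapping is copied from the cmd_line_flags attribute listed above, and as_flags_sketch is a hypothetical stand-in for as_flags.

```python
# Sketch of how fsl_sub flags could be built from the cmd_line_flags
# mapping shown above. Hypothetical stand-in, not the actual fsl code.
CMD_LINE_FLAGS = {'-M': 'email', '-N': 'job_name', '-R': 'ram',
                  '-T': 'minutes', '-a': 'architecture', '-l': 'logdir',
                  '-m': 'mail_options', '-p': 'priority', '-q': 'queue'}

def as_flags_sketch(params):
    """Include every parameter that differs from its default (None)."""
    out = []
    for flag, name in sorted(CMD_LINE_FLAGS.items()):
        value = params.get(name)
        if value is not None:
            out.extend([flag, str(value)])
    return tuple(out)

print(as_flags_sketch({'minutes': 1, 'logdir': 'log'}))
# → ('-T', '1', '-l', 'log')
```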
__call__(*command, **kwargs)

    Submits the command to the cluster.

    Parameters:
        command – string or tuple of strings with the command to submit
        kwargs – keyword arguments; can override any parameters set in self

    Returns: job ID

classmethod add_to_parser(parser: argparse.ArgumentParser, as_group='fsl_sub commands', include=('wait_for', 'logdir', 'email', 'mail_options'))

    Adds submission parameters to the parser.

    Parameters:
        parser – parser that should understand submission commands
        as_group – name of the new argument group to add
        include – sequence of argument flags/names that should be added to the parser (set to None to include everything)

    Returns: the group the arguments were added to
fsl.utils.fslsub.submit(*command, **kwargs)

    Submits a given command to the cluster.

    You can pass the command and arguments as a single string, or as a regular or unpacked sequence.

    Parameters:
        command – string or regular/unpacked sequence of strings with the job command
        minutes – estimated job length in minutes, used to auto-select the queue
        queue – explicitly sets the queue name
        architecture – e.g., darwin or lx24-amd64
        priority – job priority, from 0 (default) down to -1024
        email – who to email after job completion
        wait_for – place a hold on this task until the job ids in this string or tuple are complete
        job_name – specify the job name as it will appear on the queue
        ram – maximum total RAM to use for the job (integer in MB)
        logdir – where to output log files
        mail_options – change the SGE mail options; see qsub for details
        output – if the <output> image or file already exists, do nothing and exit
        flags – if True, use flags embedded in scripts to set SGE queuing options
        multi_threaded – submit a multi-threaded task; set to a tuple of two elements:
            <pename>: a parallel environment (PE) configured for the requested queues
            <threads>: number of threads to run
        verbose – if True, use verbose mode
        env – dict containing environment variables

    Returns: string of the submitted job id
fsl.utils.fslsub.info(job_ids) → Dict[str, Optional[Dict[str, str]]]

    Gets information on a given job id.

    Uses qstat -j <job_ids>.

    Parameters:
        job_ids – string with a job id, or a (nested) sequence of job ids

    Returns: dictionary of job id -> another dictionary with job information (or None if the job does not exist)
fsl.utils.fslsub._parse_qstat(job_ids_string, qstat_stdout)

    Parses the qstat output into a dictionary of dictionaries.

    Parameters:
        job_ids_string – input job ids
        qstat_stdout – qstat output

    Returns: dictionary of job id -> another dictionary with job information (or None if the job does not exist)
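To make the parsing step concrete, here is a hedged sketch of turning qstat -j style "key: value" output into a dictionary. The sample text and field names are illustrative assumptions; real qstat output contains many more fields and separator lines between jobs, and the actual _parse_qstat handles multiple job ids.

```python
# Hypothetical sketch of parsing `qstat -j` style "key: value" lines into
# a dict; not the actual _parse_qstat implementation.
def parse_qstat_sketch(qstat_stdout):
    job_info = {}
    for line in qstat_stdout.splitlines():
        # skip '=====' separator lines; split on the first colon only
        if ':' in line and not line.startswith('='):
            key, _, value = line.partition(':')
            job_info[key.strip()] = value.strip()
    return job_info

sample = (
    "job_number:        108023\n"
    "job_name:          bet_job\n"
    "owner:             someuser\n"
)
print(parse_qstat_sketch(sample))
# → {'job_number': '108023', 'job_name': 'bet_job', 'owner': 'someuser'}
```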
fsl.utils.fslsub.output(job_id, logdir='.', command=None, name=None)

    Returns the output of the given job.

    Parameters:
        job_id – string containing the job ID
        logdir – directory containing the log; defaults to the current directory
        command – command that was run (not currently used)
        name – job name, if it was specified (not currently used)

    Returns: a tuple containing the standard output and standard error
fsl.utils.fslsub._flatten_job_ids(job_ids)

    Returns a potentially nested sequence of job ids as a single comma-separated string.

    Parameters:
        job_ids – possibly nested sequence of job ids; the job ids themselves should be strings

    Returns: comma-separated string of job ids
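The flattening behaviour described here can be sketched recursively. flatten_job_ids_sketch below is a hypothetical re-implementation of the documented behaviour, not the library code itself.

```python
# Hypothetical sketch of the documented behaviour: strings pass through,
# nested sequences are joined with commas.
def flatten_job_ids_sketch(job_ids):
    if isinstance(job_ids, str):
        return job_ids
    return ','.join(flatten_job_ids_sketch(j) for j in job_ids)

print(flatten_job_ids_sketch(('108023', ('108019', '108020'))))
# → 108023,108019,108020
```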
fsl.utils.fslsub.hold(job_ids, hold_filename=None)

    Waits until all jobs have finished.

    Internally this works by submitting a new job, which creates a file named hold_filename and will only run after all jobs in job_ids have finished. This function returns only once hold_filename has been created.

    Parameters:
        job_ids – possibly nested sequence of job ids; the job ids themselves should be strings
        hold_filename – filename to use as a hold file; the containing directory should exist, but the file itself should not. Defaults to ./.<random characters>.hold in the current directory.
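The mechanism can be sketched without a cluster: a sentinel task creates the hold file, and the caller simply polls for it. In this hedged sketch a timer thread stands in for the submitted job that touches the hold file; the real hold() submits that sentinel via fsl_sub with wait_for set to the given job ids.

```python
import os
import tempfile
import threading
import time

# Poll until the hold file appears -- this is what hold() ultimately waits on.
def wait_for_file(filename, poll_interval=0.05):
    while not os.path.exists(filename):
        time.sleep(poll_interval)

hold_file = os.path.join(tempfile.mkdtemp(), '.example.hold')

# Stand-in for the submitted job that touches the hold file once all
# upstream jobs have finished.
threading.Timer(0.2, lambda: open(hold_file, 'w').close()).start()

wait_for_file(hold_file)
print('all jobs finished')
```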
fsl.utils.fslsub.func_to_cmd(func, args=None, kwargs=None, tmp_dir=None, clean='never', verbose=False)

    Defines the command needed to run the function from the command line.

    WARNING: if submitting a function defined in the __main__ script, the script will be run again to retrieve this function. Make sure there is an "if __name__ == '__main__'" guard to prevent the full script from being rerun.

    Parameters:
        func – function to be run
        args – positional arguments
        kwargs – keyword arguments
        tmp_dir – directory in which to store the temporary file
        clean – whether the script should be removed after running; there are three options:
            "never" (default): the script is kept
            "on_success": only remove the script if it finished successfully (i.e., no error was raised)
            "always": always remove the script, even if it raises an error
        verbose – if True, the script will print its own filename before running

    Returns: string which will run the function
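Conceptually, func_to_cmd serialises the function and its arguments and returns a command line that re-executes them in a fresh interpreter. The sketch below illustrates that idea with pickle; it is a hypothetical simplification, not the library's actual implementation (which also handles the clean and verbose options).

```python
import os
import sys
import pickle
import tempfile
import textwrap

# Hypothetical simplification: write a wrapper script that unpickles the
# function and its arguments, and return a command line that runs it.
def func_to_cmd_sketch(func, args=(), kwargs=None, tmp_dir=None):
    payload = pickle.dumps((func, args, kwargs or {}))
    fd, fname = tempfile.mkstemp(suffix='.py', dir=tmp_dir)
    with os.fdopen(fd, 'w') as f:
        f.write(textwrap.dedent(f"""\
            import pickle
            func, args, kwargs = pickle.loads({payload!r})
            func(*args, **kwargs)
        """))
    return f'{sys.executable} {fname}'

cmd = func_to_cmd_sketch(print, args=('hello from the cluster',))
print(cmd)
```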