luigi_swf package
Submodules
luigi_swf.decider module
-
class luigi_swf.decider.DeciderServer(identity=None, stdout=None, stderr=None, logfilename=None, loglevel=20, logformat='%(asctime)s - %(levelname)s - %(name)s - %(message)s', **kwargs)
Bases: object
Decider daemon
Daemonizes LuigiSwfDecider. The SIGWINCH signal is used to shut this down lazily (after processing the current decision task or 60-second poll) because SIGTERM kills child processes.
Parameters:
- stdout (stream, such as the return value of open()) – stream to which stdout will be written
- stderr (stream, such as the return value of open()) – stream to which stderr will be written
- logfilename (str) – file path to which the application log will be written
- loglevel (log level constant from the logging module: logging.DEBUG, logging.INFO, logging.ERROR, etc.) – log level (the default, 20, is logging.INFO)
- logformat (str) – format string of log output lines, as in the logging module
Returns: None
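The lazy-shutdown behaviour described above can be illustrated with a small stand-alone sketch. It is not luigi_swf’s code: LazyStopLoop and poll_once are hypothetical names, and poll_once stands in for one call to LuigiSwfDecider.run(). The SIGWINCH handler only sets a flag, so a poll or task already in progress is never interrupted; the loop checks the flag between iterations. The sketch assumes a Unix system, since SIGWINCH does not exist on Windows, and must run in the main thread.

```python
import os
import signal


class LazyStopLoop:
    """Hypothetical sketch of a lazy-shutdown polling loop (not luigi_swf's code)."""

    def __init__(self, poll_once):
        self.poll_once = poll_once  # stand-in for one LuigiSwfDecider.run() call
        self.stop_requested = False
        # The handler only records the request, so the current poll/task finishes.
        signal.signal(signal.SIGWINCH, self._request_stop)

    def _request_stop(self, signum, frame):
        self.stop_requested = True

    def serve(self):
        """Call poll_once() until a stop is requested; return how many calls ran."""
        count = 0
        while not self.stop_requested:
            self.poll_once()
            count += 1
        return count


# Usage: the third "poll" receives SIGWINCH, as if stop() had been called meanwhile.
polls = []


def fake_poll():
    polls.append(1)
    if len(polls) == 3:
        os.kill(os.getpid(), signal.SIGWINCH)


loop = LazyStopLoop(fake_poll)
print("polls before exit:", loop.serve())
```

The exact number of polls depends on when the interpreter runs the Python-level handler, but the loop always finishes the poll in progress before exiting.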
-
pid_file()
Path to the decider daemon’s PID file, whether or not the daemon is running.
Append ‘-waiting’ to this value to get the PID file path for a waiting process.
Returns: path to PID file
Return type: str
-
start()
Start the decider daemon and exit.
If a decider daemon is already running, the new daemon process will wait for it to unlock its PID file before taking over. If another process is already waiting to take over, the new one will send a SIGHUP to the old waiting process. This will not send any signals to the process that has locked the daemon PID file; it is your responsibility to call stop() before calling this. This method returns immediately and does not wait for the new daemon process to lock the PID file.
See pid_file() for the PID file and waiting PID file paths.
Returns: None
-
stop()
Shut down the decider daemon lazily, locating it through its PID file.
The decider daemon will exit within 60 seconds if it is polling, or once the currently running decision task is finished. If it receives a decision task while waiting for the poll to finish, it will process that task and then exit. If a daemon process is waiting to take over once the currently running one shuts down, this will send SIGHUP to the waiting process. This method returns immediately and does not wait for the processes to stop.
Returns: None
-
class luigi_swf.decider.LuigiSwfDecider(**kwargs)
Bases: boto.swf.layer2.Decider
Implementation of boto’s SWF Decider
See DeciderServer for daemonizing this.
-
run(identity=None)
Poll for and run a decision task.
This should be run in a loop. Each call polls SWF for up to 60 seconds; if no decision task arrives in that time, it returns without running one. Users should rarely need to interact with this class directly; DeciderServer can be used to run the loop instead.
Returns: None
-
luigi_swf.executor module
-
class luigi_swf.executor.LuigiSwfExecutor(domain, version, workflow_task, aws_access_key_id=None, aws_secret_access_key=None)
Bases: object
Workflow execution launcher
AWS credentials can be passed to __init__(); otherwise they are read from [swfscheduler] -> aws_access_key_id and [swfscheduler] -> aws_secret_access_key in Luigi’s client.cfg. If neither is available, boto will try to read the credentials from environment variables or the EC2 instance metadata (if using an IAM role).
Parameters:
- domain (str) – SWF domain
- version (str) – SWF version (you may put “unspecified” if you don’t need this)
- workflow_task (luigi.task.WrapperTask) – wrapper task that defines the workflow through its requires()
- aws_access_key_id (str) – optional if using environment, config, or IAM
- aws_secret_access_key (str) – optional if using environment, config, or IAM
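The credential lookup order described above can be sketched with the standard configparser module, since Luigi’s client.cfg is an INI-style file. This is a hypothetical helper (resolve_aws_credentials is not part of luigi_swf), and it assumes each credential falls back independently; values left as None are what would let boto fall back to environment variables or instance metadata.

```python
import configparser


def resolve_aws_credentials(config, aws_access_key_id=None, aws_secret_access_key=None):
    """Hypothetical sketch: explicit arguments win, then [swfscheduler] in the config.

    Anything still missing is returned as None, leaving boto to look at
    environment variables or EC2 instance metadata on its own.
    """
    if aws_access_key_id is None:
        aws_access_key_id = config.get("swfscheduler", "aws_access_key_id", fallback=None)
    if aws_secret_access_key is None:
        aws_secret_access_key = config.get("swfscheduler", "aws_secret_access_key", fallback=None)
    return aws_access_key_id, aws_secret_access_key


cfg = configparser.ConfigParser()
cfg.read_string("""
[swfscheduler]
aws_access_key_id = AKIAEXAMPLE
aws_secret_access_key = secretexample
""")
print(resolve_aws_credentials(cfg))  # → ('AKIAEXAMPLE', 'secretexample')
print(resolve_aws_credentials(cfg, aws_access_key_id="explicit"))  # → ('explicit', 'secretexample')
print(resolve_aws_credentials(configparser.ConfigParser()))  # → (None, None)
```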
-
execute()
Initiates a workflow execution on SWF and returns immediately.
Run register() first.
-
register()
Registers the workflow type and task types with SWF.
This must be done each time a new task is added to a workflow. If you are launching a single workflow from a cron job, it is safest to run this before each call to execute(). However, if you are launching many workflows and calling execute() many times, consider calling this method only when necessary, because frequent registration calls can contribute to SWF API throttling.
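One way to act on the advice above is to remember a fingerprint of the workflow’s task types and call register() only when it changes. The sketch below is hypothetical: RegisterIfChanged, ensure_registered and the in-memory store are illustrative names, not part of luigi_swf, and the caller is assumed to supply the task type names and a zero-argument register callable (for example, an executor’s bound register method).

```python
import hashlib
import json


class RegisterIfChanged:
    """Hypothetical sketch: skip registration when the task types are unchanged.

    `store` is a plain dict here; persisting fingerprints across cron runs
    would need a file or database instead.
    """

    def __init__(self, register, store):
        self.register = register
        self.store = store

    def ensure_registered(self, workflow_name, task_type_names):
        # Sorting makes the fingerprint independent of the order names are listed in.
        fingerprint = hashlib.sha256(
            json.dumps(sorted(task_type_names)).encode("utf-8")
        ).hexdigest()
        if self.store.get(workflow_name) == fingerprint:
            return False  # nothing new since the last registration
        self.register()
        self.store[workflow_name] = fingerprint
        return True


calls = []
reg = RegisterIfChanged(lambda: calls.append("register"), {})
print(reg.ensure_registered("nightly", ["TaskA", "TaskB"]))           # → True
print(reg.ensure_registered("nightly", ["TaskB", "TaskA"]))           # → False
print(reg.ensure_registered("nightly", ["TaskA", "TaskB", "TaskC"]))  # → True
print(calls)  # → ['register', 'register']
```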
luigi_swf.tasks module
-
class luigi_swf.tasks.SwfHeartbeatCancel
Bases: object
Mix-in for Luigi Tasks
Mix this class into a Luigi task to make use of SWF heartbeat timeouts and cancellation.
When the task is run directly in Luigi (not through SWF), self.cancel_requested will always be False, and calling heartbeat() or ack_cancel() will have no effect. Regardless, your code should not call ack_cancel() unless self.cancel_requested == True.
-
ack_cancel()
Send cancellation acknowledgement to SWF
-
cancel_acked = False
-
cancel_requested = False
-
heartbeat()
Send heartbeat to SWF and check whether cancellation was requested.
If cancellation was requested, self.cancel_requested will be set to True after invoking this method. This method has no effect when the task is not being run with SWF.
-
register_activity_worker(activity_worker, activity)
Register the activity worker as an observer of heartbeats.
Called by luigi_swf.worker.LuigiSwfWorker to register itself as the activity worker managing this instance of the task. It observes calls to heartbeat() and ack_cancel() on this instance.
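The mix-in is meant to be used from a task’s run() method: send a heartbeat between units of work and stop early once SWF has asked for cancellation. Because luigi and SWF are not available in a stand-alone snippet, the sketch below uses two hypothetical stand-ins: OfflineHeartbeatCancel mimics the documented behaviour outside SWF (no-op heartbeat, cancel_requested always False), and CancelAfterHeartbeats simulates a cancellation request. process_in_chunks is likewise a hypothetical name; in a real task class that mixes in SwfHeartbeatCancel, run() would pass self.

```python
class OfflineHeartbeatCancel:
    """Hypothetical stand-in with the documented behaviour outside SWF."""
    cancel_requested = False
    cancel_acked = False

    def heartbeat(self):
        pass  # no effect when the task is not run through SWF

    def ack_cancel(self):
        pass


class CancelAfterHeartbeats(OfflineHeartbeatCancel):
    """Hypothetical stand-in that simulates SWF requesting cancellation."""

    def __init__(self, cancel_on):
        self.beats = 0
        self.cancel_on = cancel_on

    def heartbeat(self):
        self.beats += 1
        if self.beats >= self.cancel_on:
            self.cancel_requested = True

    def ack_cancel(self):
        self.cancel_acked = True


def process_in_chunks(task, chunks):
    """Body a task's run() might use: heartbeat between chunks, stop on cancel."""
    done = []
    for chunk in chunks:
        task.heartbeat()
        if task.cancel_requested:
            task.ack_cancel()  # acknowledge only when cancellation was requested
            return done
        done.append(chunk * 2)  # placeholder for the real unit of work
    return done


print(process_in_chunks(OfflineHeartbeatCancel(), [1, 2, 3]))  # → [2, 4, 6]
t = CancelAfterHeartbeats(cancel_on=2)
print(process_in_chunks(t, [1, 2, 3]), t.cancel_acked)  # → [2] True
```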
-
luigi_swf.util module
-
class luigi_swf.util.SingleWaitingLockPidFile(pidfilepath, timeout_sec)
Bases: object
Locks a PID file, sending SIGHUP to any process that is already waiting for it.
>>> with SingleWaitingLockPidFile('aoeu.pid', 60.):
...     print('test')
Parameters:
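The locking half of this class can be sketched with fcntl.flock, retrying a non-blocking exclusive lock until a timeout. This is a simplified, hypothetical illustration (PidFileLock is not a luigi_swf name): it does not track or signal other waiting processes, and it is not necessarily how SingleWaitingLockPidFile acquires its lock. Unix only.

```python
import errno
import fcntl
import os
import tempfile
import time


class PidFileLock:
    """Hypothetical sketch: hold an exclusive flock on a PID file, waiting up to timeout_sec."""

    def __init__(self, path, timeout_sec, poll_interval=0.1):
        self.path = path
        self.timeout_sec = timeout_sec
        self.poll_interval = poll_interval
        self.fd = None

    def __enter__(self):
        fd = os.open(self.path, os.O_RDWR | os.O_CREAT, 0o644)
        deadline = time.monotonic() + self.timeout_sec
        while True:
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except OSError as exc:
                busy = exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EACCES)
                if not busy or time.monotonic() >= deadline:
                    os.close(fd)
                    raise  # BlockingIOError when the timeout expires
                time.sleep(self.poll_interval)
        # Record our PID only once we own the lock.
        os.ftruncate(fd, 0)
        os.write(fd, f"{os.getpid()}\n".encode())
        self.fd = fd
        return self

    def __exit__(self, exc_type, exc, tb):
        fcntl.flock(self.fd, fcntl.LOCK_UN)
        os.close(self.fd)
        self.fd = None
        return False


demo_path = os.path.join(tempfile.mkdtemp(), "demo.pid")
with PidFileLock(demo_path, timeout_sec=1.0):
    with open(demo_path) as f:
        print("locked by", f.read().strip())
```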
-
luigi_swf.util.dictsortkey(d)
Sortable string key for a dict (Python 3 cannot order dicts directly)
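A common way to build such a key is to serialize the dict with sorted keys, so that equal dicts always produce the same string. The sketch below uses json.dumps(sort_keys=True); dict_sort_key is a hypothetical name and this is not necessarily how dictsortkey is implemented. It assumes JSON-serializable values, and the resulting order is a stable string order, not a numeric one.

```python
import json


def dict_sort_key(d):
    """Hypothetical sketch: deterministic string key so dicts can be sorted."""
    return json.dumps(d, sort_keys=True)


params = [{"b": 2, "a": 1}, {"a": 0}, {"a": 1, "b": 1}]
# sorted(params) alone raises TypeError: '<' is not supported between dicts.
print(sorted(params, key=dict_sort_key))  # → [{'a': 0}, {'a': 1, 'b': 1}, {'b': 2, 'a': 1}]
```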
-
luigi_swf.util.dt_from_iso(iso)
-
luigi_swf.util.dthandler(obj)
-
luigi_swf.util.fullname(o)
Tuple of module name and class name from object
-
luigi_swf.util.get_class(module_name, class_name)
Class from module name and class name
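fullname() and get_class() work in opposite directions: one turns an object into names, the other turns names back into a class. A minimal sketch of how such a pair is commonly written with importlib follows; fullname_sketch and get_class_sketch are hypothetical names and not necessarily luigi_swf’s implementation.

```python
import importlib
from fractions import Fraction


def fullname_sketch(o):
    """Hypothetical: (module name, class name) of an object's class."""
    cls = o.__class__
    return cls.__module__, cls.__name__


def get_class_sketch(module_name, class_name):
    """Hypothetical: import module_name and return its attribute class_name."""
    return getattr(importlib.import_module(module_name), class_name)


names = fullname_sketch(Fraction(1, 2))
print(names)  # → ('fractions', 'Fraction')
print(get_class_sketch(*names) is Fraction)  # → True
```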
-
luigi_swf.util.get_luigi_params(task)
Dict mapping a task’s Luigi parameter names to their values (plain class attributes are excluded)
>>> import luigi
>>> class TaskA(luigi.Task):
...     p1 = luigi.Parameter(default='foo')
...     p2 = luigi.Parameter(default='bar')
...     v1 = 'aoeu'
>>> get_luigi_params(TaskA()) == {'p1': 'foo', 'p2': 'bar'}
True
-
luigi_swf.util.get_task_configurations(task, wf_run_id=None, include_obj=False)
-
luigi_swf.util.kill_from_pid_file(pid_file, sig)
Signal a process given its PID file.
Sends signal sig to the process ID found in the PID file. Does not raise an error if the file does not exist, if no process ID can be found in the file, or if the process is not found.
Parameters:
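The documented behaviour can be sketched in a few lines. kill_from_pid_file_sketch is a hypothetical name; it returns True or False only to make the sketch observable (the real function’s return value is not documented here), it refuses PIDs of 0 or below because os.kill would then signal a whole process group, and it lets PermissionError propagate since the documentation only promises to ignore a missing process.

```python
import os
import tempfile


def kill_from_pid_file_sketch(pid_file, sig):
    """Hypothetical sketch: signal the PID in pid_file, ignoring the documented failures."""
    try:
        with open(pid_file) as f:
            pid = int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return False  # no file, or no parsable process ID in it
    if pid <= 0:
        return False  # 0 or negative would target a process group, never intended here
    try:
        os.kill(pid, sig)
    except ProcessLookupError:
        return False  # the process is gone
    return True


tmp_dir = tempfile.mkdtemp()
print(kill_from_pid_file_sketch(os.path.join(tmp_dir, "missing.pid"), 0))  # → False
own_pid_file = os.path.join(tmp_dir, "self.pid")
with open(own_pid_file, "w") as f:
    f.write(f"{os.getpid()}\n")
# Signal 0 only checks that the process exists; nothing is delivered.
print(kill_from_pid_file_sketch(own_pid_file, 0))  # → True
```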
luigi_swf.worker module
-
class luigi_swf.worker.LuigiSwfWorker(**kwargs)
Bases: boto.swf.layer2.ActivityWorker
Implementation of boto’s SWF Activity Worker
See WorkerServer for daemonizing this.
-
run(identity=None)
Poll for and run an activity task.
This should be run in a loop. Each call polls SWF for up to 60 seconds; if no activity task arrives in that time, it returns without running one. Users should rarely need to interact with this class directly; WorkerServer can be used to run the loop instead.
Returns: None
-
class luigi_swf.worker.WorkerServer(worker_idx, identity=None, stdout=None, stderr=None, logfilename=None, loglevel=20, logformat='%(asctime)s - %(levelname)s - %(name)s - %(message)s', **kwargs)
Bases: object
Worker daemon
Daemonizes LuigiSwfWorker. The SIGWINCH signal is used to shut this down lazily (after processing the current activity task or 60-second poll) because SIGTERM kills child processes.
Parameters:
- worker_idx (int) – worker index (instance number)
- stdout (stream, such as the return value of open()) – stream to which stdout will be written
- stderr (stream, such as the return value of open()) – stream to which stderr will be written
- logfilename (str) – file path to which the application log will be written
- loglevel (log level constant from the logging module: logging.DEBUG, logging.INFO, logging.ERROR, etc.) – log level (the default, 20, is logging.INFO)
- logformat (str) – format string of log output lines, as in the logging module
Returns: None
-
pid_file()
Path to the worker daemon’s PID file, whether or not the daemon is running.
Append ‘-waiting’ to this value to get the PID file path for a waiting process.
Returns: path to PID file
Return type: str
-
start()
Start the worker daemon and exit.
If a worker daemon with this index is already running, the new daemon process will wait for it to unlock its PID file before taking over. If another process is already waiting to take over, the new one will send a SIGHUP to the old waiting process. This will not send any signals to the process that has locked the daemon PID file; it is your responsibility to call stop() before calling this. This method returns immediately and does not wait for the new daemon process to lock the PID file.
See pid_file() for the PID file and waiting PID file paths.
Returns: None
-
stop()
Shut down the worker daemon lazily, locating it through its PID file.
The worker daemon will exit within 60 seconds if it is polling, or once the currently running activity task is finished. If it receives an activity task while waiting for the poll to finish, it will process that task and then exit. If a daemon process is waiting to take over once the currently running one shuts down, this will send SIGHUP to the waiting process. This method returns immediately and does not wait for the processes to stop.
Returns: None