luigi_swf package

Submodules

luigi_swf.decider module

class luigi_swf.decider.DeciderServer(identity=None, stdout=None, stderr=None, logfilename=None, loglevel=20, logformat='%(asctime)s - %(levelname)s - %(name)s - %(message)s', **kwargs)

Bases: object

Decider daemon

Daemonizes LuigiSwfDecider. The SIGWINCH signal is used to shut the daemon down lazily (it exits after the current decision task or 60-second poll finishes) because SIGTERM kills child processes.

Parameters:
  • stdout (stream (such as the return value of open())) – stream to which stdout will be written
  • stderr (stream (such as the return value of open())) – stream to which stderr will be written
  • logfilename (str) – file path to which the application log will be written
  • loglevel (log level constant from the logging module (logging.DEBUG, logging.INFO, logging.ERROR, etc.)) – log level
  • logformat (str) – format string of log output lines, as in the logging module
Returns:None
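The lazy shutdown can be pictured with a minimal sketch, assuming a daemon loop that keeps calling a poll function (standing in for LuigiSwfDecider.run()) until a SIGWINCH handler has set a flag. LazyShutdownLoop, serve() and fake_poll() are hypothetical names, not part of luigi_swf. The sketch needs a POSIX system and must run in the main thread, because that is where Python executes signal handlers.

```python
import os
import signal
from typing import Callable


class LazyShutdownLoop:
    """Hypothetical daemon loop that only stops between polls (not luigi_swf's code)."""

    def __init__(self, poll_once: Callable[[], None]) -> None:
        # poll_once stands in for LuigiSwfDecider.run(): one poll of up to
        # 60 seconds, plus whatever decision task that poll returns.
        self.poll_once = poll_once
        self.shutdown_requested = False

    def _on_sigwinch(self, signum, frame) -> None:
        # Only record the request; the poll or task in progress is not interrupted.
        self.shutdown_requested = True

    def serve(self) -> int:
        """Run polls until SIGWINCH arrives; return how many polls completed."""
        previous = signal.signal(signal.SIGWINCH, self._on_sigwinch)
        completed = 0
        try:
            while not self.shutdown_requested:
                self.poll_once()
                completed += 1
        finally:
            signal.signal(signal.SIGWINCH, previous)  # restore the old handler
        return completed


if __name__ == "__main__":
    calls = []

    def fake_poll() -> None:
        calls.append(None)
        if len(calls) == 3:
            # Simulate `kill -WINCH <pid>` arriving during the third poll.
            os.kill(os.getpid(), signal.SIGWINCH)

    print(LazyShutdownLoop(fake_poll).serve())  # prints 3: the third poll still completes
```

SIGTERM would instead stop the process (and, per the text above, its children) in the middle of a task; a flag checked between polls is what makes the shutdown "lazy".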

pid_file()

Path to the decider daemon’s PID file, even if it is not running

Append ‘-waiting’ to this value to get the PID file path for a waiting process.

Returns:path to PID file
Return type:str
start()

Start the decider daemon and exit

If there is already a decider daemon running, this process will wait for that process to unlock its PID file before taking over. If there is already another process waiting to take over, the new one will send a SIGHUP to the old waiting process. This will not send any signals to the process that has locked the daemon PID file – it is your responsibility to call stop() before calling this. This method will return immediately and not wait for the new daemon process to lock the PID file.

See pid_file() for the PID file and waiting PID file paths.

Returns:None
stop()

Shut down the decider daemon lazily, locating it through its PID file

The decider daemon will exit in under 60 seconds if it is polling, or once the currently running decision task is finished. If it receives a decision task while waiting for the poll to finish, it will process the decision task and then exit. If there is a daemon process waiting to take over once the currently running one shuts down, this will send SIGHUP to the waiting process. This method will return immediately and will not wait for the processes to stop.

Returns:None
class luigi_swf.decider.LuigiSwfDecider(**kwargs)

Bases: boto.swf.layer2.Decider

Implementation of boto’s SWF Decider

See DeciderServer for daemonizing this.

run(identity=None)

Poll for and run a decision task

This should be run in a loop. It polls for up to 60 seconds; if no decision task arrives in that time, it returns without running one. Users usually do not need to interact with this class directly, because DeciderServer can run the loop instead.

Returns:None
class luigi_swf.decider.WfState

Bases: object

read_wf_state(events, task_configs)

luigi_swf.executor module

class luigi_swf.executor.LuigiSwfExecutor(domain, version, workflow_task, aws_access_key_id=None, aws_secret_access_key=None)

Bases: object

Workflow execution launcher

AWS credentials can be passed to __init__() or read from [swfscheduler]->aws_access_key_id and [swfscheduler]->aws_secret_access_key in Luigi’s client.cfg. Otherwise, boto will try to read the credentials from environment variables or the EC2 instance metadata (if using an IAM role).

Parameters:
  • domain (str) – SWF domain
  • version (str) – SWF version (you may put “unspecified” if you don’t need this)
  • workflow_task (luigi.task.WrapperTask) – wrapper task that defines the workflow through its requires()
  • aws_access_key_id (str) – optional if using environment, config, or IAM
  • aws_secret_access_key (str) – optional if using environment, config, or IAM
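The credential lookup order described above can be sketched as a small helper. This is a hypothetical illustration, not luigi_swf's code: resolve_swf_credentials is an invented name, a plain configparser.ConfigParser stands in for Luigi's configuration object, and resolving each key independently (rather than as a pair) is an assumption.

```python
import configparser
from typing import Optional, Tuple


def resolve_swf_credentials(
    config: configparser.ConfigParser,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical helper: explicit arguments first, then [swfscheduler] in client.cfg.

    A None in the result means "leave it to boto", which then looks at
    environment variables or EC2 instance metadata (IAM role).
    """
    if aws_access_key_id is None:
        aws_access_key_id = config.get(
            "swfscheduler", "aws_access_key_id", fallback=None)
    if aws_secret_access_key is None:
        aws_secret_access_key = config.get(
            "swfscheduler", "aws_secret_access_key", fallback=None)
    return aws_access_key_id, aws_secret_access_key


if __name__ == "__main__":
    cfg = configparser.ConfigParser()
    cfg.read_string("[swfscheduler]\n"
                    "aws_access_key_id = AKIAEXAMPLE\n"
                    "aws_secret_access_key = secretexample\n")
    print(resolve_swf_credentials(cfg))                      # values from the config
    print(resolve_swf_credentials(cfg, "AKIAOTHER", "other"))  # explicit values win
```

With fallback=None, a missing [swfscheduler] section or key simply yields None instead of raising, which leaves the final decision to boto.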
execute()

Initiates a workflow execution on SWF and returns immediately

Run register() first.

register()

Registers the workflow type and task types with SWF

This must be done each time a new task is added to a workflow. If you are launching a workflow from cron, it is safest to run this before each call to execute(). However, if you are launching many workflows and calling execute() many times, consider calling this method only when necessary, because frequent calls can contribute to SWF API throttling.
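One way to call registration "only when necessary" in a long-running launcher is to remember which types have already been registered in the current process. The sketch below is hypothetical: RegistrationCache, TypeKey and the register callback are invented names, and the callback only stands in for whatever call actually registers one type with SWF.

```python
from typing import Callable, Iterable, Set, Tuple

# (kind, name, version), e.g. ("activity", "MyTask", "unspecified")
TypeKey = Tuple[str, str, str]


class RegistrationCache:
    """Hypothetical launcher-side cache: register each SWF type at most once."""

    def __init__(self, register: Callable[[TypeKey], None]) -> None:
        # `register` stands in for the call that registers one type with SWF.
        self._register = register
        self._seen: Set[TypeKey] = set()

    def ensure_registered(self, types: Iterable[TypeKey]) -> int:
        """Register only types not seen before; return how many were new."""
        new = 0
        for key in types:
            if key in self._seen:
                continue  # already registered by this process
            self._register(key)
            self._seen.add(key)
            new += 1
        return new
```

Because the cache lives in memory, it only helps a process that launches many executions; a cron job that starts a fresh process each time still registers on every run.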

luigi_swf.tasks module

class luigi_swf.tasks.SwfHeartbeatCancel

Bases: object

Mix-in for Luigi Tasks

Mix this class into a Luigi task to make use of SWF heartbeat timeouts and cancellation.

When the task is being run directly in Luigi (not SWF), self.cancel_requested will always be False, and calling heartbeat() or ack_cancel() will have no effect. Even so, your code should call ack_cancel() only when self.cancel_requested is True.

ack_cancel()

Send cancellation acknowledgement to SWF

cancel_acked = False
cancel_requested = False
heartbeat()

Send heartbeat to SWF and check if cancellation was requested

If cancellation was requested, self.cancel_requested will be set to True after invoking this method. This method has no effect when the task is not being run with SWF.

register_activity_worker(activity_worker, activity)

Register the activity worker as an observer of heartbeats

Called by luigi_swf.worker.LuigiSwfWorker to register itself as the activity worker managing this instance of the task. It observes heartbeat() and ack_cancel() from this class.
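A task body that cooperates with heartbeats and cancellation might look like the sketch below. To keep it runnable without Luigi or SWF, SimulatedSwfHeartbeatCancel is a hypothetical stand-in that only mimics the documented attributes and methods, and it pretends that SWF requests cancellation after a chosen number of heartbeats. ChunkedTask is likewise hypothetical; in real code the task would inherit from SwfHeartbeatCancel and luigi.Task instead.

```python
from typing import List, Optional


class SimulatedSwfHeartbeatCancel:
    """Hypothetical stand-in for SwfHeartbeatCancel that fakes a cancel request."""

    cancel_requested = False
    cancel_acked = False

    def __init__(self, cancel_after: Optional[int] = None) -> None:
        self._cancel_after = cancel_after
        self._beats = 0

    def heartbeat(self) -> None:
        # The real heartbeat() reports to SWF and learns whether cancellation
        # was requested; here we just count calls.
        self._beats += 1
        if self._cancel_after is not None and self._beats >= self._cancel_after:
            self.cancel_requested = True

    def ack_cancel(self) -> None:
        self.cancel_acked = True


class ChunkedTask(SimulatedSwfHeartbeatCancel):
    """Hypothetical long-running task body that checks for cancellation per chunk."""

    def __init__(self, chunks: List[int], cancel_after: Optional[int] = None) -> None:
        super().__init__(cancel_after)
        self.chunks = chunks
        self.done: List[int] = []

    def run(self) -> None:
        for chunk in self.chunks:
            self.heartbeat()  # keeps the SWF heartbeat timeout from expiring
            if self.cancel_requested:
                self.ack_cancel()  # acknowledge only when SWF actually asked
                return
            self.done.append(chunk)
```

Calling heartbeat() once per unit of work both satisfies the heartbeat timeout and bounds how long a cancellation request can go unnoticed.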

luigi_swf.util module

class luigi_swf.util.SingleWaitingLockPidFile(pidfilepath, timeout_sec)

Bases: object

Locks a PID file, sending SIGHUP to any process that is already waiting to lock it.

>>> with SingleWaitingLockPidFile('aoeu.pid', 60.):
...     print('test')
Parameters:
  • pidfilepath (str) – path to PID file to lock
  • timeout_sec (int) – how long to wait to lock the PID file (in seconds)
luigi_swf.util.dictsortkey(d)

Sortable string key for a dict (Python 3 cannot order dicts directly; comparing them with < raises TypeError)
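A key of this kind can be sketched with canonical JSON. This is an assumption about the approach, not necessarily how luigi_swf.util.dictsortkey works; dict_sort_key is a hypothetical name, and it only handles dicts whose keys are strings and whose values are JSON-serializable.

```python
import json
from typing import Any, Dict


def dict_sort_key(d: Dict[str, Any]) -> str:
    """Hypothetical sort key: canonical JSON with keys in sorted order."""
    # sort_keys makes {"b": 2, "a": 1} and {"a": 1, "b": 2} produce the same key.
    return json.dumps(d, sort_keys=True)


if __name__ == "__main__":
    records = [{"b": 2, "a": 1}, {"a": 0}]
    try:
        sorted(records)  # Python 3: '<' is not supported between dicts
    except TypeError as err:
        print("plain sort fails:", err)
    print(sorted(records, key=dict_sort_key))
```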

luigi_swf.util.dt_from_iso(iso)
luigi_swf.util.dthandler(obj)
luigi_swf.util.fullname(o)

Tuple of module name and class name from object

luigi_swf.util.get_class(module_name, class_name)

Class from module name and class name
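fullname() and get_class() form a round trip between an object's class and a pair of strings. The sketch below is a hypothetical re-creation under that reading, using importlib; the real functions may differ, and this version only resolves top-level classes (a nested class would need __qualname__ and an attribute walk).

```python
import importlib
from typing import Any, Tuple


def fullname(o: Any) -> Tuple[str, str]:
    """(module name, class name) of an object's class."""
    cls = type(o)
    return cls.__module__, cls.__name__


def get_class(module_name: str, class_name: str) -> type:
    """Import `module_name` and fetch the top-level class `class_name` from it."""
    return getattr(importlib.import_module(module_name), class_name)


if __name__ == "__main__":
    Fraction = get_class("fractions", "Fraction")
    half = Fraction(1, 2)
    print(fullname(half))                       # ('fractions', 'Fraction')
    print(get_class(*fullname(half)) is Fraction)  # True
```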

luigi_swf.util.get_luigi_params(task)
>>> import luigi
>>> class TaskA(luigi.Task):
...     p1 = luigi.Parameter(default='foo')
...     p2 = luigi.Parameter(default='bar')
...     v1 = 'aoeu'
>>> get_luigi_params(TaskA()) == {'p1': 'foo', 'p2': 'bar'}
True
luigi_swf.util.get_task_configurations(task, wf_run_id=None, include_obj=False)
luigi_swf.util.kill_from_pid_file(pid_file, sig)

Signal a process given its PID file

Sends signal sig to the process ID found in the PID file. Does not raise an error if the file does not exist, a process ID could not be found in the file, or the process was not found.

Parameters:
  • pid_file (str) – path to PID file
  • sig (signal constant from the signal module (e.g. signal.SIGTERM)) – signal to send to process
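The tolerant behaviour described above can be sketched as below. This is a hypothetical re-creation, not the library's source. The lazy_stop() helper is also hypothetical: it merely combines the documented stop() behaviour (lazy shutdown via SIGWINCH, SIGHUP to the "-waiting" process) with this function; the documentation does not say that stop() is implemented this way.

```python
import os
import signal


def kill_from_pid_file(pid_file: str, sig: int) -> None:
    """Hypothetical re-creation: signal the PID in `pid_file`, tolerating failures."""
    try:
        with open(pid_file) as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        return  # no file, unreadable file, or no integer PID in it
    if pid <= 0:
        return  # 0 or a negative PID would signal a whole process group
    try:
        os.kill(pid, sig)
    except ProcessLookupError:
        pass  # no process with that PID any more


def lazy_stop(pid_file: str) -> None:
    """Hypothetical stop(): SIGWINCH to the daemon, SIGHUP to any waiter."""
    kill_from_pid_file(pid_file, signal.SIGWINCH)
    kill_from_pid_file(pid_file + "-waiting", signal.SIGHUP)
```

The guard against non-positive PIDs matters because os.kill(0, sig) targets every process in the caller's process group rather than failing.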

luigi_swf.worker module

class luigi_swf.worker.LuigiSwfWorker(**kwargs)

Bases: boto.swf.layer2.ActivityWorker

Implementation of boto’s SWF Activity Worker

See WorkerServer for daemonizing this.

run(identity=None)

Poll for and run an activity task

This should be run in a loop. It polls for up to 60 seconds; if no activity task arrives in that time, it returns without running one. Users usually do not need to interact with this class directly, because WorkerServer can run the loop instead.

Returns:None
class luigi_swf.worker.WorkerServer(worker_idx, identity=None, stdout=None, stderr=None, logfilename=None, loglevel=20, logformat='%(asctime)s - %(levelname)s - %(name)s - %(message)s', **kwargs)

Bases: object

Worker daemon

Daemonizes LuigiSwfWorker. The SIGWINCH signal is used to shut the daemon down lazily (it exits after the current activity task or 60-second poll finishes) because SIGTERM kills child processes.

Parameters:
  • worker_idx (int) – worker index (instance number)
  • stdout (stream (such as the return value of open())) – stream to which stdout will be written
  • stderr (stream (such as the return value of open())) – stream to which stderr will be written
  • logfilename (str) – file path to which the application log will be written
  • loglevel (log level constant from the logging module (logging.DEBUG, logging.INFO, logging.ERROR, etc.)) – log level
  • logformat (str) – format string of log output lines, as in the logging module
Returns:None

pid_file()

Path to the worker daemon’s PID file, even if it is not running

Append ‘-waiting’ to this value to get the PID file path for a waiting process.

Returns:path to PID file
Return type:str
start()

Start the worker daemon and exit

If there is already a worker daemon running with this index, this process will wait for that process to unlock its PID file before taking over. If there is already another process waiting to take over, the new one will send a SIGHUP to the old waiting process. This will not send any signals to the process that has locked the daemon PID file – it is your responsibility to call stop() before calling this. This method will return immediately and not wait for the new daemon process to lock the PID file.

See pid_file() for the PID file and waiting PID file paths.

Returns:None
stop()

Shut down the worker daemon lazily, locating it through its PID file

The worker daemon will exit in under 60 seconds if it is polling, or once the currently running activity task is finished. If it receives an activity task while waiting for the poll to finish, it will process the activity task and then exit. If there is a daemon process waiting to take over once the currently running one shuts down, this will send SIGHUP to the waiting process. This method will return immediately and will not wait for the processes to stop.

Returns:None

Module contents