Pytask IO
Pytask IO Class
-
class pytask_io.pytask_io.PyTaskIO(*args, **kwargs)
- Kwargs
- Key store_host
The store host name. Default is localhost.
- Key store_port
The store port. Default is 6379.
- Key store_db
The store db number. Default is 0.
- Key workers
The number of workers in the asyncio task queue. Default is 1.
-
add_task(unit_of_work: Callable, *args) → Dict[str, Any]
The pytask_io.add_task method takes a function as its first argument and that function's arguments as the remaining arguments. When pytask_io.add_task is called, it returns a dictionary of metadata for the queued task. Although this data can be used in many ways, the simplest use is to pass the dict directly to pytask_io.get_task. Example:
    # Create a `pytaskio` object & run the event loop in a new thread:
    pytaskio = PyTaskIO()
    pytaskio.run()
    # The `add_task` method takes a function as its first argument
    metadata = pytaskio.add_task(send_email, title, body)
    # Later we can pass the `metadata` result to `get_task`
    result = pytaskio.get_task(metadata)
- Parameters
unit_of_work – A callable / executable Python function
args – The list of arguments required by unit_of_work
- Returns
metadata
-
get_task(unit_of_work_metadata: Dict[str, Any]) → Optional[Dict[str, Any]]
Method to get the task results from the store. This method can be called directly after calling pytask_io.add_task. If the results referenced by the metadata dict (see below) are available when this method is called, the result is returned; otherwise None is returned. If the result is None, you have to call this method again at some later time. Example:
    # Create a `pytaskio` object & run the event loop in a new thread:
    pytaskio = PyTaskIO()
    pytaskio.run()
    # The `add_task` method takes a function as its first argument
    metadata = pytaskio.add_task(send_email, title, body)
    # Later we can pass the `metadata` result to `get_task`
    result = pytaskio.get_task(metadata)
- Parameters
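unit_of_work_metadata – Dict[str, Any]. The metadata dict returned by pytask_io.add_task.
- Return Optional[Dict[str, Any]]
The task result if it is available, otherwise None. The call is non-blocking.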
-
loop_thread: Thread = None
The thread that the asyncio event loop runs in. This thread is named event_loop. The thread will die gracefully when PytaskIO calls event_loop.join() for you. If you require custom handling of the event_loop thread, you can access it directly using the pytask_io.loop_thread object. Example:
    pytaskio = PyTaskIO()
    pytaskio.loop_thread.is_alive()  # check if the thread is still alive
-
main_loop: asyncio.AbstractEventLoop = None
The main loop that is used by PytaskIO. If you wish to handle some of the asyncio behavior of the main loop, then you can access the asyncio object directly with pytask_io.main_loop.
-
pole_loop: asyncio.AbstractEventLoop = None
The polling loop that is available for PytaskIO public methods such as pytask_io.poll_for_task. If you wish to handle some of the asyncio behavior of the polling loop, then you can access the asyncio object directly with pytask_io.pole_loop.
-
poll_for_task(task_meta: Dict, **kwargs) → Union[Dict[str, Any], bool]
Warning: This method is still in a beta state, use with care! Blocking function to be used either with an async library or from a thread separate from the client application’s main thread. This method will create an asyncio event loop & make periodic requests to the store. Once the results are returned, the loop will be stopped and the main thread will be available once again. Example:
    # Create a `pytaskio` object & run the event loop in a new thread:
    pytaskio = PyTaskIO()
    pytaskio.run()
    # The `add_task` method takes a function as its first argument
    metadata = pytaskio.add_task(send_email, title, body)
    # Later we can pass the `metadata` result to `poll_for_task`
    result = pytaskio.poll_for_task(metadata, tries=100, interval=60)
- Parameters
task_meta – The metadata dict returned by pytask_io.add_task.
kwargs – Optional polling settings; see the keys below.
- Key tries
The number of times the method polls the store for the task execution results.
- Key interval
The time in seconds between tries.
- Returns
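Because this method blocks, one way to keep an application's main thread free is to run the call in a worker thread. The sketch below uses only the standard library and makes several assumptions: `blocking_poll` is a hypothetical stand-in for a blocking poll (it is not the PytaskIO implementation), and the metadata and result dicts are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Union


def blocking_poll(
    task_meta: Dict[str, Any], tries: int = 3, interval: float = 0.01
) -> Union[Dict[str, Any], bool]:
    """Hypothetical blocking poll: the result becomes 'available' on the last try."""
    for attempt in range(1, tries + 1):
        time.sleep(interval)  # wait between simulated requests to the store
        if attempt == tries:
            return {"task": task_meta, "attempts": attempt}
    return False  # only reached when `tries` is 0


with ThreadPoolExecutor(max_workers=1) as pool:
    # Submit the blocking call to a worker thread.
    future = pool.submit(blocking_poll, {"id": "example"}, tries=3, interval=0.01)
    # The main thread stays free to do other work in the meantime.
    other_work = sum(range(1000))
    # Collect the result once it is needed.
    result = future.result(timeout=5)
```

In an asyncio application, `loop.run_in_executor` offers a similar way to await a blocking call without stalling the event loop.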
-
queue_client: redis.Redis = None
PytaskIO is a Python task queue that leverages CPython’s asyncio library to make long-running tasks trivial. The library aims to make the public API as simple and intuitive as possible. Basic usage example:
    # Starts the task runner
    pytask = PyTaskIO(
        store_port=8080,
        store_host="localhost",
        broker="redis",  # rabbitmq coming soon...
        db=0,
    )
    # Start the PytaskIO task queue on a separate thread.
    pytask.run()
    # Handle a long-running process, in this case a send email function
    metadata = pytask.add_task(send_email, title, body)
    # Try once to get the results of your email sometime in the future
    result = pytask.get_task(metadata)
    # Stop PytaskIO completely (this will not affect any units of work that haven't yet executed)
    pytask.stop()
The connected queue client object. Use this object exactly as you would if you were referencing the queue’s client directly. Example:
    # Example for the default Redis queue: pushing a task into the queue
    pytaskio = PyTaskIO()
    results = pytaskio.queue_client.lpush("my_queue", my_task)
-
queue_store: redis.Redis = None
The queue_store can be accessed on the PyTaskIO instance, and all the methods of the underlying store framework are available on this object. For example:
    # Example for the default Redis store: setting a new key
    pytaskio = PyTaskIO()
    pytaskio.queue_store.set('myfield', 'my_value')
-
run()
Starts an event loop on a new thread named event_loop. Example:
    # Create a `pytaskio` object & run the event loop in a new thread:
    pytaskio = PyTaskIO()
    pytaskio.run()
- Returns
-
stop() → None
Method to gracefully stop the asyncio event loop & join the event_loop thread. This method will only be executed when all active tasks have finished executing. If there are any pending tasks left in the client’s queue, they can be executed once PytaskIO is run again. Example:
    pytaskio = PyTaskIO()
    pytaskio.run()
    try:
        metadata = pytaskio.add_task(send_email, title, body)
    except RuntimeError:
        pytaskio.stop()
- Returns
None
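For readers unfamiliar with the run/stop lifecycle described here, the general pattern of running an asyncio event loop on a named thread and stopping it cleanly can be sketched with the standard library alone. This is an illustration of the technique under stated assumptions, not PytaskIO's actual implementation; `unit_of_work` and the other names below are hypothetical.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()


def _run_loop() -> None:
    # Bind the loop to this thread and run it until `loop.stop()` is called.
    asyncio.set_event_loop(loop)
    loop.run_forever()


# Start the loop on a thread named `event_loop`.
thread = threading.Thread(target=_run_loop, name="event_loop", daemon=True)
thread.start()


async def unit_of_work(x: int) -> int:
    # Hypothetical task: pretend to do some asynchronous work.
    await asyncio.sleep(0.01)
    return x * 2


# Schedule work on the loop from the main thread and wait for it to finish.
future = asyncio.run_coroutine_threadsafe(unit_of_work(21), loop)
value = future.result(timeout=5)

# Stop the loop from outside its thread, then join the thread.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
alive_after_stop = thread.is_alive()
loop.close()
```

Calling `loop.stop` through `call_soon_threadsafe` matters here because event loop methods are not thread-safe when invoked directly from another thread.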
-
store_db: int = 0
The store db number. Default is 0.
-
store_host: str = 'localhost'
The store host name. Default is localhost.
-
store_port: int = 6379
The store port. Default is 6379.
-
workers: int = 1
The number of workers in the asyncio task queue. Default is 1.