Pytask IO

Pytask IO Class

class pytask_io.pytask_io.PyTaskIO(*args, **kwargs)[source]
Kwargs

Key store_host

The store host name. Default is localhost.

Key store_port

The store port. Default is 0

Key store db

The store db number. Default is 6379

Key workers

The amount of workers in the asyncio task queue. Default is 1.

add_task(unit_of_work: Callable, *args) → Dict[str, Any][source]

pytask_io.add_task method take a function as a first argument & the function arguments as the next arguments. When pytask_io.add_task is called, it will return a dictionary with some useful data. Although the data returned can be used in many ways, the easiest & most straight forward use is to pass this dict directly to pytask_io.get_task. Example:

# Create a `pytaskio` object & run the event loop in new thread:
pytaskio = PyTaskIO()
pytaskio.run()

# the `add_task` method task in a function as a first argument
metadata = pytask.add_task(send_email, title, body)


# Later we can use the `metadata` result to pass to `add_task`
result = get_task(metadata)
Parameters
  • unit_of_work – A callable / executable Python function

  • args – The list of arguments required by unit_of_work

Returns

metadata

get_task(unit_of_work_metadata: Dict[str, Any]) → Optional[Dict[str, Any]][source]

Method to get the task results from the store. This function can be called directly after executing the pytask_io.add_task. If the results referenced from the metadata dict see below are available when this method is called, then the result is return otherwise None is returned. If the result is None, then you have to retry to call this method again, sometime in the future. Example:

# Create a `pytaskio` object & run the event loop in new thread:
pytaskio = PyTaskIO()
pytaskio.run()

# the `add_task` method task in a function as a first argument
metadata = pytask.add_task(send_email, title, body)


# Later we can use the `metadata` result to pass to `add_task`
result = get_task(metadata)
Parameters

unit_of_work_metadata – Dict[str, Any] -

Return Union[Dict, bool]

The result is non blocking

loop_thread: Thread = None

The thread that the asyncio event loop runs in. This thread has been tagged with the name of event_loop. The thread will die gracefully when PytaskIO calls event_loop.join() for you. If you require custom handling of the event_loop thread, then you can access it directly using the pytask_io.loop_thread object. Example:

pytaskio = PytaskIO()
pytaskio.loop_thread.is_alive() # check if the thread is still alive
main_loop: asyncio.AbstractEventLoop = None

The main loop that is used by PytaskIO. If you wish to handle some of the asyncio behavior of the main loop, then you can access the asyncio object directly with pytask_io.main_loop.

pole_loop: asyncio.AbstractEventLoop = None

The pole loop that is available for PytaskIO public methods such as pytask_io.poll_for_task If you wish to handle some of the asyncio behavior of the pole loop, then you can access the asyncio object directly with pytask_io.pole_loop.

poll_for_task(task_meta: Dict, **kwargs) → Union[Dict[str, Any], bool][source]

Warning: This method is still in a beta state, use with care! Blocking function to be used either with an async library or from a separate thread from the client application’s main thread. This method will create an asyncio event loop & make periodic requests to the store. Once the results are returned, the loop with be stopped and the main thread will be available once again. Example:

# Create a `pytaskio` object & run the event loop in new thread:
pytaskio = PyTaskIO()
pytaskio.run()

# the `add_task` method task in a function as a first argument
metadata = pytask.add_task(send_email, title, body)


# Later we can use the `metadata` result to pass to `add_task`
result = poll_for_task(metadata, tries=100, interval=60)
Parameters
  • task_meta

  • kwargs

Key tries

The amount of times the method polls the store for the task execution results.

Key interval

The time in seconds for each try

Returns

queue_client: redis.Redis = None

PytaskIO is a python task queue that leverages CPython’s asyncio library to make long running task trivial. The library aims to make the public API as simple and intuitive as possible. Basic usage. Example:

Starts the task runner
      pytask = PytaskIO(
      store_port=8080,
      store_host="localhost",
      broker="redis",  # rabbitmq coming soon...
      db=0,
 )

 # Start the PytaskIO task queue on a separate thread.
 pytask.run()

 # Handle a long running process, in this case a send email function
 metadata = pytask.add_task(send_email, title, body)

 # Try once to get the results of your email sometime in the future
 result = get_task(metadata)

 # Stop PytaskIO completly (This will not effect any units of work that havent yet executed)
 pytask.stop()

The connected queue client object. Use this object exactly as you would if you were referencing the queue’s client directly. Example:

# Example for default Redis queue pushing a task into the queue
pytaskio = PytaskIO()
results = pytaskio.queue_client.lpush("my_queue", my_task)
queue_store: redis.Redis = None

The queue_store is available to work with & can be accessed on the PyTaskIO instance & all the available methods from the store framework used will be available on this object. Foe example:

# Example fpr default Redis store setting a new key
pytaskio = PytaskIO()
pytaskio.queue_store.set('myfield', 'my_value')
run()[source]

Starts an event loop on a new thread with a name of event_loop Example:

# Create a `pytaskio` object & run the event loop in new thread:
pytaskio = PyTaskIO()
pytaskio.run()
Returns

stop() → None[source]

Method to elegantly stop the asyncio event loop & join the event_loop thread. This method will only be executed when all active task have finished executing. If there are any pending tasks left in the clients queue then these can be executed once PytaskIO is run again. Example:

pytaskio = PyTaskIO()
pytaskio.run()
try:
    metadata = pytask.add_task(send_email, title, body)
except RunTimeError:
    pytaskio.stop()
Returns

None

store_db: int = 0

The store db number. Default is 6379

store_host: str = 'localhost'

The store host name. Default is localhost.

store_port: int = 6379

The store port. Default is 0

workers: int = 1

The amount of workers in the asyncio task queue. Default is 1.