Arguments & Results
Repid provides mechanisms to pass arguments to a job and retrieve the job's result.
Preparation
First of all, you have to decide where the data will be stored. Arguments can be stored either inside the message itself (in which case you only need a message broker) or inside an arguments bucket broker. Results, on the other hand, can only be stored in a result bucket broker.
For example, let's create a connection with InMemoryBucketBroker.
from repid import Connection, InMemoryMessageBroker, InMemoryBucketBroker

my_connection = Connection(
    message_broker=InMemoryMessageBroker(),
    args_bucket_broker=InMemoryBucketBroker(),
    results_bucket_broker=InMemoryBucketBroker(use_result_bucket=True),
)
With this connection in place, we can experiment with storing arguments and results in buckets.
Arguments
First of all, let's create an actor with some arguments.
from repid import Router

router = Router()

@router.actor
async def actor_with_args(user_id: int, user_name: str, user_messages: list[str]) -> list[str]:
    user_message = f"Hi {user_name}! Your id is: {user_id}."
    user_messages.append(user_message)
    return user_messages
Now we would like to schedule a job, which will pass those arguments to the actor.
from repid import Job

# code above is omitted

await Job(
    "actor_with_args",
    args=dict(user_id=123, user_name="Alex", user_messages=["This is your first message!"]),
).enqueue()

# code below is omitted
What will happen under the hood?
- Job.args will be serialized using Config.SERIALIZER
- The serialized string will be
  - encoded into the message or
  - passed to the arguments bucket broker if Job.use_args_bucketer is set to True (defaults to True if the current connection contains an arguments bucket broker)
- When the message is received by the actor, the arguments will be decoded and mapped to the function arguments using the actor's Converter.convert_inputs
- The return value of the actor will be encoded using the actor's Converter.convert_outputs
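The steps above can be sketched with the standard library alone. This is a minimal illustration and not Repid's implementation: json stands in for Config.SERIALIZER, and the helpers serialize_args, convert_inputs and convert_outputs are hypothetical names that only mimic the role of the actor's Converter.

```python
import inspect
import json
from typing import Any, Callable


def serialize_args(args: dict[str, Any]) -> str:
    # Stand-in for Config.SERIALIZER (assumption: a JSON-like serializer).
    return json.dumps(args)


def convert_inputs(fn: Callable[..., Any], payload: str) -> dict[str, Any]:
    # Decode the payload and keep only the keys the function declares.
    decoded = json.loads(payload)
    params = inspect.signature(fn).parameters
    return {name: decoded[name] for name in params if name in decoded}


def convert_outputs(value: Any) -> str:
    # Encode the return value so it could be stored in a result bucket.
    return json.dumps(value)


def actor_with_args(user_id: int, user_name: str, user_messages: list[str]) -> list[str]:
    user_messages.append(f"Hi {user_name}! Your id is: {user_id}.")
    return user_messages


payload = serialize_args(
    dict(user_id=123, user_name="Alex", user_messages=["This is your first message!"])
)
kwargs = convert_inputs(actor_with_args, payload)
encoded_result = convert_outputs(actor_with_args(**kwargs))
```

The real converter may also validate and coerce types; this sketch only shows the round trip from serialized arguments to keyword arguments and back to an encoded result.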
Results
You can define whether to store the result of a job using the Job.store_result argument.
By default, the result is stored if the connection contains a result bucket broker, and is not stored otherwise.
You can access the result of a job using the Job.result async property.
import asyncio

from repid import Job

# code above is omitted

myjob = Job(
    "actor_with_args",
    args=dict(user_id=123, user_name="Alex", user_messages=["This is your first message!"]),
)
await myjob.enqueue()
await asyncio.sleep(1.0)  # wait for the job to complete
result_bucket = await myjob.result
print(result_bucket)

# code below is omitted
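A fixed sleep is a guess at how long the job takes. As an alternative, here is a small polling sketch, assuming the result lookup returns None until the result bucket exists. The helper wait_for_result and the fake lookup are hypothetical and are not part of Repid.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def wait_for_result(
    fetch: Callable[[], Awaitable[Optional[Any]]],
    timeout: float = 5.0,
    interval: float = 0.1,
) -> Optional[Any]:
    """Poll `fetch` until it returns something other than None or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        result = await fetch()
        if result is not None:
            return result
        if loop.time() >= deadline:
            return None
        await asyncio.sleep(interval)


async def demo() -> Optional[str]:
    # Fake lookup: the "result" becomes available on the third call.
    calls = 0

    async def fake_fetch() -> Optional[str]:
        nonlocal calls
        calls += 1
        return "done" if calls >= 3 else None

    return await wait_for_result(fake_fetch, timeout=1.0, interval=0.01)


demo_result = asyncio.run(demo())
```

In a real application, fetch would wrap the job's result lookup; whether that lookup returns None for a missing bucket should be checked against the Repid version in use.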
Info
Results of recurring jobs will be overwritten.
IDs
By default, a UUID4 will be generated upon creation of a job, both for an arguments bucket and for a result bucket.
You can pass your own IDs using the args_id and result_id arguments of Job.
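The defaulting behaviour can be pictured with a tiny sketch. The function resolve_bucket_id is hypothetical, and the hex formatting of the generated UUID4 is an assumption rather than a statement about how Repid formats its IDs.

```python
import uuid
from typing import Optional


def resolve_bucket_id(custom_id: Optional[str] = None) -> str:
    # Use the caller's ID when given, otherwise generate a random UUID4.
    return custom_id if custom_id is not None else uuid.uuid4().hex


generated_id = resolve_bucket_id()
explicit_id = resolve_bucket_id("chained_id")
```

An explicit ID is what makes a bucket addressable from another job, which is the basis of chaining.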
Chaining jobs
Because a result bucket is technically backwards compatible with an arguments bucket (a result bucket just holds more fields), you can use it to chain multiple jobs: the result of one job becomes the arguments of the next.
Warning
Keep in mind that you have to ensure the execution order of those jobs yourself, and that the result bucket must be available to the arguments bucket broker of the second job.
Example:
import asyncio
import os

from repid import Connection, Job, RedisBucketBroker, RedisMessageBroker, Repid, Router, Worker

redis_messages_dsn = os.environ.get("REDIS_CONNECTION")
redis_args_and_results_dsn = os.environ.get("REDIS_ARGS_CONNECTION")

my_connection = Connection(
    message_broker=RedisMessageBroker(redis_messages_dsn),
    args_bucket_broker=RedisBucketBroker(redis_args_and_results_dsn),
    results_bucket_broker=RedisBucketBroker(
        redis_args_and_results_dsn,
        use_result_bucket=True,
    ),
)

app = Repid(my_connection)

my_router = Router()

@my_router.actor
async def add_hello(user_name: str, user_id: int, messages: list[str]) -> dict:
    message = f"Hello {user_name}!"
    messages.append(message)
    return dict(user_id=user_id, messages=messages)

@my_router.actor
async def add_id(user_id: int, messages: list[str]) -> list[str]:
    message = f"Your id is {user_id}."
    messages.append(message)
    return messages

async def main() -> None:
    async with app.magic(auto_disconnect=True):
        w = Worker(routers=[my_router], messages_limit=1)

        # The first job stores its result under a known ID.
        await Job(
            "add_hello",
            args=dict(user_name="Alex", user_id=123, messages=["This is your first message!"]),
            result_id="chained_id",
        ).enqueue()
        await w.run()

        # The second job reads its arguments from that same ID.
        second_job = Job("add_id", args_id="chained_id")
        await second_job.enqueue()
        await w.run()

        result_bucket = await second_job.result
        print(result_bucket.data)

if __name__ == "__main__":
    asyncio.run(main())
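Why a result bucket can be read as an arguments bucket is easier to see with a simplified model. The classes below are hypothetical and much smaller than Repid's real bucket types. They only illustrate the idea that a consumer of arguments touches the shared data field and ignores the extra result fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ArgsBucket:
    # Hypothetical minimal shape: the serialized payload only.
    data: str


@dataclass
class ResultBucket(ArgsBucket):
    # Hypothetical extra fields describing how the job ran.
    success: bool = True
    exception: Optional[str] = None


def read_args(bucket: ArgsBucket) -> str:
    # A consumer of arguments only ever touches `data`.
    return bucket.data


store: dict[str, ArgsBucket] = {}
# The first job stores its result under a known ID...
store["chained_id"] = ResultBucket(data='{"user_id": 123, "messages": ["Hello Alex!"]}')
# ...and the second job reads that same ID as its arguments.
chained_payload = read_args(store["chained_id"])
```

This is also why both jobs in the example above point their bucket brokers at the same Redis instance: the second job can only find "chained_id" if its arguments bucket broker sees the storage the first job wrote to.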
TTL
Repid also supports specifying Time-To-Live for both arguments and result buckets.
from datetime import timedelta
Job("some_job", args_ttl=timedelta(weeks=2), result_ttl=timedelta(days=5))
You can also set a TTL to None, which means no expiration.
By default, arguments buckets have no TTL, while result buckets have a TTL of one day.
Warning
Not all bucket brokers may have native support for TTL, so be careful not to run out of memory.
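For a broker without native TTL support, expiry has to be enforced by some other means. The following sketch shows one possible approach, a periodic sweep over an in-memory store. The Bucket class and evict_expired function are hypothetical and do not describe how any Repid broker works.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Bucket:
    data: str
    ttl: Optional[timedelta] = None  # None means the bucket never expires
    created: datetime = field(default_factory=datetime.now)

    def is_expired(self, now: datetime) -> bool:
        return self.ttl is not None and now >= self.created + self.ttl


def evict_expired(store: dict[str, Bucket], now: datetime) -> int:
    # Remove expired buckets and report how many were dropped.
    expired = [key for key, bucket in store.items() if bucket.is_expired(now)]
    for key in expired:
        del store[key]
    return len(expired)


start = datetime(2024, 1, 1)
store = {
    "args": Bucket("{}", ttl=None, created=start),                 # default: no TTL
    "result": Bucket("[]", ttl=timedelta(days=1), created=start),  # default: one day
}
removed = evict_expired(store, now=start + timedelta(days=2))
```

After two days the result bucket is swept away while the arguments bucket, which has no TTL, stays in memory until it is removed explicitly.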