I'm new to Dagster and I'm struggling to get my scenario to work.
I would like to create jobs dynamically. I've read the Dagster documentation, but I couldn't reproduce what it describes.
My scenario is as follows:
I have three Python functions (each an @op). The first fetches data from an endpoint (EP), the second processes that data, and the third receives a client name as a parameter and saves the processed result of the second function in that client's database. Example:

    from dagster import graph, job, op

    @op
    def step1():
        return {"a": 1, "b": 2, "c": 3}

    @op
    def step2(result):
        # process example
        return {"a": 4, "b": 5, "c": 6}

    @op
    def step3(result, client):
        # update in database client
        pass

    @graph
    def process():
        response = step1()
        result_process = step2(response)
        step3(result_process, "name_client")

    @job
    def job_examplo():
        process()

The problem is that I have a list of customers, and I don't want to create multiple @ops just to pass a different customer name to each one. I tried to do something like:

    clients = [
        "example_microsoft",
        "example_apple",
        "example_amazon",
        ...
        ...
        "examplo_skynet"
    ]

    @graph
    def process():
        response = step1()
        result_process = step2(response)
        for client_name in clients:
            step3(result_process, "client_name")

But when I take this approach, I get an error. Is there any way to generate step 3 dynamically at this stage?
Additional Information: I am using DockerRunLauncher
I already tried to use the outputs, but without success. https://docs.dagster.io/concepts/ops-jobs-graphs/dynamic-graphs
1 Answer
It seems like you don't really need to create jobs dynamically; you just need to be able to configure the job with a list of clients (see the configuration docs). Maybe something like:

    from dagster import Field, graph, job, op

    @op
    def step1():
        return {"a": 1, "b": 2, "c": 3}

    @op
    def step2(result):
        # process example
        return {"a": 4, "b": 5, "c": 6}

    # the client list comes from op config, not from an upstream input
    @op(config_schema={"client_list": Field([str])})
    def step3(context, result):
        client_list = context.op_config["client_list"]
        # loop over client list and do the database update

    @graph
    def process():
        response = step1()
        result_process = step2(response)
        step3(result_process)

    @job
    def job_examplo():
        process()

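The "loop over client list" comment in step3 could be filled in along these lines. This is only a minimal sketch in plain Python, not Dagster API: `update_clients` and its `update_fn` argument are hypothetical names, and `update_fn` stands in for whatever database write you actually do. Collecting failures per client instead of stopping at the first error is an assumption about what you want.

```python
from typing import Callable, Dict, List


def update_clients(
    result: dict,
    client_list: List[str],
    update_fn: Callable[[str, dict], None],
) -> Dict[str, str]:
    """Call update_fn(client, result) for each client; return {client: error message}."""
    failures: Dict[str, str] = {}
    for client in client_list:
        try:
            # update_fn is a placeholder for the real per-client database write
            update_fn(client, result)
        except Exception as exc:
            # remember the failure and keep going with the remaining clients
            failures[client] = str(exc)
    return failures
```

Inside step3 you would call it with `context.op_config["client_list"]` and decide afterwards whether a non-empty failure dict should fail the op.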
Then you can do the looping in your op. If you want to parallelize the step3 op across all your clients, you could use dynamic graphs, which might look something like this:

    from dagster import DynamicOut, DynamicOutput, Field, graph, op

    @op
    def step1():
        return {"a": 1, "b": 2, "c": 3}

    @op
    def step2(result):
        # process example
        return {"a": 4, "b": 5, "c": 6}

    @op(config_schema={"client_list": Field([str])}, out={"result": DynamicOut(str)})
    def parallelize_clients(context):
        client_list = context.op_config["client_list"]
        # emit one dynamic output per client; each mapping key must be unique
        for i, c in enumerate(client_list):
            yield DynamicOutput(c, mapping_key=str(i))

    @op
    def step3(result, client):
        # update in database client
        pass

    @graph
    def process():
        response = step1()
        result_process = step2(response)
        clients = parallelize_clients()
        # run step3 once per client emitted by parallelize_clients
        clients.map(lambda client: step3(result_process, client))

    job_examplo = process.to_job()

    # To execute
    job_examplo.execute_in_process(
        run_config={"ops": {"parallelize_clients": {"config": {"client_list": ["client1", "client2", "client3"]}}}}
    )

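If you would rather see the client name than a bare index in the step names, you can build the mapping key from the name. As far as I know, Dagster only accepts letters, digits and underscores in a mapping key, so real client names may need cleaning first. A minimal sketch, where `to_mapping_key` is a hypothetical helper and not part of Dagster:

```python
import re


def to_mapping_key(index: int, client_name: str) -> str:
    """Build a unique key made only of letters, digits and underscores."""
    # replace every character outside [A-Za-z0-9_] with an underscore
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", client_name)
    # the index prefix keeps keys unique even if two names clean to the same string
    return f"{index}_{cleaned}"
```

You would then yield `DynamicOutput(c, mapping_key=to_mapping_key(i, c))` inside the loop.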
There are a couple of ways to execute this code; see the Dagster job execution docs for more details.
To summarize the methods in brief, you can do any of the following:
This method launches the Dagit UI in a web browser, where you can select your job and provide configuration via the Launchpad:

    # from the command line
    dagit -f file_with_jobs.py

This method launches the job directly via the Dagster CLI:
dagster job execute -f file_with_jobs.py --job job_examplo --config path/to/run_config.yaml
In this method your configuration must be saved in a YAML file. Configuration for the example job would be something like:

    ops:
      parallelize_clients:
        config:
          client_list:
            - client1
            - client2

You can also launch jobs programmatically using the Python API, which can be helpful for debugging or unit testing (Note that this method will launch your job in a single process, so you won't get any parallelism):

    from file_with_jobs import job_examplo

    if __name__ == "__main__":
        # execute_in_process takes the configuration as run_config
        result = job_examplo.execute_in_process(
            run_config={"ops": {"parallelize_clients": {"config": {"client_list": ["client1", "client2"]}}}}
        )

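Since the run config is just a nested dict, you could build it from your client list rather than typing it out by hand. A minimal sketch, where `build_run_config` is a hypothetical helper and the default op name is assumed to match the example above:

```python
from typing import Any, Dict, List


def build_run_config(
    clients: List[str], op_name: str = "parallelize_clients"
) -> Dict[str, Any]:
    """Return the nested run config dict for the op that reads client_list."""
    if not clients:
        # an empty list would launch a run that does nothing, so fail early
        raise ValueError("clients must not be empty")
    return {"ops": {op_name: {"config": {"client_list": list(clients)}}}}
```

The result can be passed as `run_config=build_run_config(["client1", "client2"])` to `execute_in_process`.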