Recursive Argo Page Crawlers
Jacob Aronoff
Recently, I've been doing some work in Kubernetes using Argo Workflows. Argo is a hugely powerful tool for running complex jobs, and it makes it easy to run parallelized, templated code. One great use case for Argo is running jobs of unknown length, like parallelized data uploads and transformations. Unfortunately, very little documentation or example material exists to help you do that, so I figured I'd share my approach here in case anyone is grepping around the internet like I was.
I'll break it down into its components and build it up from there. The example I'll go through pages through an API and does some work with the data we get back.
The building blocks
We have two base components required to make this happen: something that gets data from an API (along with the next page to fetch), and something that does work on that data.
Data getter
Let's assume we're going to be making calls to API X at test.X.com/api/block, and the response will look something like this:
{
"data": {...},
"next": {
"next_page": "https://test.X.com/api/block?page_token=example_token_123_abc"
}
}
We'll assume we have a Docker container (or maybe a Python script) that can take a page to fetch (passed as --page <page>) and writes the data it gets back to /tmp/result.json.
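To make that concrete, here's a minimal sketch of what page_getter.py could look like. Everything in it is an assumption from the setup above (the --page flag, the response shape, the /tmp/result.json output path), and requests is used purely for illustration:

# page_getter.py -- a minimal sketch, not a real implementation.
import argparse
import json

import requests

parser = argparse.ArgumentParser()
parser.add_argument("--page", required=True, help="URL of the page to fetch")
args = parser.parse_args()

# Fetch the requested page from the API.
response = requests.get(args.page)
response.raise_for_status()

# Write the response body where the Argo template expects its output artifact.
with open("/tmp/result.json", "w") as f:
    json.dump(response.json(), f)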
In Argo, we create a template that looks like this (assume you have the right env vars for whatever container you're running):
- name: get-page-data
inputs:
parameters:
- name: next_page
container:
image: example-image:latest
command: ['python', 'page_getter.py']
args:
- --page
- '{{inputs.parameters.next_page}}'
outputs:
artifacts:
- name: output_data
path: /tmp/result.json
Data processor
The data processor is a pretty simple container: all it does is take an input artifact and do something with it.
- name: process-page-data
inputs:
artifacts:
- name: input_data
path: /tmp/input.json
container:
image: example-image:latest
command: ['python', 'data_processor.py']
The processor gets its data from Argo at the /tmp/input.json path.
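As a point of reference, data_processor.py could be as simple as the sketch below; the actual processing is whatever your workload needs, so treat this purely as a placeholder:

# data_processor.py -- a minimal sketch. Assumes Argo has placed the input
# artifact at /tmp/input.json, per the template above.
import json

with open("/tmp/input.json", "r") as f:
    page = json.load(f)

# Stand-in for real work: report how many fields the page's data has.
print(f"processing {len(page.get('data', {}))} fields")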
Helper functions (a dumb thing I wrote)
Something I found myself doing often in Argo is pulling a value out of a JSON object. At first I just had my containers write extra data to other paths to use as input parameters for other templates, but then I came up with a much more repeatable way of doing this in Argo, without needing to build a container image.
- name: json-getter
inputs:
parameters:
- name: json_path
artifacts:
- name: data
path: /tmp/input.json
script:
image: python:alpine3.6
command: [python]
source: |
import json
with open("/tmp/input.json", "r") as f:
data = json.load(f)
print(data{{inputs.parameters.json_path}})
This template, given a JSON path and some JSON data, will simply traverse the JSON object and print the value, which Argo then exposes as {{tasks.[name].outputs.result}}. I've used this template in a few workflows, and have found it very helpful!
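To make the templating trick concrete: Argo substitutes {{inputs.parameters.json_path}} straight into the script source before running it, so passing '["next"]["next_page"]' as the json_path renders the script into plain Python:

import json
with open("/tmp/input.json", "r") as f:
    data = json.load(f)
print(data["next"]["next_page"])

Whatever the script prints is what comes back in outputs.result.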
Note that a similar thing could be accomplished using jq in a small bash container, but my jq-fu isn't too good.
The first steps
Let's build up a single pass of our workflow. We are going to:
- Get data
- Get the next page
- Do something with the data
So let's write that up as Argo YAML:
- name: page-dag
inputs:
parameters:
- name: next_page
dag:
tasks:
- name: get-next-page
template: get-page-data
arguments:
parameters:
- name: next_page
value: '{{inputs.parameters.next_page}}'
- name: consume-output-artifact
template: process-page-data
dependencies: [get-next-page]
arguments:
artifacts:
- name: input_data
from: '{{tasks.get-next-page.outputs.artifacts.output_data}}'
- name: get-next-page-path
template: json-getter
dependencies: [get-next-page]
arguments:
parameters:
- name: json_path
value: '["next"]["next_page"]'
artifacts:
- name: data
              from: '{{tasks.get-next-page.outputs.artifacts.output_data}}'
If you're unfamiliar with Argo, we're going to be running this process in a DAG (directed acyclic graph). The tasks in this DAG follow exactly what we set out to do: we get some data, and then simultaneously process that data and extract the next page. The final step is the recursive part...
Recursion in Argo
At the end of our DAG's task list, we need to get the data for the next page, then continue processing data, then get the next page... and so on. But we already have a template that does exactly this: our DAG! So we simply need to call it again, passing it the next page.
- name: page-dag
inputs:
parameters:
- name: next_page
dag:
tasks:
- name: get-next-page
template: get-page-data
arguments:
parameters:
- name: next_page
value: "{{inputs.parameters.next_page}}"
- name: consume-output-artifact
template: process-page-data
dependencies: [get-next-page]
arguments:
artifacts:
- name: input_data
from: "{{tasks.get-next-page.outputs.artifacts.output_data}}"
- name: get-next-page-path
template: json-getter
dependencies: [get-next-page]
arguments:
parameters:
- name: json_path
value: '["next"]["next_page"]'
artifacts:
- name: data
from: "{{tasks.get-next-page.outputs.artifacts.results}}"
- name: loop-it
template: page-dag
dependencies: [get-next-page-path]
when: "\'{{tasks.get-next-page-path.outputs.results}}\' != end"
arguments:
parameters:
- name: next_page
value: "{{tasks.get-next-page-path.outputs.results}}"
You'll see that our recursive step's base case is when the output result from json-getter equals "end" (this example assumes the API hands back "end" as the final next_page token). In practice, you would just condition on however you know your paging is over.
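If your API signals the last page differently, say by omitting the next key entirely, one hypothetical tweak is to normalize inside page_getter.py so the "end" check still works:

import json

def normalize_page(body: dict) -> dict:
    # Hypothetical helper: if the API omits the next-page pointer on the
    # last page, rewrite it to the literal "end" so the workflow's `when`
    # condition terminates the recursion.
    if not body.get("next", {}).get("next_page"):
        body["next"] = {"next_page": "end"}
    return body

# In page_getter.py, write the normalized body as the output artifact:
#   with open("/tmp/result.json", "w") as f:
#       json.dump(normalize_page(response.json()), f)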
And that's basically it! Now all we need to do is call it to kick it off!
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: page-crawler-loader-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: initial_page
  templates:
- name: main
steps:
- - name: begin-recursion
template: page-dag
arguments:
parameters:
- name: next_page
                  value: '{{workflow.parameters.initial_page}}'
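With the workflow saved to a file (crawler.yaml here is a made-up name), you can kick it off from the Argo CLI and pass the starting page as a parameter:

argo submit crawler.yaml -p initial_page=https://test.X.com/api/block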
The end
We now have a recursive page crawler that can run in Argo. My recommendation, though, is not to look at the UI past a few dozen requests; it starts to lag because there are so many nodes. I recommend using the Argo CLI to check the status, or creating a dashboard from the metrics so you know when the job is complete.
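For example, you can follow along from the CLI like this (the workflow name below is made up; yours comes from generateName):

argo list
argo watch page-crawler-loader-x7k2p

Here's the full file if you want to use it as a starter: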
Full file
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: page-crawler-loader-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: initial_page
  templates:
- name: page-dag
inputs:
parameters:
- name: next_page
dag:
tasks:
- name: get-next-page
template: get-page-data
arguments:
parameters:
- name: next_page
value: "{{inputs.parameters.next_page}}"
- name: consume-output-artifact
template: process-page-data
dependencies: [get-next-page]
arguments:
artifacts:
- name: input_data
from: "{{tasks.get-next-page.outputs.artifacts.output_data}}"
- name: get-next-page-path
template: json-getter
dependencies: [get-next-page]
arguments:
parameters:
- name: json_path
value: '["next"]["next_page"]'
artifacts:
- name: data
from: "{{tasks.get-next-page.outputs.artifacts.results}}"
- name: loop-it
template: page-dag
dependencies: [get-next-page-path]
when: "\'{{tasks.get-next-page-path.outputs.results}}\' != end"
arguments:
parameters:
- name: next_page
value: "{{tasks.get-next-page-path.outputs.results}}"
- name: json-getter
inputs:
parameters:
- name: json_path
artifacts:
- name: data
path: /tmp/input.json
script:
image: python:alpine3.6
command: [python]
source: |
import json
with open("/tmp/input.json", "r") as f:
data = json.load(f)
print(data{{inputs.parameters.json_path}})
- name: get-page-data
inputs:
parameters:
- name: next_page
container:
image: example-image:latest
command: ["python", "page_getter.py"]
args:
- --page
- "{{inputs.parameters.next_page}}"
outputs:
artifacts:
- name: output_data
path: /tmp/result.json
- name: process-page-data
inputs:
artifacts:
- name: input_data
path: /tmp/input.json
container:
image: example-image:latest
command: ["python", "data_processor.py"]
- name: main
steps:
- - name: begin-recursion
template: page-dag
arguments:
parameters:
- name: next_page
value: "{{initial_page}}"
Questions?
Please reach out at jaronoff@drift.com