Extract multiple tables from Postgres using SQL queries and process them as Pandas DataFrames on a schedule
Source
yaml
id: postgres-to-pandas-dataframes
namespace: company.team
variables:
  db_host: host.docker.internal
tasks:
  - id: get_tables
    type: io.kestra.plugin.core.flow.Parallel
    concurrent: 2
    tasks:
      - id: products
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM products
      - id: orders
        type: io.kestra.plugin.jdbc.postgresql.CopyOut
        sql: SELECT * FROM orders
  - id: pandas
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      products.csv: "{{ outputs.products.uri }}"
      orders.csv: "{{ outputs.orders.uri }}"
    outputFiles:
      - bestsellers_pandas.json
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
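      import pandas as pd
      # Load the CSV files that inputFiles maps from the CopyOut outputs
      products = pd.read_csv("products.csv")
      orders = pd.read_csv("orders.csv")
      # Attach product details to every order row
      df = orders.merge(products, on="product_id", how="left")
      # Sum order totals per product and keep the ten best sellers
      top = (
          df.groupby("product_name", as_index=False)["total"]
          .sum()
          .sort_values("total", ascending=False)
          .head(10)
      )
      # Write the result to the file declared in outputFiles
      top.to_json("bestsellers_pandas.json", orient="records")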
pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.CopyOut
    values:
      url: jdbc:postgresql://{{ vars.db_host }}:5432/
      username: postgres
      password: "{{ secret('DB_PASSWORD') }}"
      format: CSV
      header: true
      delimiter: ","
triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: 0 9 * * *
About this blueprint
This flow uses the Parallel task to run the two extraction tasks
concurrently. The concurrent property caps how many child tasks run at the
same time; here it is set to 2, so both tables are extracted simultaneously.
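
To make the effect of a concurrency cap concrete, here is a minimal Python sketch of the same idea outside Kestra. It is only an analogy, not how Kestra implements Parallel: a thread pool with max_workers plays the role of the concurrent property, the extract function is a hypothetical stand-in for a CopyOut task, and the "customers" table is invented so that there are more tasks than slots.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical task list; "customers" is invented for illustration
# (the flow itself only extracts products and orders).
TABLES = ["products", "orders", "customers"]
CONCURRENT = 2  # plays the role of the `concurrent` property

lock = threading.Lock()
state = {"running": 0, "peak": 0}


def extract(table: str) -> str:
    """Pretend to extract a table while counting simultaneous runs."""
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    time.sleep(0.05)  # simulate the time a query would take
    with lock:
        state["running"] -= 1
    return f"{table}.csv"


# At most CONCURRENT extractions are in flight at any moment;
# the third one waits until a worker is free.
with ThreadPoolExecutor(max_workers=CONCURRENT) as pool:
    files = list(pool.map(extract, TABLES))

print(files)
print("peak concurrency:", state["peak"])  # never exceeds CONCURRENT
```

With three tasks and a cap of two, the third extraction starts only after one of the first two finishes, which is the behavior the concurrent property describes for child tasks.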
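The flow extracts the products and orders tables from a Postgres database
with CopyOut tasks, which write each query result to a CSV file. Those files
are passed to a Python task through inputFiles. The Python script reads them
with Pandas, joins orders to products, and writes the ten products with the
highest summed total to bestsellers_pandas.json. A Schedule trigger runs the
flow every day at 9:00.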
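
The Pandas step can be tried locally without Postgres or Kestra. The sketch below runs the same merge and aggregation as the flow's script on a few in-memory rows. The column names follow the script (product_id, product_name, total), but the sample values and the order_id column are invented for illustration, and io.StringIO stands in for the CSV files that inputFiles would provide.

```python
import io

import pandas as pd

# Invented sample data; in the flow these would be the CSV files
# produced by the CopyOut tasks.
products_csv = io.StringIO(
    "product_id,product_name\n"
    "1,Keyboard\n"
    "2,Mouse\n"
    "3,Monitor\n"
)
orders_csv = io.StringIO(
    "order_id,product_id,total\n"
    "100,1,50\n"
    "101,2,20\n"
    "102,1,30\n"
    "103,3,200\n"
)

products = pd.read_csv(products_csv)
orders = pd.read_csv(orders_csv)

# Left join keeps every order, even if its product is missing
df = orders.merge(products, on="product_id", how="left")

# Sum totals per product, highest first, top ten only
top = (
    df.groupby("product_name", as_index=False)["total"]
    .sum()
    .sort_values("total", ascending=False)
    .head(10)
)

records = top.to_dict(orient="records")
print(records)
```

For this sample, Monitor ranks first with a total of 200, followed by Keyboard (50 + 30 = 80) and Mouse (20). Orders whose product_id has no match in products get a missing product_name after the left join, and groupby drops those rows by default.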
