Source

```yaml
id: ingest-to-datalake-git
namespace: company.team
variables:
  bucket: kestraio
  prefix: inbox
  database: default
tasks:
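  # List new objects under the given prefix; credentials and region come from Kestra secrets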
  - id: list_objects
    type: io.kestra.plugin.aws.s3.List
    prefix: "{{vars.prefix}}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    bucket: "{{ vars.bucket }}"
  - id: check
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.list_objects.objects }}"
    then:
      - id: process_new_objects
        type: io.kestra.plugin.core.flow.WorkingDirectory
        tasks:
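          # Clone the ETL code into the shared working directory so the next task can run it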
          - id: clone_repository
            type: io.kestra.plugin.git.Clone
            url: https://github.com/kestra-io/scripts
            branch: main
          - id: ingest_to_datalake
            type: io.kestra.plugin.scripts.python.Commands
            env:
              AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
              AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
              AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
            taskRunner:
              type: io.kestra.plugin.scripts.runner.docker.Docker
            containerImage: ghcr.io/kestra-io/aws:latest
            commands:
              - python etl/aws_iceberg_fruit.py
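      # Upsert the newly ingested rows from the raw_fruits staging table into the final fruits table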
      - id: merge_query
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          MERGE INTO fruits f USING raw_fruits r
              ON f.fruit = r.fruit
              WHEN MATCHED
                  THEN UPDATE
                      SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
              WHEN NOT MATCHED
                  THEN INSERT (id, fruit, berry, update_timestamp)
                        VALUES(r.id, r.fruit, r.berry, current_timestamp);
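      # Compact the small data files written by the merge to keep Athena scans efficient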
      - id: optimize
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          OPTIMIZE fruits REWRITE DATA USING BIN_PACK;
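      # Move the processed files to the archive folder so the next run won't re-ingest them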
      - id: move_to_archive
        type: io.kestra.plugin.aws.cli.AwsCLI
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
      - aws s3 mv s3://{{ vars.bucket }}/{{ vars.prefix }}/ s3://{{ vars.bucket }}/archive/{{ vars.prefix }}/ --recursive
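# The hourly trigger ships disabled; set disabled: false to activate the schedule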
triggers:
  - id: hourly_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    disabled: true
    cron: "@hourly"
```

About this blueprint
Tags: Python, CLI, Git, AWS, Kestra
This scheduled flow checks for new files in a given S3 bucket every hour. If new files have been detected, the flow will:
- Clone the Git repository containing the ETL code
- Download the raw CSV files from S3, read them as a dataframe, clean the data with Pandas, and ingest it into the S3 data lake managed by Apache Iceberg and AWS Glue (see the sketch after this list)
- Merge the newly ingested rows from the raw_fruits staging table into the final fruits table with an Athena MERGE query, then compact the table with an OPTIMIZE query
- Move the already processed files to the archive folder in the same S3 bucket
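
The ingestion logic itself lives in etl/aws_iceberg_fruit.py inside the cloned kestra-io/scripts repository. Purely as an illustration, here is a minimal sketch of what such a script could look like, assuming the awswrangler (AWS SDK for pandas) library is available in the ghcr.io/kestra-io/aws image; the bucket, prefix, database, and the raw_fruits table name are taken from the flow above, and the cleaning step is a placeholder:

```python
# Hypothetical sketch of an ingestion script; the real etl/aws_iceberg_fruit.py
# in kestra-io/scripts may differ.
import awswrangler as wr

BUCKET = "kestraio"    # the flow's `bucket` variable
PREFIX = "inbox"       # the flow's `prefix` variable
DATABASE = "default"   # the flow's `database` variable (AWS Glue database)

# Read every raw CSV under the inbox prefix into a single dataframe;
# boto3/awswrangler picks up the AWS_* credentials exported by the flow's env block
df = wr.s3.read_csv(path=f"s3://{BUCKET}/{PREFIX}/")

# Illustrative cleaning step only: drop duplicates and incomplete rows
df = df.drop_duplicates().dropna()

# Append the cleaned rows to the raw_fruits Iceberg table registered in Glue;
# the flow's Athena MERGE task then upserts them into the final fruits table
wr.athena.to_iceberg(
    df=df,
    database=DATABASE,
    table="raw_fruits",
    table_location=f"s3://{BUCKET}/raw_fruits/",
    temp_path=f"s3://{BUCKET}/athena_temp/",
)
```

Appending into a raw_fruits staging table and leaving the upsert to the Athena MERGE task keeps the Python side simple and safe to rerun.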
