Source
id: advanced-scheduling
namespace: company.team
inputs:
  - id: country
    type: STRING
    defaults: US
  - id: date
    type: DATETIME
    required: false
    defaults: 2023-12-22T14:00:00.000Z
tasks:
  - id: check_if_business_date
    type: io.kestra.plugin.scripts.python.Commands
    namespaceFiles:
      enabled: true
    commands:
      - python schedule.py "{{trigger.date ?? inputs.date}}" {{inputs.country}}
    beforeCommands:
      - pip install workalendar
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: business day - continuing the flow...
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: 0 14 25 12 *
About this blueprint
This flow runs one or more tasks only on business days for a given country. The Python script checks whether the date is a business day in that country. If it is, the flow continues to the next task. If it is not, the script exits with a non-zero code, so the task fails and the subsequent tasks do not run.
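The gating relies on the script's exit code: a non-zero exit marks the task as failed, and nothing after it runs. A minimal Python sketch of that behavior follows. It is not Kestra's actual implementation, and `run_until_failure` and the inline commands are hypothetical stand-ins for the flow's tasks:

```python
import subprocess
import sys


def run_until_failure(commands):
    """Run commands in order and stop at the first non-zero exit code.

    Returns the exit codes of the commands that actually ran.
    """
    exit_codes = []
    for command in commands:
        result = subprocess.run(command)
        exit_codes.append(result.returncode)
        if result.returncode != 0:
            # Mirrors a failed task: nothing after it is executed.
            break
    return exit_codes


# Hypothetical stand-ins: the first command plays the role of schedule.py on a
# non-business day (exit code 1), the second plays the role of the log task.
not_business_day = [sys.executable, "-c", "import sys; sys.exit(1)"]
log_task = [sys.executable, "-c", "print('continuing the flow...')"]

codes = run_until_failure([not_business_day, log_task])
```

Here `codes` contains a single exit code, because the stand-in for the log task never runs.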
The Pebble expression "{{trigger.date ?? inputs.date}}" uses the schedule
date from the trigger when the flow is started by the schedule, and falls back
to the date input when it is started manually. This lets you test the logic by
running the flow with different input values.
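The `??` in that expression is Pebble's null-coalescing operator: it yields the left-hand value unless that value is missing, in which case it yields the right-hand one. A rough Python equivalent, where `resolve_date` is a hypothetical helper and the timestamps are illustrative values only:

```python
from typing import Optional


def resolve_date(trigger_date: Optional[str], input_date: Optional[str]) -> Optional[str]:
    """Prefer the trigger's date; fall back to the input date when it is absent."""
    return trigger_date if trigger_date is not None else input_date


# Scheduled run: the trigger supplies the date, so the input is ignored.
scheduled = resolve_date("2023-12-25T14:00:00Z", "2023-12-22T14:00:00.000Z")

# Manual run: there is no trigger date, so the input default is used.
manual = resolve_date(None, "2023-12-22T14:00:00.000Z")
```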
Make sure to adjust the Python script to support your desired country: import its calendar from workalendar and add it to the calendars dictionary.
To add the Python script, go to the VS Code Editor in the Kestra UI and add
the script schedule.py as a Namespace File:
import sys
from datetime import datetime
from workalendar.europe import France  # Import calendars for specific countries
from workalendar.usa import UnitedStates  # Example for another country
def is_business_day(date_str, country_calendar):
    # Strip the trailing 'Z' (UTC marker) from Kestra's timestamp, then parse the ISO 8601 string
    date = datetime.fromisoformat(date_str.replace("Z", ""))
    # Check if the date is a business day
    return country_calendar.is_working_day(date)
def main():
    if len(sys.argv) != 3:
        print("Usage: schedule.py <date> <country_code>")
        sys.exit(1)
    date_str, country_code = sys.argv[1], sys.argv[2]
    # Dictionary mapping country codes to calendar objects
    calendars = {
        "FR": France(),
        "US": UnitedStates(),
    }
    country_calendar = calendars.get(country_code.upper())
    if not country_calendar:
        print(f"Calendar for '{country_code}' not supported or not found.")
        sys.exit(1)
    try:
        if not is_business_day(date_str, country_calendar):
            print(f"{date_str} is not a business day in {country_code}.")
            sys.exit(1)
        else:
            print(f"{date_str} is a business day in {country_code}.")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
if __name__ == "__main__":
    main()
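The script strips the trailing `Z` before parsing because `datetime.fromisoformat` only accepts that suffix from Python 3.11 onward. Only the calendar date matters for the business-day check, so dropping the UTC marker does not change the result. The standard-library sketch below isolates that parsing step and contrasts it with a plain weekday test, which is not holiday-aware. The names `parse_kestra_date` and `is_weekday` are hypothetical and not part of the blueprint:

```python
from datetime import date, datetime


def parse_kestra_date(date_str: str) -> date:
    """Parse a timestamp such as '2023-12-22T14:00:00.000Z' into a date."""
    # Drop the UTC marker so fromisoformat also works before Python 3.11.
    return datetime.fromisoformat(date_str.replace("Z", "")).date()


def is_weekday(day: date) -> bool:
    """Naive check: Monday (0) through Friday (4). Ignores public holidays."""
    return day.weekday() < 5


default_input = parse_kestra_date("2023-12-22T14:00:00.000Z")  # a Friday
christmas = parse_kestra_date("2023-12-25T14:00:00.000Z")      # a Monday
```

Both dates pass the naive weekday test, even though 25 December is a public holiday in the United States and in France. That gap is the reason the script relies on a holiday-aware calendar from workalendar instead of a weekday check.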